博客 RabbitMQ 七种工作模式介绍(上)

RabbitMQ 七种工作模式介绍(上)

   数栈君   发表于 2024-12-06 10:13  196  0

RabbitMQ 共提供了7种⼯作模式供我们进⾏消息传递,接下来一一介绍它的实现与目的

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/36b4f885f8f1976aeda8da878f46e6c6..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/36b4f885f8f1976aeda8da878f46e6c6..png

1.简单模式队列

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/dfb9915cb68a30cbbcd01cce26bd4987..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/dfb9915cb68a30cbbcd01cce26bd4987..png

P为生产者 发送信息中间(消息队列)C作为消费者 直接消费消息队列里面的内容

特点:⼀个⽣产者P,⼀个消费者C,消息只能被消费⼀次.也称为点对点(Point-to-Point)模式.

生产者:

1.创建连接工厂

2.设置工厂参数

3.创建channel

4.声明queue

5 通过channel发送到queue

6. 资源释放

public static void main(String[] args) throws IOException,TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 1.队列名称
* 2.durable 可持久化 true为持久化
* 3.exclusive 是否独占 false
* 4. autoDelete 是否自动删除 false
* arguments 参数
*/
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("simple",true,false,false,null);
//5 通过channel发送到queue
/**
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props,
* byte[] body)
* 1.exchange 交换机名称 ,简单情况下 一般默认的情况为""
* 2.routingKey 路由名称=队列名称
* 3.props 配置信息
* 4.body 发现信息的数据
*/
for (int i = 0; i < 10; i++) {
String msg = "hello 简单队列~"+i;
channel.basicPublish("","simple", null, msg.getBytes());
}
System.out.println("信息发送成功!");
//6. 资源释放
channel.close();
connection.close();
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/53ae44ad6fa38dc1a9f078621a652a28..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/58acf065a00cc9aeba807a085ef6734a..png

消费者:

1.创建连接工厂

2.设置工厂参数

3.创建channel

4.声明queue

5.消费数据

6.资源释放

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();

//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 1.队列名称
* 2.durable 可持久化 true为持久化
* 3.exclusive 是否独占 false
* 4. autoDelete 是否自动删除 false
* arguments 参数
*/
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("simple",true,false,false,null);
//5.接收信息 并消费
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue :队列名称
* autoAck:是否自动确认 消费者接收信息与MQ
* callback :回调对象
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
回调方法 当收到信息 自动执行该方法
consumerTag
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到信息:"+new String(body));
}
};
channel.basicConsume("simple",true,consumer);
// 释放资源
channel.close();
connection.close();
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/92ba3669c6278677ed631b40c4285636..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/64f0bebb1e0c8e0ed6e601952b4320ca..png

2.WorkQueue(⼯作队列)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d7db8ab859bebccd17dcbbf93174d644..png

 一个生产者(P) 多个消费者(C) 消息队列会平均分配给消费者

特点:消息不会重复,分配给不同的消费者.
适⽤场景:集群环境中做异步处理


生产者:

跟简单模式类似 或一个队列名称

public static void main(String[] args) throws IOException,TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
//5 通过channel发送到queue
for (int i = 0; i < 10; i++) {
String msg = "hello 工作队列~"+i;
channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());
}
System.out.println("信息发送成功!");
//6. 资源释放
channel.close();
connection.close();
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/566331331a815ae66fb9da5e61c29397..png

消费者1:

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
//5.接收信息 并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到信息:"+new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE,true,consumer);
// 释放资源
channel.close();
connection.close();
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/25410d794d419eaa9a17ebfa05d39594..png

消费者2 也是同样的代码

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/454934e330d1e292ef6fdddf13846442..png

3 Publish/Subscribe(发布/订阅)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e3d2f8697501517aeacec7076598853f..png

Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

X作为交换机 将消息复制多份 并且发送多个消费者 并且每个消费者收到相同的信息

比如 P发送了10条消息 C1和C2得消费10条信息

适合场景: 消息需要被多个消费者同时接收的场景. 如: 实时通知或者⼴播消息

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a753b84853aaf534428e39e340f5986f..png

Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失

RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.
Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7e3612bc3b2a2ef5a45a1a18ca1fe455..png

生产者:

public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
//4. 声明队列
channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
//5. 交换机和队列绑定
channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
//6. 发布消息
for (int i = 0; i < 10; i++) {
String msg = "hello 发布订阅队列~"+i;
channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
}
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/66223231c651eaf775bf5a86978481b8..png

 消费者1

public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/06b61a0040cc95b4b95beee88a5ac5fe..png

消费者2同理 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/872f09d1adac8b7d062ddc62e5c00bf9..png

4 Routing(路由模式)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/95d1cc2b303b51e7b2892b0fa7e8b366..png

路由模式是发布订阅模式的变种, 在发布订阅基础上, 增加路由key

发布订阅模式是⽆条件的将所有消息分发给所有消费者, 路由模式是Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列
适合场景: 需要根据特定规则分发消息的场景.

⽐如系统打印⽇志, ⽇志等级分为error, warning, info,debug, 就可以通过这种模式,把不同的⽇志发送到不同的队列, 最终输出到不同的⽂件

生产者:

public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
//4. 声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
//5. 交换机和队列绑定
channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");
//6. 发布消息
String msg_a = "hello 路由队列~ my routingKey is a...";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg_a.getBytes());
String msg_b = "hello 路由队列~ my routingKey is b...";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
String msg_c = "hello 路由队列~ my routingKey is c...";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());

System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}

消费者1

public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost(Constants.HOST);
factory.setPort(Constants.PORT);
factory.setUsername(Constants.USER_NAME);
factory.setPassword(Constants.PASSWORD);
factory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
//4.声明queue
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
//5.接收信息 并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到信息:"+new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/83ee80c3a60b810009bb689662a3dedb..png

消费者 2同理

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5a460ecc5efa0762b89b4d7f14ba92e0..png

————————————————

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群