博客 RabbitMQ Work Queues (工作队列模式) 使用案例

RabbitMQ Work Queues (工作队列模式) 使用案例

   数栈君   发表于 2025-01-08 16:47  181  0

前言

在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法
简单模式
快速入门程序就是简单模式.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2445ee7029f011b445142217d3492b2f..png



Work Queues (工作队列)

简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c3e04f596e122b30dde374dcc16ca47a..png


1、引入依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>


2、编写生产者代码

工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本⼀样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一发送10条消息

for (int i = 0; i < 10; i++) {
String mag = "hello work queue......" + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());
}

Constant 包下 Constants 类

package rabbitmq.constant;

public class Constants {
public static final String HOST = "123.57.16.61";
public static final Integer PORT = 5672;
public static final String USERNAME = "study";
public static final String PASSWORD = "study";
public static final String VIRTUAL_HOST = "bite";
//声明一个工作队列
public static final String WORK_QUEUE = "work.queue";
}

完整代码:

package rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
* 工作队列模式
*/
public class Prooducer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USERNAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
//创建一个新的连接
Connection connection = connectionFactory.newConnection();
//开启一个通信
Channel channel = connection.createChannel();
//声明交换机,使用内置的交换机
//声明一个队列
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//发送消息
for (int i = 0; i < 10; i++) {
String mag = "hello work queue......" + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());
}
System.out.println("消息发送成功~");
//资源释放
channel.close();
connection.close();
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8aee00c3fae4ff3328734358f39869a6..png



3、编写消费者代码

消费者代码和简单模式⼀样, 只是复制两份. 两个消费者代码可以是⼀样的

消费者1:

package rabbitmq.work;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USERNAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
//创建一个新的连接
Connection connection = connectionFactory.newConnection();
//开启一个通信
Channel channel = connection.createChannel();
//声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//消费消息
//消费的逻辑
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
// channel.close();
// connection.close();

}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3c0e03c763e25fa92d2a239f5c2e660a..png


消费者2:

package rabbitmq.work;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USERNAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
//创建一个新的连接
Connection connection = connectionFactory.newConnection();
//开启一个通信
Channel channel = connection.createChannel();
//声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//消费消息
//消费的逻辑
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到消息:" + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
// channel.close();
// connection.close();
}
}


4、运行程序, 观察结果

先启动两个消费者运行, 再启动生产者
如果先启动⽣产者, 在启动消费者, 由于消息较少, 处理较快, 那么第⼀个启动的消费者就会瞬间把10条
消息消费掉, 所以我们先启动两个消费者, 再启动生产者

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2f0da4e4844f5b107d9d42177ee0435d..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ed3840ddaf35371e63f5c703f43dff97..png


观察RabbitMQ客户端:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a26f59e9e3429dd5d54c78fa2c3319ef..png




http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4fb0bba4bccd975814e9a606392b3fa6..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/14f74f5c2b56c41ab4b5fdd005391d0e..png

启动生产者:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2df1471726c90b6300bd5c39d522d925..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/656db05669f1d8ecc08d3beb14344cee..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/214a665866e77d2205b6e68b93be3750..png

————————————————

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群