博客 RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

   蓝袋鼠   发表于 2024-12-03 17:31  214  0

上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客

本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.(只演⽰部分常⽤的⼯作模式)

引⼊依赖 

pom.xml 可以导入依赖

<!--Spring MVC相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

或者创建项目时候勾选相应的选项

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/2e8c9628564ea09f0e2039a50e92e484..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/04a949283ea6c7350062fe552a2d2772..png

进入项目第一步先进行分类 三层架构 

进行配置相关rabbitmq属性 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/e59c41cfc3ec14f73bd74ff8df12da06..png

一.工作队列模式

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/63601f0915a3387c74fd8d198bc83e9d..png

生产者:

@RestController
@RequestMapping("/produce")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work() {
rabbitTemplate.convertAndSend("", Constans.WORK_QUEUE,"hello spring amqp:work...");
return "发送成功";
}
}

convertAndSend是RabbitTemplate类提供的一个重要方法,用于将消息发送到 RabbitMQ 的指定队列中 。

第一个参数"":在这里通常表示交换机(Exchange)的名称为空字符串。
第二个参数Constans.WORK_QUEUE
第三个参数"hello spring amqp:work...":这就是要发送的实际消息内容

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/9a289984eda54226b4c8be986ba1ab61..png

通过网页进行测试是否发送成功

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/1b1447c4c7aa4ed7703379acb612c1a0..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/491e256406d98da3fe143857c1d610c1..png

从rabbitmq上可以看出已经发送成功到队列,等待消费者进行消费 

消费者:

@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(Message message) {
System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(Message message) {
System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
}

@RabbitListener 是Spring框架中⽤于监听RabbitMQ队列的注解,通过使⽤这个注解,可以定义⼀个⽅法,以便从RabbitMQ队列中接收消息.该注解⽀持多种参数类型,这些参数类型代表了从RabbitMQ接收到的消息和相关信息.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/be7b3e44addfee9d665dffde43f56fc3..png

@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(String message) {
System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message) {
System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/0d82dcbff7319eebfd994db5c83e941b..png

1. String 返回消息的内容
2. Message ( org.springframework.amqp.core.Message ):SpringAMQP的
Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等.

 二.Publish/Subscribe(发布订阅模式)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/a343f7e16fad560dd7227453732be7b4..png

声明队列,交换机,绑定队列和交换机

//发布订阅模式
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
//发布订阅模式
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
}
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/27aa5f8864b9596184d46c33e7e71b49..png

 生产者:

@RequestMapping("/fanout")
public String fanout(){
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");
return "发送成功";
}

消费者:

@Component
public class FanoutListener {
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void queueListener1(String message) {
System.out.println("listener 1 ["+Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void queueListener2(String message) {
System.out.println("listener 2 ["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/3bcbf8cd1980af19fbc95c7663516461..png

三.Routing(路由模式)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/c8a9aefdb313bd7ffd5dd5a87ef45696..png

声明队列,交换机,绑定队列和交换机 

//路由模式
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
//路由模式
@Bean("directQueue1")
public Queue directQueue1() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2() {
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/a411cbcbe465e29db48a3c59f4accb94..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/823c735aa297827005d7a93ced78bb05..png

生产者:

@RequestMapping("/direct/{rountingKey}")
public String direct(@PathVariable("routingKey") String rountingKey){
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"", "hello spring amqp:direct, my routing key is "+rountingKey);
return "发送成功";
}

@PathVariable :用于从请求的 URL 路径中提取参数值。

当有一个请求访问/direct/后面跟着某个具体的值(例如/direct/key1)时,@PathVariable("routingKey") String rountingKey会将key1提取出来,并赋值给rountingKey变量。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/f0bf300fa2b1a09295a8da02ac40cd6c..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/8e874ac70181382208bf267a2c503086..png

消费者:

@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message) {
System.out.println("listener 1 ["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message) {
System.out.println("listener 2 ["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/2d202636ab1a142c749714a29e38a32a..png

四.Topics(通配符模式)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/26a85a2812bbfa6a5b03660e136135c7..png

代表一个单词

# 代码多个单词 

//通配符模式
public static final String TOPIC_QUEUE1 = "topics_queue1";
public static final String TOPIC_QUEUE2 = "topics_queue2";
public static final String TOPIC_EXCHANGE = "topics_exchange";
//通配符模式
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}

@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}
@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/ba833859010f808f8edd57b64715319b..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/33379b74cd5688e6c5e724e8657a8136..png

生产者

@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is "+routingKey);
return "发送成功";
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/cf0986d2a8a2c5173a099cd6cab9f8b2..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/e92e1681afe83ca0b0d14793fa3ce225..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/f4d9c3ffa65591fe6ec9f1ad4471746f..png

消费者

@Component
public class TopicListener {
@RabbitListener(queues = Constants.TOPIC_QUEUE1)
public void queueListener1(String message) {
System.out.println("listener 1 ["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.TOPIC_QUEUE2)
public void queueListener2(String message) {
System.out.println("listener 2 ["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message);
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/9ddab4ace465b1e07d5458a6039d706f..png

结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

https://blog.csdn.net/chaodddddd/article/details/143755517

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
上一篇:Yarn集群监控
下一篇:
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群