博客 RabbitMQ之交换机

RabbitMQ之交换机

   数栈君   发表于 2024-01-29 13:56  458  0

一、RabbitMQ交换机
1、交换机的由来
在RabbitMQ中,生产者发送信息不会直接将消息投递到队列中,而是将消息投递到交换机中,再由交换机转发到具体的队列中,队列再将消息以推送或者拉取方式给消费进行消费
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/ac546bc2cc348da88b24710f35414eed..jpg



在交换机诞生了两个概念

1、路由键:



2、绑定键:


3、两者中的关系



2、交换机类型

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/b9aec306773a691eb0b543352edd97a4..jpg

2.1直连交换机(Direct Exchange)
如图所示:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/4520d7479bfc365f6cf368d1acaf9960..jpg

2.2主题交换机(Topic Exchange)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/65b31c2025546c607d97d809c3116fa8..jpg
 
2.3扇形交换机(Fanout Exchange)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/2ff6a91a765bd35ceb2c254ba98a33c1..jpg
 
2.4首部交换机(Headers Exchange)
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/c2031154aedec087675bd085160d7b8a..jpg


2.5默认交换机(Default Exchange)
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/ff258e3efebd0c29fb92cd78dd8fad14..jpg


二、RabbitMQ交换机实例讲解
一、直连交换机讲解
1、先在生产者中创建一个直连交换机配置类
DirectQueueConfig:生成队列,交换机,以及路由键,定义三个队列
package com.zj.provider;

import lombok.With;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@SuppressWarnings("all")
public class DirectQueueConfig {


/**
* 生成一个队列
* @return
*/
@Bean
public Queue directQueueA(){
return new Queue("directQueueA",true);
}

@Bean
public Queue directQueueB(){
return new Queue("directQueueB",true);
}

@Bean
public Queue directQueueC(){
return new Queue("directQueueC",true);
}



@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}


@Bean
public Binding bindingA(){
return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
}

@Bean
public Binding bindingB(){
return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
}

@Bean
public Binding bindingC(){
return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
}





}

2、之后创建一个控制类,用来发信息
DirectController:其中rabbitTemplate用来发送信息辅助类

package com.zj.provider;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@RequestMapping("/sendDirect")
@SuppressWarnings("all")
public class DirectController {

@Autowired
private RabbitTemplate rabbitTemplate;


@RequestMapping("/sendDirect")
public String sendDirect(String routerKey) {
rabbitTemplate.convertAndSend("directExchange", routerKey, "Hello world");
return "yes";
}
}

3、在消费者中定义好接受者
DirectReciverA:再生成连个同样的类但是要注意的是必须要打@RabbitHandler和@RabbitListener(queues = "directQueueA")第一个是对队列处理者,第二个是队列的监听者,监听队列,不加第一个注解,消息将会接收不到
package com.zj.consumer.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "directQueueA")
public class DirectReciverA {



@RabbitHandler
public void process(String message){
log.warn("A接收到了"+message);
}


}

结果运行成功:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/1d0f8b13fd66625e514f5e6ed86a7cbd..jpg



二、主题交换机讲解
1、先在生产者中创建一个直连交换机配置类
TopicQueueConfig:注意:这里面需要特定指定键
注意:必须在绑定键前加一个Topic来区分,必须介以区别,不然将会报错,因为加入了bean对象

package com.zj.provider.MQ;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@SuppressWarnings("all")
public class TopicQueueConfig {

private final static String KEY_A="*.orange.*";
private final static String KEY_B="*.*.rabbit";
private final static String KEY_C="lazy.#";



/**
* 生成一个队列
* @return
*/
@Bean
public Queue topicQueueA(){
return new Queue("topicQueueA",true);
}

@Bean
public Queue topicQueueB(){
return new Queue("topicQueueB",true);
}

@Bean
public Queue topicQueueC(){
return new Queue("topicQueueC",true);
}


@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}


@Bean
public Binding topicbindingA(){
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
}

@Bean
public Binding topicbindingB(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
}

@Bean
public Binding topicbindingC(){
return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
}





}

2、之后创建一个控制类,用来发信息
@RequestMapping("/sendTopic")
public String sendTopic(String routerKey) {
rabbitTemplate.convertAndSend("topicExchange", routerKey, "Hello world");
return "yes";
}
3、在消费者中定义好接受者:
package com.zj.consumer.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "topicQueueA")
public class TopicReciverA {



@RabbitHandler
public void process(String message){
log.info("A接收到了"+message);
}


}

注意:需要进行发信息才能在RabbitMQ发现队列
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/c95cdd98a68c7e7755ae4bc248a79f26..jpg



显示出队列:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/78d76935a6143aa45f8d822ac1157219..jpg



接收成功:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/948121b21dda17fd036de27fa1d9f0fb..jpg



三、扇形(广播)交换机讲解
扇形交换机和其他两个交换机不一样,扇形交换机不用绑定键,因为他会进行广播,同样的在队列与交换机进行绑定时,需要加上不同的名字来进行区分

1、先在生产者中创建一个直连交换机配置类
package com.zj.provider.MQ;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@SuppressWarnings("all")
public class FanoutQueueConfig {


/**
* 生成一个队列
* @return
*/
@Bean
public Queue fanoutQueueA(){
return new Queue("fanoutQueueA",true);
}

@Bean
public Queue fanoutQueueB(){
return new Queue("fanoutQueueB",true);
}

@Bean
public Queue fanoutQueueC(){
return new Queue("fanoutQueueC",true);
}



@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}


@Bean
public Binding fanoutbindingA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}

@Bean
public Binding fanoutbindingB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}

@Bean
public Binding fanoutbindingC(){
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}

}

2、之后创建一个控制类,用来发信息
没有绑定键,但是要写空值,不然fanoutExchange会被认为是路由键
@RequestMapping("/sendFanout")
public String sendFanout() {
rabbitTemplate.convertAndSend("fanoutExchange", "null" ,"Hello world");
return "yes";
}
3、在消费者中定义好接受者:
package com.zj.consumer.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "fanoutQueueA")
public class FanoutReciverA {



@RabbitHandler
public void process(String message){
log.info("A接收到了"+message);
}


}

生产者运行效果:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user166259/article/5537e524ce4866956312d7e9ce91c3ff..jpg



消费者接收到信息

今天的知识就分享到这了,希望能够帮助到你!
————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/m0_53151031/article/details/123140428



《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack  
0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群