博客 微服务(SpringCloud)第四篇之RabbitMQ(消息队列基础篇)

微服务(SpringCloud)第四篇之RabbitMQ(消息队列基础篇)

   数栈君   发表于 2024-03-27 11:13  971  0

RabbitMQ(基础篇)
同步和异步
RabbitMQ是高性能的异步通讯组件
微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?
我们先来看看什么是同步通讯和异步通讯。如图:


同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。


异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。


两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步调用


同步调用的优势是什么?
时效性强,等待到结果后才返回
同步调用的问题是什么?
1.拓展性差

目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

2.性能下降
由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:

假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。

3.级联失败问题
由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
这其实就是同步调用的级联失败问题。
但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
消息发送者:投递消息的人,就是原来的调用方
消息代理:管理,暂存,转发消息,你可以把它理解成微信服务器
消息接收者:接收和处理消息的人,就是原来的服务提供方

支付服务不在同步调用业务关联度低的服务,而是发送消息通知到Broker。
具备下列优势:
解除耦合,拓展性强
无需等待,性能好
故障隔离
缓存消息,流量削峰填谷

(QPS为每秒钟的访问量)

异步调用的优势是什么?
耦合度低,拓展性强
异步调用,无需等待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
异步调用的问题是什么?
不能立即得到调用结果,时效性差
不确定下游业务执行是否成功
业务安全依赖于Broker(消息代理)的可靠性
MQ技术选型
MQ(MessageQueue),中文是消息队列,字面意思就是存放消息的队列。也就是异步调用中的Broker

RabbitMQ的消息可靠性比较高很多公司也在用RabbitMQ,所以这里我们以RabbitMQ为例讲解,其他的MQ在OpenFeign的调用下都大同小异


RabbitMQ的整体架构及核心概念:
virtual-host:虚拟主机,起到数据隔离的作用
publisher:消息发送者
consumer:消息的消费者
queue:队列,存储消息
exchange:交换机,负责路由转发消息(没有存储消息的能力)

快速入门
1.创建一个消息队列


2.选择交换机并绑定消息队列


可以看到队列和交换机绑定成功了


3.可以给利用交换机发送消息

4.队列可以查看消息

数据隔离
需求:在RabbitMQ的控制台完成下列操作:
新建一个用户hmall
为hmall用户创建一个virtual host
测试不同virtual host之间的数据隔离现象

用hmall用户登录创建虚拟主机

选择自己的虚拟主机就看不到别的用户的队列(保证了数据隔离)

交换机也是隔离的
所以当你有多个项目的时候,可以创建不同的用户以及不同的虚拟主机实现数据隔离
SpringAMQP(Java客户端)
快速入门
SpringAmqp的官方地址


1.引入spring-amqp依赖
在工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用:

2.配置RabbitMQ服务信息
在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ

3.发送消息
SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void testSendMessage2Queue() {
//队列名称
String queueName = "simple.queue";
//消息
String msg = "hello, amqp!";
//发送消息
rabbitTemplate.convertAndSend(queueName, msg);
}
1
2
3
4
5
6
7
8
9
10
11
12
4.接收消息
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
@Slf4j//日志
@Component//注册为一个Bean
public class MqListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");
}
}
1
2
3
4
5
6
7
8
9


SpringAMQP如何发消息?
1.引入spring-boot-starter-amqp依赖
2.配置rabbitmq服务端信息
3.利用RabbitTemplate发送消息
4.利用@RabbitListener注解声明要监听的队列,监听消息
work模式(Work Queues)
Work queues,任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者,但者并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yaml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

Work模型的使用:
多个消费者绑定到一个队列,可以加快消息处理速度
通一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
Fanout:广播
Direct:定向
Topic:话题

Fanout交换机
FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
}
1
2
3
4
5
6
7
8
9
4.在publisher中编写测试方法,向hmall.fanout发送消息
@Test
void testSendFanout() {
String exchangeName = "hmall.fanout";
String msg = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
1
2
3
4
5
6


交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
FanoutExchange会将消息路由到每个绑定的队列
Direct交换机
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
每一个Queue都与Exchange设置一个Bindingkey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列


描述下Direct交换机与Fanout交换机的差异
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同RoutingKey,则与Fanout功能类似
Topic交换机(推荐使用,功能最强大)
TopicExchange与DirectExchange类似,区别在于routingKey可以时多个单词的列表,并且以 . 分割,Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

利用SpringAMQP演示DirectExchange的使用
1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg +"】");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) throws InterruptedException {
System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg +"】");
}
1
2
3
4
5
6
7
8
9
4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
@Test
void testSendTopic() {
String exchangeName = "hmall.topic";
String msg = "今天天气挺不错,我的心情的挺好的";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
}
1
2
3
4
5
6
描述下Direct交换机和Topic交换机的差异?
Topic交换机接收的消息RoutingKey可以时多个单词,以点分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个单词
声明队列交换机
SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系:
Queue:用于声明队列,可以用工厂类QueueBuilder构建
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

例如,声明(基于Bean的声明方式)一个Fanout类型的交换机,并创建队列与其绑定:
该方式需要写多个Bean比较麻烦不推荐
@Configuration
public class FanoutConfiguration {

@Bean//声明fanout交换机
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}

@Bean//声明队列
public Queue fanoutQueue3(){
// QueueBuilder.durable("ff").build();//durable 耐用的持久的(持久化把队列写入磁盘)
return new Queue("fanout.queue3");
}

@Bean//声明绑定关系
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}

@Bean//声明队列
public Queue fanoutQueue4(){
return new Queue("fanout.queue4");
}

@Bean//声明绑定关系
public Binding fanoutBinding4(){
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
该方式大大简化了创建Bean的过程简化了代码推荐使用这种方式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) throws InterruptedException {
System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
声明队列,交换机,绑定关系的Bean是什么?
Queue
FanoutExchange,DirectExchange,TopicExchange
Binding
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchanger
消息转换器
需求:测试利用SpringAMQP发送对象类型的消息

1.声明一个队列,名为object.queue

2.编写单元测试,向队列中直接发送一条消息,消息类型为Map

3.在控制台查看消息,总结问题



这里可以看到它对我们发送的消息做了序列化,jdk自带的对象字节流给我们的消息转成字节了

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

存在以下问题

1.JDK的序列化有安全风险
2.JDK序列化的消息太大占用空间
3.JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,需要做两件事情

1.在publisher和consumer中都要引入依赖:

<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
1
2
3
4
5
2.在publisher和consumer中都要配置MessageConverter:
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
1
2
3
4
重新发送刚才的消息,效果如下


内容来源于网络,如侵删

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack   

0条评论
上一篇:RabbitMQ延迟队列
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群