博客 【Rabbitmq篇】RabbitMQ⾼级特性----持久性,发送⽅确认,重试机制

【Rabbitmq篇】RabbitMQ⾼级特性----持久性,发送⽅确认,重试机制

   数栈君   发表于 2025-01-14 15:09  184  0

一.持久化

我们在前⾯讲了消费端处理消息时,消息如何不丢失,但是如何保证当RabbitMQ服务停掉以后,⽣产者发送的消息不丢失呢.默认情况下, RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息,除⾮告知他不要这么做.

它确保消息在消息队列崩溃、重启或其他故障情况下仍能得以保留。

RabbitMQ 的持久化是确保消息在服务器重启或意外故障后不会丢失的重要特性。它主要涉及到交换机(Exchange)、队列(Queue)和消息(Message)三个层面的持久化。

1 .交换机持久化

交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会⾃动建⽴,相当于⼀直存在

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ce81b92def5b144b1523f104fdc3a0d6..png



此时都是持久化的

设置交换机持久化为true
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f1256bba3cc218a589b350c54090e642..png


设置交换机持久化为false

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/cad4fb75696fbf19ffb2b2e8e238d68d..png


测试持久化交换机:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0aca931a99a62733b3fc5a28323b3ebc..png



测试非持久化交换机:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0f343d3ee35731c44f5b0f5f25e13cb0..png



此时我们重启我们的rabbitmq 看看结果怎么样?
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0a27a2eff1c5892a0abb44712c7336d1..png

service rabbitmq-server restart

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0747a82dfd10e8e2b93fb4965fa2ff38..png

此时非持久化的交换机没了,所有消息也丢失了

如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.

2 队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为true实现的.
如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失.(队列没有了,消息也⽆处可存了)

持久化队列:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/32535800c1983843207b8b1adad6c078..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/450eb2b845bd48ef29abb63f1e49a661..png


点开源码可以看见该⽅法默认 durable 是true

非持久化队列:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b53727e8b5827e98069a305998181458..png

3.消息持久化

消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,
也就是 MessageDeliveryMode. PERSISTENT
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e43b9e9f474018a7a20c12b9a5acf004..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c314330982833def03a187d1d818d20a..png



测试场景

1)队列持久化&&消息持久化 vs 队列非持久化&&消息非持久化

@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
return "消息发送成功!";
}
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}

队列

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/25dac8e7fb26fa76455d468c1e605000..png


重启设备 更新结果
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c9054febc8bee2c67506d9833143bd36..png



队列持久化&&消息持久化 消息保留

队列非持久化&&消息非持久化 丢失

2)队列持久化 &&消息非持久化

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c1eb37bf33e5b33fa14c4625b46f4558..png


@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/109aabd9c9eaee56a80bbece5c7027b5..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e57c0e685dac3d3c372d356408cac119..png


重启 更新结果 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4b961d8feb1b894fca9c727c44a8a8da..png



队列持久化 &&消息非持久化 队列未丢失,消息丢失

3)队列非持久化 &&消息持久化

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/084de687b8e047f4e79c9fa28bf7ea13..png


@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}


重启 更新结果
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/02343171162a49c1dad61a306598a07a..png



队列非持久化 &&消息持久化 队列丢失,消息丢失

将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗

答案是否定的.

1. 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失.这种情况很好解决,将autoAck参数设置为false,并进⾏⼿动确认,详细可以参考[消息确认]章节.
2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理,可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中.如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失.

如何解决?

1. 引⼊RabbitMQ的仲裁队列(后⾯再讲),如果主节点(master)在此特殊时间内挂掉,可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性,除⾮整个集群都挂掉(此⽅法也不能保证100%可靠,但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多,实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中,详细参考下⼀个章节内容介绍--"发送⽅确认"

二.发送⽅确认

在使⽤RabbitMQ的时候,可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失,

但是还有⼀个问题,当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9c24e03259f7584064b0f20025693e39..png

如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启,那么RabbitMQ重启期间⽣产者消息投递失败),持久化操作也解决不了这个问题,

因为消息根本没有到达服务器,何谈持久化 ?

RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现(消耗性能,不推荐)
b. 通过发送⽅确认(publisherconfirm)机制实现

1 .confirm确认模式

Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,⽆论消息是否到达
Exchange,这个监听都会被执⾏,如果Exchange成功收到,ACK( Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false.

1)配置RabbitMQ

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2fae261215b488ad906f8134f5df6b8e..png



2)设置确认回调逻辑并发送消息


⽆论消息确认成功还是失败,都会调⽤ConfirmCallback的confirm⽅法.如果消息成功发送到Broker, ack为true.
如果消息发送失败,ack为false,并且cause提供失败的原因.
package com.bite.extensions.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n", correlationData == null ? null : correlationData.getId());
} else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData == null ? null : correlationData.getId(), cause);
//相应的业务处理
}
}
});
return rabbitTemplate;
}
}


3)测试

@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
return "消息发送成功!";
}
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE+"hhh","confirm","confirm test...",correlationData);

return "消息发送成功";
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f9da75f9ca48d90711bb1b06cf8c2a41..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ff30ad64b3d438149d3d16903853952f..png




http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/523ca1168cd1c2633c33068653ee25d6..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/65b4f317991c92ce1880a50078698e59..png


这里在prev 方法里面没有实现 confirmRabbitTemplate方法 却依然打印了接收到信息
为什么?

因为此时他们共用了一个confirmRabbitTemplate方法

此时要把他们分开 各种使用各自的
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/468fc84e127fbfc2dbb8d4e79226e98f..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/46bc1e20b13a9ab2c0f29545bb385f88..png

测试正常:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/38cfb226a3ee14ce84767da6d391bef9..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/edb3ae5163fa430b2562eb3ad43b52e2..png


抛出问题 继续测试
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e5b67e2d6e122d5b60f10ce2391c6a8e..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/434de04d614459f8cbfcd66a0a0004e8..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d9e4d8d056cdd9c4af5ca853e47e7a1b..png


此时不正确的routingKey 此时应该发送失败 为什么还发送成功?


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3c68fbfff6a485e662808a304eefb1d7..png



原来此时ack只是看看是否发送到交换机 发送成功就显示true

但是并没有发送到对应队列 这如何解决?

引入return模式

2 return退回模式

消息到达Exchange之后,会根据路由规则匹配,把消息放⼊Queue中.Exchange到Queue的过程,如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者.消息退回给发送者时,我们可以设置⼀个返回回调⽅法,对消息进⾏处理.

1)配置RabbitMQ

publisher-confirm-type: correlated #消息发送确认

2) 设置返回回调逻辑并发送消息

//rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息退回:"+returnedMessage);
}
});

使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false).这个属性的作⽤是告诉RabbitMQ,如果⼀条消息⽆法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时 ReturnCallback 就会被触发.

3)测试

@RequestMapping("/returns")
public String returns() {
CorrelationData correlationData = new CorrelationData("3");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm111","returns test...",correlationData);
return "消息发送成功";
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1e952617edfb21ff48492742bc476c57..png

并没有撤回 原因在于 :

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d99965833848e703d9fe93e565acea73..png



给他恢复设置成true rabbitTemplate.setMandatory(true);

继续测试
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/610390230fcb28965b8ba1ce99fc788a..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/06bc1fc5db624df76a6de47f4de0ec4d..png


此时消息已被撤回 剩下的还是刚才的未处理的信息

如何保证RabbitMQ消息的可靠传输?(面试重点)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b9e64f86f0db8baaa0584fe4fe93d851..png

从这个图中,可以看出,消息可能丢失的场景以及解决⽅案:

1. ⽣产者将消息发送到RabbitMQ失败

a. 可能原因:⽹络问题等

b. 解决办法:参考本章节[发送⽅确认-confirm确认模式]

2. 消息在交换机中⽆法路由到指定队列:


a. 可能原因:代码或者配置层⾯错误,导致消息路由失败

b. 解决办法:参考本章节[发送⽅确认-return模式]

3. 消息队列⾃⾝数据丢失


a. 可能原因:消息到达RabbitMQ之后,RabbitMQServer宕机导致消息丢失.

b. 解决办法:参考本章节[持久性].开启RabbitMQ持久化,就是消息写⼊之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会⾃动读取之前存储的数据.(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的⽅式提⾼可靠性)

4. 消费者异常,导致消息丢失

a. 可能原因:消息到达消费者,还没来得及消费,消费者宕机.消费者逻辑有问题.

b. 解决办法:参考本章节[消息确认].RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息.默认情况下消费者应答机制是⾃动应答的,可以开启⼿动确认,当消费者确认消费成功后才会删除消息,从⽽避免消息丢失.除此之外,也可以配置重试机制(参考下⼀章节),当消息消费异常时,通过消息重试确保消息的可靠性

三. 重试机制

在消息传递过程中,可能会遇到各种问题,如⽹络故障,服务不可⽤,资源不⾜等,这些问题可能导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误,那么多次重试也是没有⽤的,可以设置重试次数

1 )重试配置

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/080a62d17c40be73f91131abdf87c5a9..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/35376c161a51d8a2509a441bb2c639e4..png



2) 配置交换机&队列

//重试机制
public static final String RETRY_QUEUE = "retry.queue";
public static final String RETRY_EXCHANGE = "retry.exchange";
//重试机制
@Bean("retryQueue")
public Queue retryQueue(){
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryExchange")
public DirectExchange retryExchange(){
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
}

3 )发送消息

@RequestMapping("/retry")
public String retry() {
confirmRabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");
return "消息发送成功";
}


4)消费消息

@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
int num = 3/0;
System.out.println("业务处理完成");
}


5)auto(自动)测试

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/150ecd0325bbba8deff235ae6a9271b4..png





重发5次 因为代码错误 所以没有效果

6)manual 测试

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f2d8543ba0f4ae11e977b58cf8dae2a2..png


@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
try {
int num = 3 / 0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d24ef33df61e2d888ceb23d8b5375418..png


无效 达不到我们想要的目的

使⽤重试机制时需要注意:

1. ⾃动确认模式下:程序逻辑异常,多次重试还是失败,消息就会被⾃动确认,那么消息就丢失

2. ⼿动确认模式下:程序逻辑异常,多次重试消息依然处理失败,⽆法被确认,就⼀直是
unacked的状态,导致消息积压

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack


0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群