设置交换机持久化为false
测试持久化交换机:
测试非持久化交换机:
此时我们重启我们的rabbitmq 看看结果怎么样?
service rabbitmq-server restart
此时非持久化的交换机没了,所有消息也丢失了
如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.
点开源码可以看见该⽅法默认 durable 是true
非持久化队列:
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
return "消息发送成功!";
}
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
队列
重启设备 更新结果
队列持久化&&消息持久化 消息保留
队列非持久化&&消息非持久化 丢失
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
重启 更新结果
队列非持久化 &&消息持久化 队列丢失,消息丢失
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗
答案是否定的.
1. 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失.这种情况很好解决,将autoAck参数设置为false,并进⾏⼿动确认,详细可以参考[消息确认]章节.
2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘中.RabbitMQ并不会为每条消息都进⾏同步存盘(调⽤内核的fsync⽅法)的处理,可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中.如果在这段时间内RabbitMQ服务节点发⽣了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失.
如何解决?
1. 引⼊RabbitMQ的仲裁队列(后⾯再讲),如果主节点(master)在此特殊时间内挂掉,可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性,除⾮整个集群都挂掉(此⽅法也不能保证100%可靠,但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多,实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ中,详细参考下⼀个章节内容介绍--"发送⽅确认"
如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启,那么RabbitMQ重启期间⽣产者消息投递失败),持久化操作也解决不了这个问题,
因为消息根本没有到达服务器,何谈持久化 ?
RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现(消耗性能,不推荐)
b. 通过发送⽅确认(publisherconfirm)机制实现
package com.bite.extensions.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置回调方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息, 消息ID: %s \n", correlationData == null ? null : correlationData.getId());
} else {
System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData == null ? null : correlationData.getId(), cause);
//相应的业务处理
}
}
});
return rabbitTemplate;
}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
return "消息发送成功!";
}
@RequestMapping("/pres")
public String pres() {
Message message = new Message("Presistent test...".getBytes(), new MessageProperties());
//消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
System.out.println(message);
rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);
return "消息发送成功";
}
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE+"hhh","confirm","confirm test...",correlationData);
return "消息发送成功";
}
}
这里在prev 方法里面没有实现 confirmRabbitTemplate方法 却依然打印了接收到信息
为什么?
因为此时他们共用了一个confirmRabbitTemplate方法
此时要把他们分开 各种使用各自的
测试正常:
抛出问题 继续测试
此时不正确的routingKey 此时应该发送失败 为什么还发送成功?
原来此时ack只是看看是否发送到交换机 发送成功就显示true
但是并没有发送到对应队列 这如何解决?
引入return模式
publisher-confirm-type: correlated #消息发送确认
//rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息退回:"+returnedMessage);
}
});
使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false).这个属性的作⽤是告诉RabbitMQ,如果⼀条消息⽆法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时 ReturnCallback 就会被触发.
@RequestMapping("/returns")
public String returns() {
CorrelationData correlationData = new CorrelationData("3");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm111","returns test...",correlationData);
return "消息发送成功";
}
并没有撤回 原因在于 :
给他恢复设置成true rabbitTemplate.setMandatory(true);
继续测试
此时消息已被撤回 剩下的还是刚才的未处理的信息
如何保证RabbitMQ消息的可靠传输?(面试重点)
从这个图中,可以看出,消息可能丢失的场景以及解决⽅案:
1. ⽣产者将消息发送到RabbitMQ失败
a. 可能原因:⽹络问题等
b. 解决办法:参考本章节[发送⽅确认-confirm确认模式]
2. 消息在交换机中⽆法路由到指定队列:
a. 可能原因:代码或者配置层⾯错误,导致消息路由失败
b. 解决办法:参考本章节[发送⽅确认-return模式]
3. 消息队列⾃⾝数据丢失
a. 可能原因:消息到达RabbitMQ之后,RabbitMQServer宕机导致消息丢失.
b. 解决办法:参考本章节[持久性].开启RabbitMQ持久化,就是消息写⼊之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会⾃动读取之前存储的数据.(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的⽅式提⾼可靠性)
4. 消费者异常,导致消息丢失
a. 可能原因:消息到达消费者,还没来得及消费,消费者宕机.消费者逻辑有问题.
b. 解决办法:参考本章节[消息确认].RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息.默认情况下消费者应答机制是⾃动应答的,可以开启⼿动确认,当消费者确认消费成功后才会删除消息,从⽽避免消息丢失.除此之外,也可以配置重试机制(参考下⼀章节),当消息消费异常时,通过消息重试确保消息的可靠性
//重试机制
public static final String RETRY_QUEUE = "retry.queue";
public static final String RETRY_EXCHANGE = "retry.exchange";
//重试机制
@Bean("retryQueue")
public Queue retryQueue(){
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryExchange")
public DirectExchange retryExchange(){
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
}
@RequestMapping("/retry")
public String retry() {
confirmRabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");
return "消息发送成功";
}
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
int num = 3/0;
System.out.println("业务处理完成");
}
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void handlerMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
try {
int num = 3 / 0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
无效 达不到我们想要的目的
使⽤重试机制时需要注意:
1. ⾃动确认模式下:程序逻辑异常,多次重试还是失败,消息就会被⾃动确认,那么消息就丢失
了
2. ⼿动确认模式下:程序逻辑异常,多次重试消息依然处理失败,⽆法被确认,就⼀直是
unacked的状态,导致消息积压
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《行业指标体系白皮书》下载地址:
《数据治理行业实践白皮书》下载地址:
《数栈V6.0产品白皮书》下载地址:
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack