博客 【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

【Rabbitmq篇】RabbitMQ⾼级特性----消息确认

   蓝袋鼠   发表于 2024-12-04 17:47  229  0

前言:

前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下

Rabbitmq官网

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/9975a7979c542d93f5c9b1bd88301f90..png

一.消息确认机制 

⽣产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/17f58beff931f4fe3b61b135f3c3305d..png

RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失.
那么如何确保消费端已经成功接收了,并正确处理了呢
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(messageacknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种: 

• ⾃动确认
当autoAck等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景.


• ⼿动确认
当autoAck等于false时,RabbitMQ会等待消费者显式地调⽤Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景.

手动确认方法又分为三种:
肯定确认:Channel.basicAck(long deliveryTag, boolean multiple) RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了.
否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.
否定批量确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令.消费者客⼾端可以调⽤channel.basicNack⽅法来实现.
参数说明:

1)deliveryTag :
消息的唯⼀标识,它是⼀个单调递增的64位的⻓整型值. deliveryTag 是每个通道
(Channel)独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认.

2)multiple 

是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进
⾏批量确认.值为true则会⼀次性ack所有⼩于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/2c9d89c06bfcd3868515e6aa0838b2b9..png

3)requeue

表⽰拒绝后,这条消息如何处理.如果requeue参数设置为true,则RabbitMQ会重新将这条
消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,⽽不会把它发送给新的消费者.

二. 代码实现(spring环境)

1.可以直接使用RabbitMQ Java Client 库

2.使用spring集成的amqp

 主要介绍第二种,在spring环境下实现

Spring-AMQP 对消息确认机制提供了三种策略.

public enum AcknowledgeMode { 
NONE //确认,
MANUAL//手动 ,
AUTO //默认;
}

配置相关信息:

基本信息以及确认机制

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/35e09bdb27903888e8a34efcc745b340..png

队列,交换机,以及它们之间的绑定关系 

package com.bite.extensions.config;

import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
}
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("ack");
}
}

生产者:

主要解释消费者在不同确认机制的状态

package com.bite.extensions.controller;

import com.bite.extensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","consumer ack mode test...");
return "消息发送成功!";
}
}

1)AcknowledgeMode.NONE 

这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会⾃动确认
消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/144409c9917e3ca99de9c7acfe3f1667..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/37923fa6def7bfeec09fd2b97ed083e0..png

1)消费者 正常消费情况下

package com.bite.extensions.listener;

import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");

System.out.println("业务逻辑完成!");
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/436f0cff76c0e5f6e3ba25a6acf7c286..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/2084e29c0a77e0291c9ae81623f144ab..png

消费者正确处理,MQ删除相应信息

2)消费者 异常消费情况下

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");
int num = 3/0; //异常
System.out.println("业务逻辑完成!");
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/578290152153664fb14ad83d348dd4a4..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/3b5774928e97b52b0bcb70ab46b8c92e..png

 可以看到,消费者处理失败,但是消息已经从RabbitMQ中移除.

2 )AcknowledgeMode.AUTO

这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息. 
listener:
simple:
acknowledge-mode: auto #消息接收确认

1)消费者 正常消费情况下 

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");
//int num = 3/0;
System.out.println("业务逻辑完成!");
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/4573347a3a07050690de73431920e10d..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/213e7bdd8fac538540bd8f57cb73e4c5..png

2)消费者 异常消费情况下 

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");
int num = 3/0;
System.out.println("业务逻辑完成!");
}
}
..........
接收到信息: consumer ack mode test..., deliveryTag: 88
业务逻辑处理!
2024-11-17T15:19:11.420+08:00 WARN 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
接收到信息: consumer ack mode test..., deliveryTag: 89
业务逻辑处理!
2024-11-17T15:19:11.477+08:00 INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-11-17T15:19:11.477+08:00 INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/5907719db451696cb5385fbf3864cb08..png

消费者处理异常,会一直重试发送,所有仍然保留在mq中

3)AcknowledgeMode.MANUAL

⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消
息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这
种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理.

listener:
simple:
acknowledge-mode: manual#消息接收确认

1)消费者 正常消费情况下

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");
//int num = 3/0;
System.out.println("业务逻辑完成!");
//肯定确认
channel.basicAck(deliverTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliverTag,false,true);
}
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/44745f507973070cb76a01b70cf7bf4d..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/64e6cef9b79201d83de529b9d26bcd2e..png

如果不进行确认 又会发送什么?

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/152e4385a3eeae483fb0c7c8d9da40b5..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/181845ec33337d6898b8c14c96216299..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/df6b31f447ccc6d2bf7db2c01e4ccb47..png

 当我们使用手动确认(manual)的时候,一定要手动添加上肯定确认,不然即使消费者处理成功,也不会进行确认!

 2)消费者 异常消费情况下 

@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void handleMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
//业务逻辑处理
System.out.println("业务逻辑处理!");
int num = 3/0;
System.out.println("业务逻辑完成!");
//肯定确认
channel.basicAck(deliverTag,false);
} catch (Exception e) {
//否定确认
channel.basicNack(deliverTag,false,true);
}
}
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/f477aeb80bd7e78c4a43929be2e7b6c5..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/dfe7404da08e7a6723292f507ad8b59a..png

 否定确认完,又会进行重新入队,会变成Ready状态

此时修改为false,不让它入队,会发生什么? 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/4df1ab8a109c0d3e5485e4ad4555642c..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/ab83280da062ef8b810f4b28ef86f8ad..png
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user9/article/cbb8902b2e1798261e13462a7e3eb0ae..png

消费者处理异常,会不停的重试 

使用manual,一定要进行手动确认

总结:

模式 确认方式 可靠性 性能  使用场景
 None 无确认 低,可能丢失消息 高 不关心消息是否成功消费,丢失消息可容忍的场景
 Auto 自动确认
 较低,可能丢失消息 较高 对丢失消息容忍度较高的场景
 Manual 手动确认 高,消息只有成功处理才会确认 较低 需要确保每条消息被成功消费的场景

None 适用于性能要求高,但对消息丢失不敏感的场景。
Auto 适合那些不需要太高消息可靠性的应用,但仍然需要自动化确认机制。
Manual 最适合那些对消息处理的可靠性要求较高,尤其是在出现异常时需要精细控制消息是否重新入队或丢弃的场景。
选择哪种模式取决于你的具体需求,尤其是对于消息可靠性的要求以及系统的性能考虑。

结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!

原文链接:https://blog.csdn.net/chaodddddd/article/details/143831131

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群