博客 RabbitMq监听器simple和direct

RabbitMq监听器simple和direct

   数栈君   发表于 2023-09-26 09:47  1165  0

监听器类型 simple,direct区别

1、消费者,channel,connection的关系

首先明确,这里的consumer不是一台消费者机器,而是rabbitMq的最小消费单位,一台机器可以开启多个消费者,一个消费者总是对应一个channel。
一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
也就是rabbitMq采用一个TCP连接处理多个消费者的多线程请求,实际上就是多路复用。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/fb00d578f20bc5ec45bb4419ab44241e..jpg
  

2、线程模型

simple
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/674c8de6c0cdddbbfd0a6fc0b031c6aa..jpg
  
simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。

direct
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d70fc51969a96090ce5098149d32ea05..jpg
  
看的出来direct的线程模型更简单,也因此压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。
看起来direct更好一些,那么如何选择

如何选择容器
看官网的说法

选择容器
2.0版本引入了DirectMessageListenerContainer(DMLC)。此前,仅SimpleMessageListenerContainer(SMLC) 可用。SMLC 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。之所以需要这种架构,是因为在 RabbitMQ 客户端的早期版本中,多个并发传送是不可能的。较新版本的客户端具有修订的线程模型,现在可以支持并发。这允许引入 DMLC,现在可以直接在 RabbitMQ 客户端线程上调用侦听器。因此,它的架构实际上比 SMLC“更简单”。然而,这种方法存在一些局限性,并且 DMLC 不具备 SMLC 的某些功能。consumersPerQueue此外,并发性由(以及客户端库的线程池)控制。concurrentConsumers此容器不提供 和关联的属性。
以下功能适用于 SMLC,但不适用于 DMLC:

    ●batchSize:使用SMLC,你可以设置这个来控制一个事务中投递多少条消息或者减少ack的数量,但是可能会导致失败后重复投递的数量增加。(DMLC 确实有,您可以使用它来减少确认,与 SMLCmessagesPerAck相同,但它不能与事务一起使用 - 每个消息都在单独的事务中传递和确认)。batchSize

    ●consumerBatchEnabled:允许在消费者中批量处理批量消息;

    ●maxConcurrentConsumers以及消费者缩放间隔或触发器——DMLC 中没有自动缩放功能。但是,它确实允许您以编程方式更改consumersPerQueue属性,并且消费者也会相应地进行调整。

然而,与 SMLC 相比,DMLC 具有以下优点:

    ●在运行时添加和删除队列的效率更高。使用SMLC,整个消费者线程被重新启动(所有消费者被取消并重新创建)。通过 DMLC,不受影响的消费者不会被取消。

    ●避免了 RabbitMQ 客户端线程和消费者线程之间的上下文切换。

    ●线程在消费者之间共享,而不是为 SMLC 中的每个消费者拥有专用线程

并发配置

消费者数量配置
图中consumer数量可进行配置

simple配置
listener:
    simple:
        concurrency: 50
        maxConcurrency: 100

支持可伸缩的配置,根据前面的线程模型,我们知道simple配置的并发数实际上也是消费线程的数量。

direct配置
direct:
    consumersPerQueue: 50 #每个队列消费者数量

根据前面的线程模型,使用direct模式需要设置合理的连接线程池,因为连接线程池还需要进行业务逻辑的处理,配置如下

    @Bean(name = "connectionFactory")
        @Primary
        public ConnectionFactory connectionFactory(
                        @Value("${spring.rabbitmq.host}") String host,
                        @Value("${spring.rabbitmq.port}") int port,
                        @Value("${spring.rabbitmq.username}") String username,
                        @Value("${spring.rabbitmq.password}") String password) {
                CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
                connectionFactory.setHost(host);
                connectionFactory.setPort(port);
                connectionFactory.setUsername(username);
                connectionFactory.setPassword(password);
                connectionFactory.setExecutor(createThreadPool(coreSize,
                                maxSize,
                                "mq-",
                                "mq-connection-group"));
                return connectionFactory;
    }    


配置后,rabbitMq控制台有会体现出来
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3b877d8987acd42378c8066590fd383c..jpg
  

批量消费

首先要明白批量消费的意义,消费者可以批处理,还可以批量确认。减少ack次数

simple配置
listener:
    simple:
        batch-size: 10
        acknowledge-mode: auto
        consumer-batch-enabled: true

配置批量消费

@Primary
public SimpleRabbitListenerContainerFactory normalFactory(
    SimpleRabbitListenerContainerFactoryConfigurer configurer,
    @Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setBatchListener(true);
    configurer.configure(factory, connectionFactory);

    return factory;
}

监听类

@RabbitListener(queues = "QUEUE_DEMO_DIRECT")
    public void ListenerQueue01(List<Message> message, Channel channel) throws IOException, InterruptedException {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

这样在消息足够的情况下
List中就是10条消息。但是只需要确认最后一条消息就好了
原理
rabbitmq支持批量确认
channel.basicAck(deliveryTag,multiple)
在RabbitMQ中,channel.basicAck方法用于确认已经接收并处理了消息。该方法有两个参数:

    1、deliveryTag:表示消息的唯一标识。每个消息都有一个唯一的deliveryTag,用于标识消息在channel中的顺序。当消费者接收到消息后,需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。
    2、 multiple:表示是否批量确认消息。当multiple为false时,只确认当前deliveryTag对应的消息;当multiple为true时,会确认当前deliveryTag及之前所有未确认的消息。
direct配置
看过前面我们知道DirectMessageListenerContainer并不像SimpleMessageListenerContainer能够支持批量消息,但是其支持一个参数
messagesPerAck,也可以在处理多少个消息之后进行ack,减少ack次数。

流量控制
首先要了解rabbitMq的channel.basicQos方法

    /**
     * Request a specific prefetchCount "quality of service" settings
     * for this channel.
     * <p>
     * Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
     *
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @param global true if the settings should be applied to the
     * entire channel rather than each consumer
     * @throws java.io.IOException if an error is encountered
     * @see #basicQos(int, int, boolean)
     */
    void basicQos(int prefetchCount, boolean global) throws IOException;

pprefetchCount,服务器一次请求将传递的最大消息数,如果没有限制,则为0。调用此方法时,该值必填。默认值:0,简单说就是一个消费者的最大处理消息数,broker服务器发现一个消费者还有prefetchCount个消息未ack,那么就不会再给它发送消息。防止消息堆积
global,是否将设置应用于整个频道,而不是每个消费者

    ●默认值:false,应用于本身(一个消费者)

    ●true:应用于整个频道

direct和simple配置字段相同都是prefetchCount
注意在simple模式下如果prefetchCount配置小于batchSize,那么prefetchCount就会被batchSize覆盖。
如果在direct模式下prefetchCount配置小于messagesPerAck,那么prefetchCount就会被messagesPerAck覆盖。

重试配置
rabbitMq的消费端的重试机制指的是本地重试。

spring:
    rabbitmq:
        listener:
            simple:
                retry:
                    enabled: true # 开启消费者失败重试
                    initial-interval: 1000 # 初识的失败等待时长为1秒
                    multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
                    max-attempts: 3 # 最大重试次数
                    stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:

    ●开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

    ●重试达到最大次数后,Spring会返回ack,默认消息会被丢弃

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

    ●RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

    ●ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

    ●RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机






免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群