●consumerBatchEnabled:允许在消费者中批量处理批量消息;
●maxConcurrentConsumers以及消费者缩放间隔或触发器——DMLC 中没有自动缩放功能。但是,它确实允许您以编程方式更改consumersPerQueue属性,并且消费者也会相应地进行调整。
然而,与 SMLC 相比,DMLC 具有以下优点:
●在运行时添加和删除队列的效率更高。使用SMLC,整个消费者线程被重新启动(所有消费者被取消并重新创建)。通过 DMLC,不受影响的消费者不会被取消。
●避免了 RabbitMQ 客户端线程和消费者线程之间的上下文切换。
●线程在消费者之间共享,而不是为 SMLC 中的每个消费者拥有专用线程
listener:
simple:
concurrency: 50
maxConcurrency: 100
direct:
consumersPerQueue: 50 #每个队列消费者数量
@Bean(name = "connectionFactory")
@Primary
public ConnectionFactory connectionFactory(
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setExecutor(createThreadPool(coreSize,
maxSize,
"mq-",
"mq-connection-group"));
return connectionFactory;
}
listener:
simple:
batch-size: 10
acknowledge-mode: auto
consumer-batch-enabled: true
@Primary
public SimpleRabbitListenerContainerFactory normalFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setBatchListener(true);
configurer.configure(factory, connectionFactory);
return factory;
}
@RabbitListener(queues = "QUEUE_DEMO_DIRECT")
public void ListenerQueue01(List<Message> message, Channel channel) throws IOException, InterruptedException {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
/**
* Request a specific prefetchCount "quality of service" settings
* for this channel.
* <p>
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
*
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
* @see #basicQos(int, int, boolean)
*/
void basicQos(int prefetchCount, boolean global) throws IOException;
●true:应用于整个频道
direct和simple配置字段相同都是prefetchCount
注意在simple模式下如果prefetchCount配置小于batchSize,那么prefetchCount就会被batchSize覆盖。
如果在direct模式下prefetchCount配置小于messagesPerAck,那么prefetchCount就会被messagesPerAck覆盖。
重试配置
rabbitMq的消费端的重试机制指的是本地重试。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
●重试达到最大次数后,Spring会返回ack,默认消息会被丢弃
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
●RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
●ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
●RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack