@KafkaListener(id = "eventConsumer", topics = "perception_event", groupId = "defaultConsumerGroup", containerFactory = "kafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> consumerRecordList) {
.......
}
/**
* 消费失败消息最大重试15次,存入到死信队列中
*
* @param configurer kafkaConsumerFactory kafkaTemplate
* @return factory
*/
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//最大重试15次
RetryingBatchErrorHandler retryingBatchErrorHandler = new RetryingBatchErrorHandler(new FixedBackOff(500L, 15L),
createConsumerRecordRecoverer());
factory.setBatchErrorHandler(retryingBatchErrorHandler);
return factory;
}
/**
* 最终消费失败打印日志即可
*/
private ConsumerRecordRecoverer createConsumerRecordRecoverer() {
return (consumerRecord, exception) -> {
log.error("consumer event last fail:{}, exception:{}", SensitiveUtils.phone(consumerRecord.toString()), exception.toString());
};
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack