博客 Kafka系列之消息重新消费

Kafka系列之消息重新消费

   数栈君   发表于 2023-08-09 10:49  195  0

概述

需求来源,在review前人留下的屎山代码时发现如下截图所示的代码片段:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/083f1705839820abfefd64794cca91b1..png
  
也就是说代码是空实现的。另外,从类名定义也知道需求未实现。

于是有此需求:已经消费过的消息重新消费。

调研

调研下来,主要有以下3种可能性方案

实现方案

修改偏移量,即offset,可通过脚本快速实现
新增Group,这将使 Kafka 认为您正在使用一个新的消费者组,并从起始偏移量开始重新消费消息。需通过代码实现
消息产生者重新发送消息,实际业务中不太常见,也不现实
Kafka的偏移量的保存方式,根据不同版本号有3种方式:保存在zookeeper中、保存在kafka的自带_consumer_offset这个topic中、保存在自定义的存储系统中。

版本

首先需要知道Kafka什么版本,找到Kafka的安装目录。可以通过find命令:find / -name '*kafka*'。
根据命令输出得知安装目录为:/usr/local/kafka,进入到libs目录,发现很多kafka_2.11-2.2.0.*文件。其中,2.11为scala版本,2.2.0为kafka版本。

脚本

使用Kafka自带的bin目录下的kafka-consumer-groups.sh脚本设置消费者组(consumer group)的位移, 这是0.11.0.0版本提供的新功能且只适用于新版本consumer。在此版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer.seek()方法。

使用此脚本可修改consumer group的位移,有个前提:consumer group必须是inactive的,即不能是处于正在工作中的状态。

格式:
./kafka-consumer-groups.sh --bootstrap-server ip:9092 --group <> --topic <topic> --reset-offsets <offset_option>

如:./kafka-consumer-groups.sh --bootstrap-server 172.100.200.200:9092 --group collect_data_business_service --topic topic_event:0 --reset-offsets --to-offset 195725 --execute

topic:指定topic的作用域

--all-topics:为consumer group下所有topic的所有分区调整位移
--topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
--topic t1:0,1,2:为指定的topic的某个分区调整位移
reset-offsets:确定位移重设策略

--to-earliest:把位移调整到分区当前最小位移
--to-latest:把位移调整到分区当前最新位移
--to-current:把位移调整到分区当前位移
--to-offset <offset>:把位移调整到指定位移处
--shift-by N:把位移调整到当前位移 + N处,N为负数表示向前移动
--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,如PT0H5M0S
--from-file <file>:从CSV文件中读取调整策略

常见报错:

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
原因:命令指定的IP有误,或端口不对,或未开放等原因导致连接超时
Error: Assignments can only be reset if the group ‘collect_data_business_service’ is inactive, but the current state is Stable.
原因:在修改offset时,需要停止消费者应用程序,如杀掉Java进程,停止container容器等
WARN New offset (0) is lower than earliest offset for topic partition topic_event-0. Value will be set to 195551 (kafka.admin.ConsumerGroupCommand$)
原因:

代码

也可以通过代码的方式,指定一个新的消费者组,即group.id。

@Override
public void afterPropertiesSet() throws Exception {
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", "172.100.200.200:9092");
    p.setProperty("group.id", "collect_data_business_service");
    p.setProperty("auto.offset.reset", "earliest");
    p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
    consumer.subscribe(Collections.singleton("topic_event"));
    consumer.poll(Duration.ofSeconds(1));
    consumer.seekToBeginning(consumer.assignment());
    while (true) {
        ConsumerRecords<String, String> message = consumer.poll(Duration.ofSeconds(1));
        log.info("topic_event---collect_data_business_service--消费消息:" + message);
        CollectDataV2Content content = JsonUtil.jsonToBean(message.toString(), CollectDataV2Content.class);
        service.saveContent(content);
    }
}

配置:

auto.offset.reset:只能配置以下3种情况:latest, earliest, none,Spring-Kafka应用启动时会检查此项配置,不正确的配置会报错,应用启动失败
key.deserializer:必须配置,否则报错:Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.,应用启动失败。
value.deserializer:同上



免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群