博客 【Kafka】从kafka中读取最新数据

【Kafka】从kafka中读取最新数据

   数栈君   发表于 2024-01-18 10:57  1128  0

一、死循环无限拉取kafka数据

1.1 整体框架剖析

1、要想从Kafka中读取数据,就要先对消费者信息进行配置

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/168f5ae5cb6cc1762e13a70bc87d103f..png

2、消费者基本配置信息完成以后,创建消费者、订阅主题、为了后面的消费 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f30788afa0037670836a336c76a04b0c..png

3、订阅主题,就相当于已经订阅了kafka中的消息,下一步就是消费。而kafka消费消息的方式是poll拉取,我们这里对kafka中的数据进行消费,上面我们选择了自动提交offset,那么每次offset就是在上一次消费完成以后的最新位置,所以我们接下来的每次消费得到的都是最新未消费的数据!

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/56f9b1b85ec3d56e589e4e84f2cd65d0..png

1.2 测试

方法一:

1、创建MyConsumer1类,根据上面整体结构的剖析,添加如下代码,并进行测试。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/825498500a8b05e923e6f2d59cacfec5..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/22bdc780eeed1d18a1b50461f0d0ce11..png

方法二:
2、创建MyConsumer2类,根据上面整体结构的剖析,添加如下代码,并进行测试。 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/905415da205f134cccdc03c2f61838d0..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9069e11c837adb97f81f60e2fe454f8a..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4265c4719e44177af7b4251b73f11464..png

注意:

方式一、方式二只是写法上的不同,整体架构都是一样的,任选其一来写即可。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/25d88a7c359094b5828a594fbb66a2af..png

至此,从kafka中读取最新数据的流程就全部结束了。

二、@KafkaListener注解 实现监听kafka数据

1、导入依赖

【我这里SpringBoot版本是2.2.13】

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2eaf378cd941194d2d6617d22e6738a8..png

注意:

1、springboot +2、kafka-clients +3、spring-kafka(在下图中体现为Sprig for Apache Kafka Version) 这三个 要注意版本对应。具体对应情况如下图所示:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6be481d95a2ace6ddd9f3a8e67896bb5..png

2、配置文件

application.yml文件中添加如下内容:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ed4c8e79bfaee8d3dad43037de2b5723..png

3、创建MyConsumer类,添加如下内容:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/97e3e8f09d167bb3868a782720b1a10f..png

4、测试

运行主启动类,会自动进行监听且在程序运行的过程中将数据输出。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/044b86af656564f4fb2cbc4c0409adfa..png
 

————————————————
版权声明:本文为CSDN博主「努力搬砖的猪头」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/junR_980218/article/details/126343195 

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack      

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群