博客 Kafka学习-Java使用Kafka

Kafka学习-Java使用Kafka

   数栈君   发表于 2025-01-14 14:32  151  0

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d5dedcfb7b003f08fab00364a4e44141..png



那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/67e52b9139897b075df24aa93eac6ec1..png


offset
那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3fe777cb311f0c587b4ae1774c86cad3..png



http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ea6fd3032baab680d05a7a6ca2de0dcc..png


但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/dd12cdb7a1c9423dfc16fc36fb5de256..png



2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ec7f6b9b4dc842465187405a1c68fa8b..png



topic
但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f3f4eebf19a320c874df8be859804fda..png


partition
但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/db4208ed652157901408f70915e7f4b1..png


3、高扩展

broker
随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker。
我们可以增加broker来缓解服务器的cpu过高的性能问题。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4d3b3d7dcb387e39896a1327cb389e72..png


4、高可用

replicas、leader、follower
假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leader和follower。
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9315dc61e134a7e30901090295bef133..png


5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/fdd940ae136588a4c23891ea1596af00..png



7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/41fad2a157118350a462575969fc0294..png


二、安装Zookeeper

1、官网地址


https://zookeeper.apache.org/

2、下载


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/928aaaa49869df82669b15ad1a1497ed..png


选择稳定版本下载


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9ff2fc4795e8e89c4fc0458853b53d5b..png



3、解压,修改配置文件


解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5e4ac7dab003b8f802e46fa17ff541e1..png


修改数据文件目录位置


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0d207f1ef196cabaf4e231166ac6d3ae..png



4、启动


我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/87a92303a07d3d65385d9c878951f956..png



三、安装Kafka

1、官网地址


https://kafka.apache.org/

2、下载

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/30f9c332795d38d4ef511e40fe539513..png



3、解压,修改配置文件


修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/dcf2f9618e33811b53f9139904738f88..png

4、启动

bin\windows\kafka-server-start.bat config\server.properties

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0824186b6fee58fad74435d373a84227..png


四、Java中使用Kafka

1、引入依赖


org.apache.kafka
kafka-clients

2、生产者

public static void main(String[] args) throws InterruptedException {
Properties prop = new Properties();

prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.ACKS_CONFIG, "all");
prop.put(ProducerConfig.RETRIES_CONFIG, 0);
prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

String topic = "hello";

KafkaProducer producer = new KafkaProducer<>(prop);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord(topic, Integer.toString(2), "hello kafka" + i));
System.out.println("生产消息:" + i);
Thread.sleep(1000);
}
producer.close();
},>,>

3、消费者

public static void main(String[] args) {
Properties prop = new Properties();

prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1"); // 消费者组
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //自动提交偏移量
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //自动提交时间

KafkaConsumer consumer = new KafkaConsumer<>(prop);
ArrayList topics = new ArrayList<>();
//可以订阅多个消息
topics.add("hello");
consumer.subscribe(topics);

try {
while(true) {
ConsumerRecords poll = consumer.poll(Duration.ofSeconds(10));
for (TopicPartition topicPartition : poll.partitions()) {

// 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
List> partitionRecords = poll.records(topicPartition);

// 获取TopicPartition对应的主题名称
String topic = topicPartition.topic();
// 获取TopicPartition对应的分区位置
int partition = topicPartition.partition();
// 获取当前TopicPartition下的消息条数
int size = partitionRecords.size();
System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",
topic,
partition,
size);

for(int i = 0; i < size; i++) {
ConsumerRecord consumerRecord = partitionRecords.get(i);
// 实际的数据内容
String key = consumerRecord.key();
// 实际的数据内容
String value = consumerRecord.value();
// 当前获取的消息偏移量
long offset = consumerRecord.offset();
// 表示下一次从什么位置(offset)拉取消息
long commitOffser = offset + 1;
System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",
key, value, offset, commitOffser);
Thread.sleep(1500);
}

}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
},>,>,>

4、运行效果

生产消息


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/46be465bb7ae3c2437baac7575e31343..png


消费消息


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/501d3e13186e6b8ce04060f8aa9ff93b..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b17e2eb91428002ec0349723829fd683..png

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群