<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
package com.pany.camp.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
*
* @description: Kafka 消费者
* @copyright: @Copyright (c) 2022
* @company: Aiocloud
* @author: pany
* @version: 1.0.0
* @createTime: 2023-06-26 18:04
*/
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
record.key(), record.value(), record.partition(), record.offset());
});
}
}
}
package com.pany.camp.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
*
* @description: kafka 消费者
* @copyright: @Copyright (c) 2022
* @company: Aiocloud
* @author: pany
* @version: 1.0.0
* @createTime: 2023-07-14 8:31
*/
@Component
public class kafkaConsumerListenerExample {
@KafkaListener(topics = "your_topic_name", groupId = "your_consumer_group_id")
public void consume(ConsumerRecord<?, ?> record) {
Optional<?> value = Optional.ofNullable(record.value());
// 进行消息处理逻辑
System.out.println("print message: " + value);
}
}
●在应用程序启动时,注解处理器会扫描所有的 Bean,查找带有 KafkaListener 注解的方法,并为每个方法创建一个 KafkaMessageListenerContainer 对象。
●注解处理器会解析注解上的参数,例如指定的 Kafka 主题、消费者组 ID、反序列化器等。
2. 创建 Kafka 消费者:
●KafkaMessageListenerContainer 对象负责创建和管理底层的 Kafka 消费者。
●根据注解上的参数,KafkaMessageListenerContainer 会创建一个或多个 Kafka 消费者,并配置消费者的相关属性。
●消费者的创建和配置是通过 Apache Kafka 客户端库实现的,Spring Kafka 提供了对客户端库的封装和集成。
3. 消费 Kafka 消息:
●KafkaMessageListenerContainer 对象会启动 Kafka 消费者,开始轮询 Kafka 服务器以获取新的消息。
●当有消息到达时,消费者会将消息传递给带有 KafkaListener 注解的方法进行处理。
●方法可以根据业务逻辑进行相应的处理,例如解析消息内容、进行数据处理、调用其他服务等。
●处理完成后,可以选择确认消息的消费,或者抛出异常以触发重试机制。
4. 并发处理消息:
●KafkaMessageListenerContainer 支持并发处理消息,可以通过配置 concurrency 参数来指定处理消息的线程数。
●如果设置了并发处理,KafkaMessageListenerContainer 会为每个线程创建一个独立的 Kafka 消费者,并通过分区分配策略将消息分配给不同的线程进行处理。
●这样可以提高消息处理的吞吐量。
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack