bin\windows\kafka-server-start.bat config\server.properties
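Before running the producer and consumer below, the topic they use needs to exist (unless the broker is configured to create topics automatically). As a sketch, assuming Kafka 2.2 or later where kafka-topics accepts --bootstrap-server, the "hello" topic could be created with:

bin\windows\kafka-topics.bat --create --topic hello --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1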
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    public static void main(String[] args) throws InterruptedException {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for acknowledgement from all in-sync replicas
        prop.put(ProducerConfig.ACKS_CONFIG, "all");
        prop.put(ProducerConfig.RETRIES_CONFIG, 0);
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        String topic = "hello";
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka" + i));
            System.out.println("Produced message: " + i);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
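The loop above sends records fire-and-forget: send() returns a Future, but nothing checks whether the broker actually acknowledged each record. A minimal sketch (not part of the original example) of passing a Callback to send() so the assigned partition and offset, or any failure, can be observed:

producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka with callback"),
        (metadata, exception) -> {
            if (exception != null) {
                // With retries=0, transient broker errors surface here directly.
                exception.printStackTrace();
            } else {
                System.out.printf("Acked: topic=%s, partition=%d, offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });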
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ConsumerDemo {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1"); // consumer group
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // commit offsets automatically
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // auto-commit interval (ms)

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // A consumer can subscribe to multiple topics
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hello");
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));
                for (TopicPartition topicPartition : poll.partitions()) {
                    // All records fetched in this poll for the given TopicPartition
                    List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);
                    // Topic name of this TopicPartition
                    String topic = topicPartition.topic();
                    // Partition number of this TopicPartition
                    int partition = topicPartition.partition();
                    // Number of records fetched for this TopicPartition
                    int size = partitionRecords.size();
                    System.out.printf("--- topic: %s, partition: %s, record count: %s%n",
                            topic,
                            partition,
                            size);
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        // Record key
                        String key = consumerRecord.key();
                        // Record value
                        String value = consumerRecord.value();
                        // Offset of the current record
                        long offset = consumerRecord.offset();
                        // Offset from which the next fetch would start
                        long commitOffset = offset + 1;
                        System.out.printf("Consumed message key: %s, value: %s, record offset: %s, commit offset: %s%n",
                                key, value, offset, commitOffset);
                        Thread.sleep(1500);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
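The commitOffset computed above is only printed; because enable.auto.commit is true, offsets are actually committed on the auto-commit timer. A minimal sketch (an assumption, not from the original code) of committing manually after each partition's batch is processed, which requires setting ENABLE_AUTO_COMMIT_CONFIG to false and importing java.util.Collections and org.apache.kafka.clients.consumer.OffsetAndMetadata:

// Inside the loop over poll.partitions(), after processing partitionRecords
// (assumes ENABLE_AUTO_COMMIT_CONFIG was set to false above):
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// Commit the offset of the next record to read for this partition.
consumer.commitSync(Collections.singletonMap(
        topicPartition, new OffsetAndMetadata(lastOffset + 1)));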
Producer console output: each produced message is printed in turn.