●Kafka header 的作用类似于 HTTP 或者 TCP/IP 协议中的 header 头部,在消息中添加一些描述性信息,方便消费者解析和处理消息。
●Kafka header 的使用并不是强制性的。你完全可以不使用它们,只发送负载数据。
●Kafka header 不同于消息的 key 和 value,因为它们与数据的生命周期无关。
通过在Header中添加一个全局唯一的ID,可以跟踪消息在整个应用系统中的传递轨迹。
ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
producer.send(record);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Headers headers = record.headers();
String messageId = new String(headers.lastHeader("messageId").value());
System.out.println("Received message with ID: " + messageId);
}
Header可以为消息添加一个路由键,在消息传递的过程中,可以根据这个路由键进行消息的路由。
ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("routeKey", "myroutekey".getBytes());
producer.send(record);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Headers headers = record.headers();
String routeKey = new String(headers.lastHeader("routeKey").value());
if (routeKey.equals("myroutekey")) {
//process the message
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("content-type", "text/plain".getBytes());
producer.send(record);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Headers headers = record.headers();
String contentType = new String(headers.lastHeader("content-type").value());
if (contentType.equals("text/plain")) {
//process the message
}
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack