博客 一文带你理解Kafka的Header

一文带你理解Kafka的Header

   数栈君   发表于 2023-08-10 10:58  768  0

Header简介

            

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4e24adb7e99691f000f8e40fca5e0ded..jpg
  

Kafka从 0.11.0.0 版本开始提供了一种在生产者和消费者之间传递元数据的机制,叫做 Kafka header。使用这个机制,你可以在消息中添加一些与数据内容无关的附加信息,如消息的来源、类型、版本、生产时间、过期时间、分区数、用户 ID 等等。
Kafka header 是由一个或多个键值对组成的列表,每个键值对都称为 header。 消息可以包含零个或多个 header。
下面是一些简单的理解 Kafka header 的方式:

    ●Kafka header 可以看作是消息的元数据,因为它们不包含实际可用的消息负载。

    ●Kafka header 的作用类似于 HTTP 或者 TCP/IP 协议中的 header 头部,在消息中添加一些描述性信息,方便消费者解析和处理消息。

    ●Kafka header 的使用并不是强制性的。你完全可以不使用它们,只发送负载数据。

    ●Kafka header 不同于消息的 key 和 value,因为它们与数据的生命周期无关。

Header使用场景

1.消息追踪

通过在Header中添加一个全局唯一的ID,可以跟踪消息在整个应用系统中的传递轨迹。

ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
producer.send(record);

在消费消息的时候,可以获取到消息的KafkaHeader,并从中提取出消息ID进行追踪:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Headers headers = record.headers();
    String messageId = new String(headers.lastHeader("messageId").value());
    System.out.println("Received message with ID: " + messageId);
}

2.消息路由

Header可以为消息添加一个路由键,在消息传递的过程中,可以根据这个路由键进行消息的路由。

ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("routeKey", "myroutekey".getBytes());
producer.send(record);

在消费消息的时候,消费者可以根据路由键过滤出需要消费的消息:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Headers headers = record.headers();
    String routeKey = new String(headers.lastHeader("routeKey").value());
    if (routeKey.equals("myroutekey")) {
        //process the message
    }
}

3.传递消息元数据

Header可以在消息传递过程中携带一些重要的元数据,这些元数据可以用于解释消息的内容或者处理方式。

ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "myvalue");
record.headers().add("content-type", "text/plain".getBytes());
producer.send(record);

在消费消息的时候,可以从KafkaHeader中提取出content-type元数据,来解释消息的内容格式:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    Headers headers = record.headers();
    String contentType = new String(headers.lastHeader("content-type").value());
    if (contentType.equals("text/plain")) {
        //process the message
    }
}


Header设计思路

KafkaHeader的设计思路是基于Kafka的消息体系结构设计的,消息体系结构包括了消息的消息体、消息头和消息尾。KafkaHeader被设计成一个可扩展的消息头,可以为消息添加一些有用的元数据。KafkaHeader的键使用字符串表示,并且值可以是任何字节序列。

KafkaHeader还可以使用基于优先级的机制,覆盖或者添加KafkaHeader。当发送相同主题和分区的消息时,新的KafkaHeader将覆盖旧的KafkaHeader。

KafkaProducer和KafkaConsumer都提供了API,用于访问和修改KafkaHeader。其中KafkaProducer提供的API可以为将要发送到Kafka服务器的消息添加或者删除KafkaHeader。而KafkaConsumer提供的API可以用于从接收到的消息中提取KafkaHeader,并对消息进行分析和处理。

Header的性能影响和注意事项

    1.由于KafkaHeader是在消息中添加的元数据,因此在为消息添加KafkaHeader时需要注意以下几点:

    2.注意KafkaHeader的大小:KafkaHeader的大小会影响网络传输的性能,因此在添加KafkaHeader时需要权衡添加元数据的重要程度和通信性能的需要。

    3.注意KafkaHeader支持的数据类型:KafkaHeader支持的数据类型包括了任何字节序列,因此在添加KafkaHeader时需要确保添加的元数据可以正确地解析和处理。

    4.注意KafkaHeader的覆盖机制:当发送相同主题和分区的消息时,新的KafkaHeader会覆盖旧的KafkaHeader,因此需要注意不要在消息发送过程中出现意外丢失KafkaHeader的情况。

注意KafkaHeader的存在:KafkaHeader虽然可以为消息提供有用的元数据,但是当使用者处理消息时需要确保消息本身是唯一的标识,而不是KafkaHeader。

本文总结

本文介绍了 Kafka 中的 header,它可以用来传递一些与数据内容无关的附加信息,方便消费者解析和处理消息。与数据的 key、value 不同,header 不包含实际的消息内容,它们只是元数据,不影响消息的生命周期。在生产者和消费者中都可以使用 Kafka header,可以通过 API 来添加、读取和操作 header。


免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
下一篇:
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群