博客 Kafka的数据存储

Kafka的数据存储

   数栈君   发表于 2023-12-01 09:42  499  0

前言

  不同于Redis和Memcache等内存消息队列,Kafka的设计是把所有的Message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,Kafka使用硬盘并没有带来过多的性能损失。Kafka在磁盘上只做Sequence I/O。首先,Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty,当读写操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用,同时如果有其他进程申请内存,回收PageCache的代价又很小。
  使用PageCache功能同时可以避免在JVM内部缓存数据以及JVM的GC问题。如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。所有在JVM内的对象都不免带有一个Object Overhead,内存的有效空间利用率会因此降低。所有的In-Process Cache在OS中都有一份同样的PageCache,所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的pageCache依然可以继续使用。

除了PageCache,Kafka还采用了SendFile技术,具体参见:Kafka高吞吐低延迟原理

- Topic
- Partition:一个topic可以分为多个partition
- Segment:每个partition又由多个segment file组成
- offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
- message:kafka文件中最小的存储单位


  在Kafka文件存储中,同一个Topic下有多个不同partition,每个partition为一个目录,partition命名规则为Topic名称+有序序号,第一个partition序号从0开始序号最大值为partition数量减1。topic与partition的关系如下:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2dcb6cdc1dedba02692ece8e54c4ec93..png

  每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。默认保留7天的数据。每个partition只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
  Segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀“.index”和“.log”分别表示为segment索引文件、数据文件。Segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消费的offset值。
Kafka数据的存储方式

segment中的文件
  对于一个partition里面有很多大小相等的segment数据文件(文件的大小可以在config/server.properties中进行设置),这种特性可以方便old segment file的快速删除。partition中的segment file的组成:index file和data file,这两个文件是一一对应的。索引文件和数据文件对应关系如图:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/520d754b663ec48e2226fd744c3f59a8..png

  segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。

注:Partition中的每条message由offset来表示它在这个partition中的偏移量,这个offset并不是该Message在partition中实际存储位置,而是逻辑上的一个值,但它却唯一确定了partition中的一条Message(可以认为offset是partition中Message的id)。

message文件
  一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成:

header由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,比如是否压缩、压缩格式等)
body是由N个字节构成的一个消息体,包含了具体的key/value消息

message中的物理结构如图所示:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/819386fa7d6dfb13c33928b16d0d3ebc..png

参数说明:
关键字 解释说明
offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
message size message大小
CRC32 用crc32校验message
“magic” 表示本次发布Kafka服务程序协议版本号
“attributes” 表示为独立版本、或标识压缩类型、或编码类型
key length 表示key的长度,当key为-1时,K byte key字段不填
key 可选
value bytes payload 表示实际消息数据
Kafka如何通过offset查找message

Kafka在内部通过分段和索引解决查找效率低下的问题。

数据文件的分段
  例如加入有100条message,它们的offset是从0到99,假设将数据文件分为5段,第一段为0-19,第二段为20-39,依次类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪一段中。

为数据文件建索引
  数据文件分段使得可以在一个较小的数据文件中查找对应offset的message了,但是这依然需要顺序扫描才能找到对应offset的message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件。
  索引文件中包含若干个索引条目,每个条目表示数据文件中一条message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条message相对于其所属数据文件中最小的offset的大小,存储相对offset可以减小索引文件占用的空间
position:表示该条message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的message了

通过offset查找message

查找segment file
因为文件以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件
通过segment file查找message
通过第一步定位到segment file,然后依次定位到.index的元数据物理位置和.log的物理偏移地址,然后再通过.log顺序查找直到目标offset为止

  segment index file并没有为数据文件中的每条message建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

`Kafka高效文件存储设计特点:`
- Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,
就容易定期清除已经消费完文件,减少磁盘占用
- 通过索引信息可以快速定位message和确定response的最大大小
- 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作



Kafka 是如何清理过期数据的?
日志保存清理策略
属性名 含义 默认值
log.cleanup.policy 日志清理保存的策略只有delete和compact两种 delete
log.retention.hours 日志保存的时间,可以选择hours,minutes和ms 168(7day)
log.retention.bytes 删除前日志文件允许保存的最大值 -1
log.segment.delete.delay.ms 日志文件被真正删除前的保留时间 60000
log.cleanup.interval.mins 每隔一段时间多久调用一次清理的步骤 10
log.retention.check.interval.ms 周期性检查是否有日志符合删除的条件(新版本使用) 300000

  Kafka 的日志实际上是以日志的方式默认保存在/kafka-logs文件夹中的,默认7天清理机制。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。
  为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

Kafka怎么判断有大量积压消息?
生产的offset和消费的offset有大的差值。
————————————————
版权声明:本文为CSDN博主「K. Bob」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ThreeAspects/article/details/106675018

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack



0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群