type
status
date
slug
summary
tags
category
icon
password
原文

日志是 Kafka的核心组件(大部分中间件都是如此),按照我的个人理解 kafka 日志分为消息日志、消费者日志以及 KRaft日志。
- 消息日志 也就是topic 文件下的分区日志,以LogSegment 方式保存。
- 消费者日志 以__consumer_offsets为开头的文件夹下的日志,__consumer_offsets 是 kafka 内置的一个 topic,它记录了所有消费者组的消费进度,是实现 Kafka Consumer Group 重平衡、位移提交、故障恢复的核心机制。
- KRaft日志 以__cluster_metadata为开头的文件夹下的日志,KRaft运行期间产生的日志,它在 Raft基础上做了扩展。
1.消息日志文件结构
不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止Log过大又引入了日志分段(LogSegment)的概念,将 Log切分成多个 LogSegment,相当于一个巨型文件被平均分配多个相对较小的文件,这样便于消息的维护和清理。
向 Log中追加消息时是顺序写入的,只有最后一个 LogSegment才能执行写入操作,在此之前的所有的 LogSegment 都不能执行写入操作。我们将最后一个 LogSegment 称为“activeSegment”表示当前活跃的日志分段。随着消息的不断写入,当activeSegment 满足一定条件时,就要创建新的 activeSegment,之后追加的消息将写入新的 activeSement 。
为了便于消息的检索,每个 LogSegment中的日志文件(.log 为文件后缀)都有两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量(baseOffset),用来表示当前 LogSegment 中的第一条消息的 offset。偏移量是一个 64 位的长整型,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到位数则用 0 填充。比如第一个LogSegment 的基准偏移量为 0,对应的日志文件为00000000000000000000.log。
example:
我创建一个 my-topic(分区数为 3) 的 topic,这时在log.dir 下会以my-topic-0、1、2为的文件夹。
每个文件下的文件目录,以 my-topic-0目录为例。这里00000000000000000000.log 表示偏移量从 0 开始,00000000000000012992.log表示偏移量12992开始。
1.1 消息日志结构
.log 、 .timeindex 、 .index 文件为二进制文件,需要通过
kafka
自带的工具kafka-dump-log.sh
进行解析(也可以自行编写解析工具,需要非常熟悉消息日志结构)。example:
使用
kafka-dump-log.sh
能够看到消息的日志结构。Kafka的消息结构由于版本不同,有很大变化。这里暂时只谈论新版的 RecordBacth这种消息结构。
在官网的描述中有详细描述消息的结构(官方文档).大体结构如下
字段名 | 类型 | 含义 |
BaseOffset | int64 | 本批次中第一条消息的 offset(逻辑上的) |
BatchLength | int32 | 整个 RecordBatch(含所有记录)的长度 |
PartitionLeaderEpoch | int32 | 分区 leader epoch(用于控制一致性) |
Magic | int8 | 消息格式版本(2 代表 RecordBatch) |
CRC | int32 | 整个 batch 的 CRC32 校验 |
Attributes | int16 | 位图标识,表示压缩类型、事务标志等 |
LastOffsetDelta | int32 | 最后一条记录相对 BaseOffset 的偏移 |
FirstTimestamp | int64 | 第一条记录的时间戳 |
MaxTimestamp | int64 | 最后一条记录的时间戳 |
ProducerId | int64 | 生产者 ID(支持幂等和事务) |
ProducerEpoch | int16 | 生产者 epoch |
BaseSequence | int32 | 批次内第一条记录的 sequence number |
Records | List | 一批 Record(即实际消息内容) |
这里有意思的地方是RecordBacth看命名是批量的意思,但是消息不可能总是批量发送的,所以单条消息保存会和批量保存的格式稍有不同。同时这里的消息日志都是基于 trunk 分支产生的同官方文档描述的日志结构稍有差异。
1.1.1 异步发送和同步发送对日志的影响
起初调试日志结构时,无论怎么改变生产者
linger.ms
、batch.size
参数,.log 文件都不会呈现出batchRecord这种日志结构的日志。问询 AI工具,提示我需要加参数compression.type
,结果也不奏效。经过更详细的对比官方示例和自己编写的示例,我发现问题出现是同步还是异步发送这个环节上。生产者会等待一定的时间(linger.ms > 0 )或消息已经达到 batch.size 的消息(这里的消息指的是发往同一 topic 和分区),整理成一个批次发往该topic的分区 Leader 节点。如果是单线程同步发送(这里还要取决于KafkaProducer 对象是否为全局共享的),这样的发送方式并不会让 KafkaProducer 产生同一批次消息,而是一条一条的顺序发送到服务端,所以在.log 中并不会产生 RecordBatch 这样的日志结构。
异步发送让消息累积成批次发送的好处是得到更高的吞吐量,同一批次消息因为共享了消息的描述信息从而节省空间,同时也提高了压缩效率。
2.索引文件
在log.dir文件夹下,每个 LogSegment有对应的.index 和.timeIndex 两个索引文件,主要用来提高消息的查找的效率。
- .index 文件是用来建立消息偏移量到物理地址之间的映射,方便快速定位消息所在的物理文件位置。
- .timeindex 文件则根据时间戳来查找对应的偏移量信息。
kafka 的索引文件以稀疏索引(sparse index)的方式构建消息的索引,它并不保证每个消息在索引文件中都有对应的索引项(节省磁盘空间)。每当写入一定量(broker端 log.index.interval.bytes 指定,默认值为 4096,即 4KB)的消息时,.index 和.timeindex 文件分别增加一个偏移量索引项和时间戳索引项。增大或减少 log.index.interval.bytes 的值,对应地可以增加或缩小索引项的密度。
稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
在之前提到过ActiveSegment这个概念,.index 、 .log同理,不是活跃的会变成只读的。
hints:
MappedByteBuffer 是一种高效的文件访问方式,也是一种零拷贝实现方式。
3.日志清理
消息日志存于 Broker端中,每个分区副本都对应一个 Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。
Kafka 提供了两种日志清理策略。
- 日志删除:按照一定的保留策略直接删除不符合条件的的日志分段。
- 日志压缩:针对每个消息的 key进行整合,对于有相同的 key的不同 value 值,只保留最后一个版本。
可以通过broker端参数
log.cleanup.policy
来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy
设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”,还可以同时支持日志删除和日志压缩两种策略。在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。
在 源码的 config 目录下server.properties 有描述日志的保留策略。
3.1 基于时间的删除策略
Kafka Broker 启动时,会启动一个定时任务
kafka-log-retention
,这个定时任务的执行时间取决于log.relention.check.interval.ms,它会扫描每个分区的LogSegment的可删除日志。日志删除任务会检查是否有保留时间超过设定的阈值(retentionMs),来寻找可删除的日志分段集合。retentionMs 可以通过broker中配置的 log.retention.hours、 log.retention.minutes 和 log.retention.ms 来配置,配置优先级 log.retention.ms > log.retention.minutes > log.retention.ms。
3.1.2 如何识别可删除日志
查找过期的日志分段文件,并不是简单的根据日志分段的最近修改时间 lastModifiedTime 来计算的,而是根据LogSegment的largestTimestamp来计算的。因为日志分段的 lastModifiedTime 可以被修改,比如执行了 touch 操作,或者分区副本进行了重新分配,lastModifiedTime 并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中的最大的最大时间戳,首先要要查询该日志分段对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一项索引的时间戳大于0,则取其值,否则才使用 lastModifiedTime。
若待删除的日志分段的总数等于该所有文件中的所有日志分段的数量,那说明所有的日志分段都已过期,但该日志文件中中还要有一个日志分段用于消息的写入,即必须保持一个活跃的 activeSegment。在该情况下会重新切分一个新的日志分段作为 activeSegment,然后执行“删除”操作。
这里值的注意的是 baseOffset 的计算,如果要删除所有分段,则创建一个新的 segment 的 baseOffset 要等于删除列表中的 Segment 最后一个 Segment的最后一条日志记录的 offset+1(logEndOffset)。如果新建的 Segment的 baseOffset和待删除的 Segment 列表的最后一个 Segment 的 baseOffset 相等,则将最后一个 Segment 从待删除列表中删除。
在我的设想中,只要永远保留最后一个 Segment作为活跃 activeSegment就好了,因为系统是并发运行的,保持最后一个,可以避免消息并发写入时需要阻塞等待 Segment 的分配,但是删除策略是可以基于时间、大小、offset 因素,如果全部日志过去,保留最后一个日志确实和配置有所冲突,所以这里还是决定要重新创建一个新的 Segment是更好的方案。
如果新建的 Segment的 baseOffset和待删除的 Segment 列表的最后一个 Segment 的 baseOffset 相等,则将最后一个 Segment 从待删除列表中删除。首先.log 文件都是 baseOffset 为命名的,系统不能构建同名文件,所以创建新的 Segment 时,会对同样的 baseOffset 文件进行更名,也就是文件添加.deleted 后缀。
删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的文件添加上.delete 的后缀(包括索引文件)。
最后由另一个定时任务
kafka-delete-logs
负责删除这些以.delete 为后缀的文件,这个延迟执行时间可以通过 file.delete.delay.ms参数(默认值为 60000 一分钟)来配置。日志的删除分为两个阶段:标记-删除。
3.2 基于大小的删除
日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是Log中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log日志文件)的大小。单个日志分段的大小由 broker 端参数log.segment.bytes 来限制,默认值为1073741824,即1GB。
基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合 deletableSegments。查找出deletableSegments 之后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同,这里不再赘述。
3.3 基于偏移量的删除(TODO)
一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的baseOffset,但这并不是绝对的,logStartOffset 的值可以通过DeleteRecordsRequest 请求(比如使用KafkaAdminClient的deleteRecords()方法、使用
kafka-delete-records.sh
脚本)、日志的清理和截断等操作进行修改。4.日志压缩
日志压缩是指在默认的日志删除规则之外提供的一种清理过时数据的方式。日志压缩(Log Retention)对于有相同 key的不同 value 值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
在使用 KafkaProducer 发送消息时,我们可以指定key,也可以不指定 key。在日志中的表现是无 key的话,消息日志显示 keysize为-1,有key的会显示 keysize 大于 0,且日志条目中会显示具体的 key。
Kafka的日志压缩可以类比 Redis 的 RDB 持久化模式。如果一个系统使用 kafka 来保存状态,那么每次有状态变更都会将其写入 kafka。在某一时刻此系统异常崩溃,进而在恢复时通过读取Kafka 中的消息来恢复其应有的状态,那么此系统关心的是它原本的最新状态而不是历史时刻中的每一个状态。
如果 Kafka 的日志保存策略是日志删除(Log Deletion),那么系统势必要一股脑地读取Kafka中的所有数据来进行恢复,如果日志保存策略是 Log Compaction,那么可以减少数据的加载量进而加快系统的恢复速度。Log Compaction在某些应用场景下可以简化技术栈,提高系统整体的质量。
在 log.dir 下有一个名为 cleanr-offset-checkpoint 文件,这个文件就是清理的检查点文件,用来记录每个主题的每个分区已清理的偏移量。
- Author:newrain-zh
- URL:https://alex.sh.cn/article/example-14
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!