type
status
date
slug
summary
tags
category
icon
password
原文
1.主题的创建方式和配置2. 主题的配置2.1 基础配置2.2 数据保留与清理策略3.主题的创建过程(KRaft)3.1 创建过程概述3.2 主题的分区分配策略3.2.1 机架信息(RackList)构建3.2.2 前置校验3.2.3 副本分配流程3.2.4 元数据以及一致性写入
主题(Topic)和分区是 Kafka 的核心概念,主题作为消息的归类,可以再细分一个或多个分区,分区可以看作实现消息的存储水平扩展、可伸缩性的策略。从 Kafka 的底层实现来说,主题和分区都是逻辑上的概念。分区可以有一个多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志文件分段还可以细分为索引文件、日志存储文件和快照文件等。
1.主题的创建方式和配置
主题的创建方式可以通过命令行 (
kafka-topic.sh
) 、 编程方式(KafkaAdminClient
)方式来创建。除去以上两种方式还有一种自动创建方式,不过不建议在生产环境使用。自动创建需要Broker服务端开启配置
auto.create.topics.enable=true
2. 主题的配置
创建主题时,可以配置参数,这些参数决定了主题的行为特性。详细配置说明参考官方文档和源码。
2.1 基础配置
- topic <topic_name> 【必须】主题名称
- partitions <num_partitions> 【必须】 设置主题的分区数量。分区是 Kafka 并行处理和数据分布的基本单位。一个分区只能由一个消费者线程消费。
- replication-factor <replication_factor> 【必须】设置每个分区的副本数量。副本提供容错能力,即使部分 Broker 宕机,只要存在一个ISR 节点,分区仍然可用。
2.2 数据保留与清理策略
- retention.ms 数据在主题中保留的最长时间(毫秒)。达到此时间后,旧消息将被删除(无论是否被消费过)。优先级高于
retention.bytes
。默认值 7 天
- retention.bytes 主题所有分区保留的最大数据总量(字节)。达到此限制后,旧消息将被删除(无论是否被消费过)。如果设置了 retention.ms 且其先触发,则此配置可能不生效。默认值 -1,无限大小。
- cleanup.policy 清理旧数据策略。
delete
基于时间或大小删除旧数据(默认)compact
启用日志压缩,只保留每个 key的最新值。适用于需要精确KV映射的场景(数据库变更日志)。、delete,compact
同时启用删除和压缩。压缩会再删除策略之前运行。
3.主题的创建过程(KRaft)
3.1 创建过程概述
主题的创建过程简单的理解就是客户端发送请求到 Broker,Broker处理请求,广播到其他 Controller节点,达成一致然后响应请求。
KafkaAdminClient根据topic 参数(toplic名称、分区数、副本因子等参数),将topic名称不为空的转换成 NewTopic(支持批量创建),然后包装 Call对象(异步请求抽象的对象)放入请求队列,唤醒sender 线程进行发送请求。
创建 Call 对象时,会提供 Controller 节点(确保客户端发送的请求发送到 Controller 节点)。
QUESTION:
不同的Kafka版本关于发送请求到 Broker 这里有些许差异,Topic 是需要经过 Controller Leader 节点来处理的。Cluster环境怎么知道 Leader 节点,通常有两种处理方式。
- 发送请求到任意的 Broker,Broker 接受客户端请求后,通过元数据或自身的状态属性去判断该由谁(通常是 Leader节点)来处理该请求。这种方式的好处是 Client 无需感知 Controller 节点变化,通过任意节点转发也可以避免单点故障(取决你怎么配置bootstrap-server,如果只配单个 broker 节点还是有单点问题)。(没记错的话 Redis Cluster 采用的就是该种方式)
- 如果客户端缓存了集群状态(NOTE 客户端启动时指定了 bootstrap-server 可以通过心跳机制获取Cluster状态快照),知道谁是 Controller,可以直接将请求发送到对应的 Controller Leader 节点来处理。
从 Kafka的trunk 分支中的看来目前是两种方式都会使用。在 Client 端构建Call 对象时,会有一个参数
ControllerNodeProvider
用来提供 Controller 节点。即使这里未获取到也没关系(获取不到的原因有可能客户端未能获取到元数据),Broker 端接受请求后,会转发到 Controller 节点处理。集群中的各个节点,作为有状态服务,对于当前状态或者说变量的变更至关重要,例如 Leader Controller 节点的 state 必须是 Leader,而节点之间需要正确的同步状态,而分布式环境分区故障、宕机等问题,使同步状态保持并不是那么容易。举个例子:一个主从同步方式,主节点和Follower 节点发生网络通信故障,从节点认为主节点已经下线,从而变成主节点,这里出现了双主情况,那么整个集群节点虽然可用,但是数据可能不再一致。当然这对于选择 AP或追求BASE 的系统不算是糟糕的设计。
NOTE:
在分布式环境中客户端与服务端的交互,通常与主节点进行交互,有的会通过多轮消息来确认一个操作是否可以完成。例如 Raft 论文中描述的与客户端交互,是 Client 端向所有Raft 节点发送消息,但是这样的操作存在一个问题就是性能。并发高的交互下,每个请求所有节点都会接受处理(即使是拒绝处理),这并不是一个高效的交互设计。
Topic 在 Kafka 中属于元数据,必须保持强一致性。因为消息归类是由 Topic 来决定的,如果 Topic 在集群中都不能保持一致,那整个 Kafka Cluster 将无法使用。所以无论是使用ZK还是 KRaft 这里都必须经过 Controller Leader 来处理。
Controller节点接收到请求后,会根据请求类型(ApiKey),转发给 Controller 节点处理。从代码上来看是 Core 模块的
KafkaApis
接受处理请求,然后转发给QuorumController
类来处理该请求。这里将Broker 端接受请求后要做的流程为检查主题配置、执行主题的分区分配策略、以及构建元数据记录(TopicRecord、PartionRecord)。
3.2 主题的分区分配策略
主题的分区策略支持自定义,在 Broker 端(
ReplicationControlManager#createTopic
会根据主题的 topic.assignments 属性是否为空来判断)创建主题时会根据主题的分区策略来分配主题的分区。3.2.1 机架信息(RackList)构建
主题的分区策略讲究的是均衡,尽可能的将分区在不同的Broker与机架上放置,达到负载均衡的策略。
分区首先会获取集群中的所有可用的 Broker 列表,按照机架分组,每个分组内又按照隔离状态再次分组,然后按照机架名排序并随机一个起始偏移量,这样构成了一个机架信息(
RackList
)。如果没有机架信息,所有 Broker存入同一个 Rack对象(Optional.empty())。Kafka 的机架感知(Rack Awareness)通过配置实现跨物理位置(如机架、可用区)的分区副本分布优化和就近消费。
3.2.2 前置校验
这里有 3 个前置校验,硬性约束,如果不符合以下三个条件会抛出异常。
- 副本因子(复制因子)必须大于 0
- 必须存在未隔离的 Broker(用于 Leader)。
- 副本因子必须小于等于 Broker 数。
3.2.3 副本分配流程
校验通过后,会遍历分区数(取决于配置的分区数)生成分区分配的列表(Broker列表会随机化排列)。列表的结构为List<List<Integer>>。每个列表的第一个 为 Leader 副本。这里有一个强制规定 Leader必须从未隔离的Broker 从获取(有的版本是无法从未隔离的 Broker,则从隔离副本中选取一个当做 Leader,可以理解为一种降级策略)。具体的分区分配源代码由StripedReplicaPlacer类的 place 方法完成。
假设有一个包含机架 A、B、C 的集群,每个机架有 3 个副本,总共 9 个节点,偏移量为 0,会得到如下位置。
分配完成以后会返回一个
TopicAssignment
对象,该对象包含了当前创建主题分区的分配方案。然后根据分区 ID,重新组合成一个 KV 对象newParts(结构Map<Integer, PartitionRegistration>
)。Key为分区号,Value为PartitionRegistration
(封装了分区的核心数据,如 Broker列表,isr
列表,LeaderId等,也就是分区的元数据。)3.2.4 元数据以及一致性写入
创建主题时,元数据分为两块
TopicRecord
(主题元数据)、PartitionRecord
(分区元数据),元数据构建完成以后会再次封装成ControllerResult
对象,最终将该对象包装成一个事件(ControllerWriteEvent)放入队列(KafkaEnventQueue
)中,由队列来触发事件执行,然后唤醒线程唤醒线程去处理队列事件接下来就是元数据一致性写入过程,这里都由 KRaft 层来处理。
首先将消息序列化写入内存返回消息的偏移量(这一步采用了类似生产者发送消息的设计,使用消息累加器来提高性能。)
未能找到源码中整个KRaft 层的链路调用细节,以下步骤仅根据个人猜想。
序列化完成以后开始向集群的各个Follower节点发送createTopic 请求(可以理解就是 AE RPC 请求,Kafka 是基于 NIO 通信的,它将请求类型明确的分类,本质上是它自己实现了一个远程调用),各个Follower 节点开始比对请求参数中的任期、偏移量。如果通过规则校验(是否为合法请求,这里涉及 Raft 算法描述的规则,也就是Leader 的任期和prevLogIndex是否能匹配上),返回请求通过,Follower节点追加日志到本地内存,Leader节点开始统计返回信息,统计是否过半,过半以后开始提交日志,提交日志成功序列化到本地,最后应用到状态机并将响应返回到 Client 端。
TODO:
QUESTION 1:
KafkaClient 内部有个静态类
RaftMetadataLogCleanerManager
维护了一个定时器,定时将消息同步到 Follower 节点。这里我有点疑惑,Client 发来创建 Topic 主题,Leader 处理结束,应该直接发送到 Follower 节点,为何要定时器去同步?- Author:newrain-zh
- URL:https://alex.sh.cn/article/example-13
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!