🗒️kafka 源码阅读-生产者

type
status
date
slug
summary
tags
category
icon
password
原文

整体结构

所有分析都是基于 kafka 最新 trunk 分支。
kafka
apacheUpdated Jun 1, 2025
core: 服务端所有基础组件和逻辑
  • admin: 管理 Topic、Partition 的创建、删除以及扩展操作
  • network: 网络通信层,处理客户端连接和事件驱动模型,包含 ScoketServer、Acceptor(主 Reactor)、Processor(从 Reactor)等组件,采用主从 Reactor多线程实现高并发。
  • log: 消息持久化模块,管理日志段(LogSegment)、索引文件(.index、.timeIndex)和事务索引(.txnindex),支持顺序读写与高效检索。
  • controller: 集群元数据管理,负责 Leader 选举、Partition 分配和副本同步,KRaft 模式替代 Zookeeper 实现去中心化元数据管理
  • server: Broker 服务端主逻辑,包括动态配置管理、副本同步、元数据缓存等
clients: 包含生产者和消费者的客户端实现,定义与 Broker 的交互协议,支持同步/异步消息发送和消费组管理。
connect、streams :
connect实现 kafka 与外部系统(如数据库、Hadoop)的高性能数据传输
streams: 提供流处理功能,支持实时计算和状态管理。
 

启动配置

server.propertis

  • process.roles=broker,controller
    • kRaft 角色,broker处理生产/消费请求,controller 管理集群元数据(如分区分配、Leader 选举)
      组合模式:单节点同时承担两种角色(适合开发环境、生产建议分离)

生产者

KafkaProducer 主要组件
KafkaProducer 主要组件
  • ProducerConfig 中描述了生产者的相关配置,如果有些用户没有配置,就采用默认配置。
  • 扩展:
    • 分区器 partitionPlugin 用于自定义分区策略
    • keySerializerPlugin 用于自定义消息 key 的序列化,
    • valueSerializerPlugin 用于自定义消息内容的序列化
  • RecordAccumulator 充当本地队列的作用,它内部维护了一个缓冲池 BuffPool,缓冲池的大小取决于buffer.memory 的配置大小,默认 32MB。生产者创建消息时,会将两次发送间隔的消息组成批次放入到 ProducerBatch 中。
  • sender 线程 KafkaProducer启动时会创建一个 sender线程用于发送消息到 Borker,它会从 缓冲池中获得已就绪的 ProdcerBatch,然后将其发送到 Borker。
 

生产者发送消息流程

生产者就是发送消息的客户端程序,具体的时由 kafkaPorducer 来完成。
 
  1. 消息创建与拦截处理
  1. 序列化与分区选择
  1. 消息追加到缓冲区(RecordAccumulator)
  1. Sender 线程异步发送消息到 Broker
  1. Broker 接收并持久化消息
  1. 回调处理与资源释放
 

1.消息的创建与拦截处理

应用发送消息时,首先会将消息处理成 ProducerRecord 对象,这里面记录了分区、topic、Header、Key 等信息。
在发送之前,会有拦截器ProducerInterceptors 对消息进行处理。拦截器可以理解是 kafka 的客户端实现AOP的核心组件,Kafka 提供了 Interceptor 机制来允许用户在消息发送和接收的过程中插入自定义逻辑。
💡
ProducerInterceptors 提供了三类事件处理
  • onSend 消息发送前触发
  • onSendError 发送过程中发生错误时触发
  • onAcknowledgement 消息被服务端确认时触发(成功或失败)

2.序列化与分区选择

分区选择

kafka 的消息是与分区绑定的,分区选择是消息的负载均衡策略。
分区选择策略有以下几种
  1. 如果指定了分区,则使用指定的分区
  1. 自定义分区器 如果指定了
    1. 实现Partitioner 接口,来实现自定义分区策略
  1. hash(如果指定了 key,则使用该方式)
    1. 发送消息时,指定了key,murmur2.hash(key)% topic分区数量。
       
  1. 内置分区策略
    1. 没有指定 key的情况下,会返回一个特殊值 RecordMetadata.UNKNOWN_PARTITION(-1) ,在构建消息批次队列时,使用该值。
      内置分区策略也叫粘性分区策略,该策略依赖集群元数据和每个主题分区的负载统计信息。
      具体的选择分区规则如下:
    2. 生成基础随机数
    3. 获取当前分区负载统计快照(PartitionLoadStats)
    4. 判断PartitionLoadStats 是否为空
      1. PartitionLoadStats为空 (无分区负载统计信息)
        1. 根据 topic获取可用分区列表(依赖集群元数据),如果可用列表不为空,从可用分区列表随机选择一个。
          如果为空则从全部分区选择一个
      2. PartitionLoadStats 不为空
        1. 根据加权随机算法选择一个分区,低负载的分区被选中的几率高。
          - 示例:队列大小[0,3,1] → 权重[4,1,3] → 累积[4,5,8]
          • 随机数0-3 → 分区0,4 → 分区1,5-7 → 分区2

      3.消息追加到缓冲区(RecordAccumulator)

      消息的序列化和分区计算完成后,会追加RecordAccumulator(消息收集器)中。RecordAccumulator充当的角色是一个“队列”,间隔一段时间的消息拼凑成一个批次(ProducerBatch)再去发送可以减少网络请求的次数以提审整体的吞吐量。
      RecordAccumulator 内部维护了一个缓冲池,缓冲池的大小由配置由 buffer.memory 决定(默认为 32MB)。一条消息追加到RecordAccmulator时,会先去寻找与消息分区对应的分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),判断当前消息是否可以写入ProucerBatch中,如果不能写入则需要创建一个新的 ProducerBatch。新建 ProducerBatch时会评估这条消息的大小是否超过了配置参数 batch.size 参数的大小,如果超过了会以消息的size从 BuffPool 申请内存空间,不过这段内存区域不会被复用。不超过则以batch.size 参数大小来创建 ProducerBatch。
       
      消息处理完成后,不是立即发送的(如果配置linger.ms 为 0,则视为立即发送)。
      ProducerRecord消息这里做的消息的批次处理,构建 ProducerBatch 批次的具体的结构为KV 结构,key 为分区号,value 为批次数据。,用于缓存临时数据。为了提升性能可以将消息按分区结构,组成批次数据,交给 Sender 线程将消息组成的批次数据发送给 Broker。
      发送的消息会根据分区号进行批次构建或追加,消息能进入同一个 ProdcerBatch 依据两个配置 batch.sizelinger.ms ,batch.size 当前的批次大小linger.ms 等待时长。消息会首先尝试加入现有的批次中,如果批次已满(≥batchSize)追加不成功,则会申请新的批次加入到新的批次当中。将消息追加成功后,将唤醒 Sender 线程发送批次消息。
       
      kafka 源码中有关于这块的描述 生产者会将两次请求发送间隔内到达的所有记录合并为一个批量请求
      通常情况下,这种行为仅在系统处于高负载时发生(即记录到达速度快于发送速度)。但在某些场景下,即使处于中等负载,客户端也可能需要主动降低请求数量。该机制通过引入少量人为延迟实现——生产者不会立即发送记录,而是等待指定时长(允许其他记录加入发送队列),从而将多条记录合并发送。此机制类似于 TCP 协议中的Nagle 算法

4. Send 线程发送消息

send 线程唤醒以后,会获取已就绪的批次消息,然后发送到 Borker。总结流程如下
Send 线程从 RecordAccmulator 中获取缓存消息(ProducerBatch)之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转换成<Node,List<ProducerBatch>>形式,Node 表示 Kafka 集群中的broker 节点。对于网络连接来说,生产者客户端是与具体的 Broker 节点建立连接,也就是向具体的 broker 节点发送消息,而不关心消息属于哪一个分区。
转换成<Node,List<ProducerBatch>>的形式之后,Sender 线程还会再进一步的封装<Node,Request>的形式,这样就可以将 Request请求发送到各个 Node了。
请求从 Sender 线程发往 Broker 之前还会保存到 InFlightRequests 中,InFlightRequests 保存的具体形式为KV 结构Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去了但是还没有接受到响应的请求(NodeId 为 String,表示节点的 Id 编号)。
InFlightRequests 提供了许多 API,并且可以通过配置参数可以限制每个连接(客户端与 Node之间的连接)最多缓存的请求数),默认为 5,即每个连接最多缓存 5 个未响应的请求,超过该值之后不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过 Deque<Request>的 size 大小来判断对应的 Node 是否已经堆积了很多未响应的消息,如果size>0,说明这个 Node 节点的负载较大或网络连接有问题,再继续向其发送请求会增大超时的可能。
Sender 线程的核心代码链路为:
KafkaProducer 构造函数 → Sender.run() → runOnce() → sendProducerData() → sendProduceRequest() → NetworkClient.send()
Prev
Session、Cookie、Token
Next
Kafka 源码阅读-基本概念介绍
Loading...