Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构

Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构,第1张

Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构

文章目录
  • Kafka 网络通信组件架构
  • 1. 底层服务器 SocketServer
      • 1.1 Acceptor 连接接收器
      • 1.2 Processor 连接处理器
      • 1.3 RequestChannel 请求队列
  • 2. 请求处理器线程池 KafkaRequestHandlerPool
      • KafkaRequestHandler 请求处理器

Kafka 网络通信组件架构

本系列文章基于 Kafka 3.0 版本 ,读者如有兴趣可以点击链接进入 github 下载源码。 Kafka 的服务端核心模块用 scala语言 编写,这是一种基本与 Java 兼容的 JVM 语言,虽然其语法与 Java 会有许多不同,不过认真读的话也是能看懂的。需要注意的是,Kafka 自 2.8 版本开始移除 ZoomKeeper 依赖,自行管理集群元数据,所以本文讨论的服务端核心结构为KafkaRaftServer,不涉及兼容旧版本的 KafkaServer

Kafka 集群中的节点能够同时承担两种角色,一个是作为 Broker 处理外部请求,另一个是作为 Controller 管理整个集群元数据

  • 2.8 版本以前
    Kafka 依赖 zk 管理集群元数据,元数据包括 topic 信息、各个节点的连接信息等。此时 Controller 节点无法指定,一个 Kafka 节点成功在 zk 中 建立 /controller 节点即成为 Controller,存在很大的随机性
  • 2.8 版本以后
    Kafka 移除 zk 依赖,通过配置文件中的 process.roles 属性指定节点角色,一个节点可被直接指定为 Controller 节点,Controller 不再需要和 zk 通信管理集群元数据。 整个 Kafka 集群中 Controller 节点可以存在多个,共同组成 Controller 集群,负责处理集群元数据

上图为 Kafka 3.0 版本 Broker 服务端完成请求处理的基本架构,其事件驱动框架基于 主从 Reactor 多线程 实现,示意图中可以看到 KafkaRaftServer 内部可能会启动两个处理不同请求的服务端,不过二者的组成基本类似,都包含一个底层的网络服务器 SocketServer 和一个请求处理器线程池 KafkaRequestHandlerPool

  1. ControllerServer
    属性 process.roles 指定了 Controller 角色时才被创建,处理元数据类请求,包括 topic 创建删除等,由配置 controller.listener.names 指定供外部请求的端口等信息
  2. BrokerServer
    属性 process.roles 指定了 Broker 角色时才被创建,处理消息类请求,例如消息的生产消费,其供外部请求的端口由配置 listeners 与 controller.listener.names 的差集决定

Kafka 服务端网络通信的架构目前大致经历过 3 个阶段的演进:

  1. 早期的设计是任意节点都可能成为 Controller,同时集群中每个节点都需要暴露端口给 Controller 进行连接方便控制,所以节点使用同一个端口监听处理所有网络请求
  2. 到了 2.2 版本开始将请求分流,Kafka 节点分别用数据平面DataPlane和控制平面ControlPlane来对应处理数据类请求和 控制类请求(来自集群内部Controller的控制请求),二者区分出不同的端口(控制类请求的处理端口由 control.plane.listener.name 配置),且同时存在于同一个 KafkaServer 中。读者如感兴趣可以参考 Kafka 社区记录KAFKA-4453 add request prioritization
  3. 到了 2.8 版本,Kafka 推出了剔除 zk 的 KRaft 模式。在该模式下KafkaRaftServer 分别抽象出对应Controller角色的 ControllerServer 和对应Broker角色的 BrokerServer,消息生产之类的请求只会被 BrokerServer 处理,元数据类请求则由 ControllerServer 处理,一个节点可同时充当两种角色。在 KRaft 模式下,Kaka 节点已经不支持控制类请求, control.plane.listener.name 配置在 3.0 版本后的 KRaft 模式下将导致异常
1. 底层服务器 SocketServer

SocketServer 是一个面向底层网络 IO 的服务器,从源码来看,其比较关键的属性如下,源码中列出的控制平面相关组件与数据平面类似,不做赘述

  1. dataPlaneAcceptors: 一个 ConcurrentHashMap,值为 Acceptor 对象,这个对象主要负责监听端口,接受远程连接
  2. dataPlaneProcessors: 一个 ConcurrentHashMap,值为 Processor 对象,该对象负责监听 Socket 读写就绪事件并将网络数据解析为 Kafka 请求
  3. dataPlaneRequestChannel: 一个 RequestChannel 对象,这个对象主要充当 Kafka 请求的缓冲队列,所有请求处理器都从这个队列中获取请求并处理。队列的最大容量由 queued.max.requests 配置决定
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider,
                   val apiVersionManager: ApiVersionManager)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

  private val maxQueuedRequests = config.queuedMaxRequests

 ......
  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
  // control-plane
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
  ......
}
1.1 Acceptor 连接接收器

Acceptor 是 SocketServer.scala的内部类,这个对象实际上充当了 主从 Reactor 多线程 模型当中的主 Reactor,从源码看其中比较关键的属性如下:

  1. serverChannel: JDK 中的 ServerSocketChannel 对象,负责绑定监听指定的服务端端口
  2. nioSelector: JDK 中的 Selector 对象,用于注册监听 serverChannel 上的连接建立事件
  3. processors: 属于该连接器的 Processor 对象集合,Acceptor 连接接收器与连接处理器 Processor 对象是一对多的关系
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              nodeId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String,
                              time: Time,
                              logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  this.logIdent = logPrefix
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  private val processors = new ArrayBuffer[Processor]()

  ......
}
1.2 Processor 连接处理器

Processor 同样是 SocketServer.scala的内部类,这个对象相当于主从 Reactor 多线程 模型当中的从 Reactor,主要负责监听处理 Acceptor 接受的远程连接上的读写事件,其中比较关键的属性如下:

  1. requestChannel: 整个 SocketServer 共用的 RequestChannel 请求队列,连接处理器会将网络数据解析为 Kafka 请求,并将请求存入这个队列
  2. newConnections: 一个存放新连接的 ArrayBlockingQueue 阻塞队列,当 Acceptor接受新的连接后会轮询选择一个 Processor,将新建的连接分配到其新连接队列中
  3. responseQueue: 一个存放响应的 linkedBlockingDeque 阻塞队列,当请求处理器完成一个请求的处理后,会把对请求的响应放入对应的 Processor 响应队列中,由Processor 发送给请求方
  4. selector: 一个 Kafka 自定义的封装了 JDK Selector 的 Kafka Selector 对象,负责监听分配给 Processor 处理的连接上的读写事件
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int,
                               isPrivilegedListener: Boolean,
                               apiVersionManager: ApiVersionManager) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  ......

  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  private val responseQueue = new linkedBlockingDeque[RequestChannel.Response]()

  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(
      listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time,
      logContext,
      () => apiVersionManager.apiVersionResponse(throttleTimeMs = 0)
    )
  )
 ......
}
1.3 RequestChannel 请求队列

RequestChannel 为整个 SocketServer 接收到的外部请求的缓冲队列,其内部重要属性如下:

  1. requestQueue: 一个 ArrayBlockingQueue 阻塞队列,负责存储当前 SocketServer中所有连接处理器 Processor 解析出来的 Kafka 请求
  2. processors:当前 SocketServer所有连接处理器 Processor 的集合,当请求处理器完成请求处理后,需要借助这个集合将响应分配给对应的 Processor
class RequestChannel(val queueSize: Int,
                     val metricNamePrefix: String,
                     time: Time,
                     val metrics: RequestChannel.Metrics) extends KafkaMetricsGroup {
  import RequestChannel._
  private val requestQueue = new ArrayBlockingQueue[baseRequest](queueSize)
  private val processors = new ConcurrentHashMap[Int, Processor]()

  ......
}
2. 请求处理器线程池 KafkaRequestHandlerPool

KafkaRequestHandlerPool 是 KafkaRequestHandler.scala 的内部类,实际上是一个请求处理器线程池,线程池大小由配置num.io.threads决定,关键属性如下:

  1. threadPoolSize : 线程池中的线程数量
  2. runnables: 线程实例的集合,可以看到在代码块中会 for 循环调用 KafkaRequestHandlerPool#createHandler() 方法创建请求处理器,并将其扔进线程中执行
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  
  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
  ......
}
KafkaRequestHandler 请求处理器

KafkaRequestHandler 负责处理接收到的外部 Kafka 请求,其关键属性在类声明形式的构造方法中:

  1. requestChannel: 也就是上一节的 1.3 RequestChannel 请求队列,请求处理器会使用这个对象不断轮询取出请求来进行处理
  2. apis: 一个 ApiRequestHandler 接口实例对象,实际负责处理请求,其实现类分别为 KafkaApis(对应Broker角色) 和 ControllerApis(对应Controller角色)
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: ApiRequestHandler,
                          time: Time) extends Runnable with Logging {
  this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
  private val shutdownComplete = new CountDownLatch(1)
  private val requestLocal = RequestLocal.withThreadConfinedCaching
  @volatile private var stopped = false
  
  ......
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5659541.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存