1.4 Topic 创建过程 (下)

本部分主要关注 Topic 更新到 zookeeper 后触发的操作。

打开kafka\controller\KafkaController.scala,注意到KafkaController 这个大类:

class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
                      tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
// some code here
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
// some more code

继续查看TopicChangeHandler

class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = TopicsZNode.path

  override def handleChildChange(): Unit = eventManager.put(controller.TopicChange)
}

找到TopicChange

case object TopicChange extends ControllerEvent {
    override def state: ControllerState = ControllerState.TopicChange

    override def process(): Unit = {
      if (!isActive) return
      val topics = zkClient.getAllTopicsInCluster.toSet
      val newTopics = topics -- controllerContext.allTopics
      val deletedTopics = controllerContext.allTopics -- topics
      controllerContext.allTopics = topics

      registerPartitionModificationsHandlers(newTopics.toSeq)
      val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
        !deletedTopics.contains(p._1.topic))
      controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
      info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
        s"[$addedPartitionReplicaAssignment]")
      if (addedPartitionReplicaAssignment.nonEmpty)
        onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
    }
  }

TopicChange对象实现了process方法,该方法用于处理主题变化事件。具体的处理逻辑如下:

  1. 检查控制器是否处于活动状态,如果不是则直接返回

  2. 获取集群中的所有主题,并与控制器上下文(controllerContext)中的所有主题进行比较,找出新增和删除的主题。(注意:-- 是 Scala 中的一个操作符,表示集合的差集运算)

  3. 更新控制器上下文中的主题列表

  4. 注册新增主题的分区修改处理器

  5. 获取新增主题的分区副本分配情况,并更新控制器上下文中的分区副本分配情况

  6. 打印日志,记录新增和删除的主题以及新的分区副本分配情况

  7. 如果有新增的分区副本分配情况,调用onNewPartitionCreation方法处理新增分区

这个方法有以下四步:

  1. 创建 Partition 对象,并将其状态置为 NewPartition 状态

  2. 创建 Replica 对象,并将其状态置为 NewReplica 状态

  3. 将 Partition 对象从 NewPartition 改为 OnlinePartition 状态

  4. 将 Replica 对象从 NewReplica 改为 OnlineReplica 状态

注意到,这里实际上涉及到两部分的更改:Partition 和 Replica ,相关的类分别定义在kafka\controller\PartitionStateMachine.scalaReplicaStateMachine.scala 中,这里仅以分区状态为例进行梳理。

关于分区状态机的流转过程可以参考:

我们首先来看这个handleStateChanges方法:

很显然,关键点是其中的doHandleStateChanges这一部分,我们发现它仍然定义在kafka\controller\PartitionStateMachine.scala这个文件中,它们的基本结构:

对于 NewPartition , 我们有:

对于OnlinePartition ,我们有:

我们注意到这一大段是用于处理分区转为在线状态的情况。让我们看看它干了什么:

  1. 首先,通过筛选得到两个集合:

    • uninitializedPartitions:包含那些状态为NewPartition的分区

    • partitionsToElectLeader:包含那些状态为OfflinePartition或OnlinePartition的分区

  2. 如果 uninitializedPartitions 集合非空,说明有一些分区处于未初始化状态,调用initializeLeaderAndIsrForPartitions方法进行初始化

  3. 如果 partitionsToElectLeader 集合非空,说明需要进行领导者选举,调用electLeaderForPartitions方法对分区进行领导者选举,并返回成功选举的分区集合

就 Topic 的创建过程而言,我们更关注这个初始化操作:

这段代码干了很多事,简单而言,它的作用是初始化分区的领导者和ISR(In-Sync Replica)。它接收一个分区列表作为输入,并为每个分区执行以下操作:

  1. 根据分区与副本的分配情况存活情况,确定每个分区的存活副本集合

  2. 将没有存活副本的分区标记为无法初始化,并记录失败的状态变更

  3. 对于每个有存活副本的分区,创建一个包含领导者和ISR信息LeaderIsrAndControllerEpoch对象,尝试在 ZooKeeper 中创建分区的领导者和ISR状态,根据创建的响应结果,更新控制器的分区领导者信息

  4. 记录成功初始化的分区,并返回这些分区。

到这一步,一个 Topic 就算真正被创建完成了。

Last updated