1.3 Topic 创建过程 (上)

本部分主要梳理 topic 从创建到更新到 zookeeper 之前发生的事。

要创建一个 Topic ,可以通过 Kafka 提供的脚本文件。

打开 ./bin/kafka-topics.sh:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

发现其实际上是调用 kafka.admin.TopicCommand 类中的方法来创建 topic,这个类的源代码位于 core模块中的 kafka.admin 包中:

object TopicCommand extends Logging

此处的Logging是 Kafka 中定义的一个特质(trait)而不是一个类,特质(trait)是Scala 当中特有的概念,特质用于在类之间共享程序接口和字段。 它类似于Java 8的接口。 类和对象 (Objects)可以扩展特质,但是特质不能被实例化,因此特质没有参数。

简单而言,特质的使用就像是它的命名一样,你可以在不添加新的父类的情况下使用特质为一个类增加新功能的横向切入。

回到Logging,它位于core模块的 kafka.utils包中:

trait Logging {

  protected lazy val logger = Logger(LoggerFactory.getLogger(loggerName))

  protected var logIdent: String = _

  Log4jControllerRegistration

  protected def loggerName: String = getClass.getName

  protected def msgWithLogIdent(msg: String): String =
    if (logIdent == null) msg else logIdent + msg

  def trace(msg: => String): Unit = logger.trace(msgWithLogIdent(msg))

  def trace(msg: => String, e: => Throwable): Unit = logger.trace(msgWithLogIdent(msg),e)

  def isDebugEnabled: Boolean = logger.underlying.isDebugEnabled

  def isTraceEnabled: Boolean = logger.underlying.isTraceEnabled

  def debug(msg: => String): Unit = logger.debug(msgWithLogIdent(msg))

  def debug(msg: => String, e: => Throwable): Unit = logger.debug(msgWithLogIdent(msg),e)

  def info(msg: => String): Unit = logger.info(msgWithLogIdent(msg))

  def info(msg: => String,e: => Throwable): Unit = logger.info(msgWithLogIdent(msg),e)

  def warn(msg: => String): Unit = logger.warn(msgWithLogIdent(msg))

  def warn(msg: => String, e: => Throwable): Unit = logger.warn(msgWithLogIdent(msg),e)

  def error(msg: => String): Unit = logger.error(msgWithLogIdent(msg))

  def error(msg: => String, e: => Throwable): Unit = logger.error(msgWithLogIdent(msg),e)

  def fatal(msg: => String): Unit =
    logger.error(Logging.FatalMarker, msgWithLogIdent(msg))

  def fatal(msg: => String, e: => Throwable): Unit =
    logger.error(Logging.FatalMarker, msgWithLogIdent(msg), e)
}

注意到,Logging 特质在 Kafka 中的作用是提供日志记录的功能,它定义了统一的日志记录方法接口。

回到 Topic 的创建过程上,我们找到createTopic方法:

这个方法接受两个参数,一个是 zkClient,类型为 KafkaZkClient,用于与 ZooKeeper 进行交互。另一个是 opts,类型为 TopicCommandOptions,包含了创建主题的相关选项和参数。

让我们来细细分析一下它的执行过程:

  1. 从opts中解析出各种所需信息,并进行冲突检查。

  2. 创建 AdminZkClient 对象,用于执行与主题管理相关的操作。

  3. 查看传入参数选项,如果如果选项中指定了副本分配方式,则调用 adminZkClient 的 createOrUpdateTopicPartitionAssignmentPathInZK 方法来创建或更新主题的分区分配路径,如果选项中没有指定副本分配方式,则根据指定的分区数、副本数rackAwareMode (机架感知模式? 这个参数是可选的)调用 adminZkClient 的 createTopic方法来创建主题。

  4. 打印创建成功消息,或是进行异常处理。

其中,parseReplicaAssignment 方法仍然定义在TopicCommand 类中:

从上述代码末尾的两个 if 判句中可以看出,这个方法主要确保了两件事:

  1. 同一个broker中的副本不允许相同

  2. 同一个 partition 的副本数必须相同

而对于adminZkClient.createTopic 方法而言,可以发现其位于core模块的zk包中:

查看其定义:

注意到,就没有指定副本分配方式的 Topic 创建方式而言,其实际上仍然是通过相关参数生成了replicaAssignment ,然后按照指定了副本分配方式的处理方式进行处理。(此处实际上还存在一个自动分配的算法assignReplicasToBrokers,后文再做分析,这里暂且按下不表)

很好!现在这个万众瞩目的createOrUpdateTopicPartitionAssignmentPathInZK就是我们要关注的目标,其定义在AdminZkClient中:

这个定义很简洁,并且 Kafka 的程序员们为我们留下了珍贵的注释他真的我哭死,其功能:

  1. 接受四个参数:topic 是主题的名称,partitionReplicaAssignment 是一个映射,可能表示某种分区到副本的分配关系,config 是一个用于配置的属性,update 表示是否要更新分配。

  2. 验证创建或更新主题的参数。validateCreateOrUpdateTopic这个函数可能会执行一些验证逻辑,以确保参数的正确性。

  3. 判断当前操作是一个全新的主题创建操作,还是更新操作,如果是全新的主题创建,那么代码会调用 zkClient.setOrCreateEntityConfigs 方法,将配置写入 ZooKeeper。我们注意到这个操作是在分区分配之前执行的。因为注释里告诉我们配置与分区分配是独立的

  4. 调用 writeTopicPartitionAssignment 函数,将主题的分区分配写入 ZooKeeper。

关于更新zk配置的操作我们暂且避开不谈,先看看这个writeTopicPartitionAssignment函数:

我们主要关心这里的zkClient.createTopicAssignment

这里使用了一个名为createRecursive的方法,其作用是递归地创建 ZooKeeper 路径

注意到这是一个私有方法,只能在zk包内部被访问。让我们来分析一下它做了什么:

  1. 在方法内部,首先定义了一个名为 parentPath的辅助方法,用于获取给定路径的父路径。

  2. 定义了一个名为 createRecursive0的内部方法,用于执行递归最底层创建路径的逻辑。

  3. createRecursive 方法的主体中,首先创建一个 CreateRequest 对象,其中包含了要创建的路径、数据、权限(acl,Access Control List)以及创建模式。

  4. 通过调用 retryRequestUntilConnected 方法发送创建请求并获取响应。

  5. 根据创建请求的结果进行不同的逻辑和异常处理。

这里最重要的就是这个retryRequestUntilConnected 方法:

重点关注到zooKeeperClient.handleRequests(remainingRequests),让我们来看看它的请求处理:

注意到这里的inFlightRequests.acquire()的使用看上去很熟悉:

此处的 in-flight requests 指的是已经开始且还未完成的请求,这里用信号量对其进行限制(就像体系结构里的outstanding)

让我们再来看看另一位老面孔:

啊哈!一个可重入的读写锁!这里的作用应该是为了避免多个线程重复执行初始化操作但这里只用到了读锁,创建请求为什么不用写锁嘞,考虑到这整个过程是在递归中完成的,使用可重入可以避免死锁。

总的来说,这段请求处理的代码使用了信号量和锁来控制并发访问,并使用 CountDownLatch实现等待所有请求完成的机制。根据代码的上下文,它应该是用于与 ZooKeeper 通信,发送请求并等待响应。

而到了send这里,基本就关于 topic 创建发送请求的部分操作就结束了(前面的区域以后再来探索叭),里面基本上是有关 zookeeper 操作的相关内容,比如send里的 CreateRequest

然而一个 topic 的创建过程显然还没有结束,接下来,我们需要关注这个 topic 被更新到zk后发生的事。

Last updated