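This part traces what happens to a topic from the moment it is created up to the point just before it is written to ZooKeeper.
To create a topic, you can use the script that ships with Kafka.
Open ./bin/kafka-topics.sh: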
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
The script simply invokes kafka.admin.TopicCommand, whose methods do the actual work of creating the topic. Its source is in the kafka.admin package of the core module:
object TopicCommand extends Logging
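Logging here is a trait defined in Kafka, not a class. Traits are a Scala construct for sharing interfaces and fields between classes, similar to Java 8 interfaces. Classes and objects can extend a trait, but a trait cannot be instantiated on its own, and in Scala 2, which this code is written in, a trait takes no constructor parameters.
Simply put, a trait does what its name suggests: it lets you mix a new capability into a class as a cross-cutting addition, without adding a new parent class.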
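To make the idea of mixing in a capability concrete, here is a minimal sketch. All names in it (Greeter, Counter, TopicTool) are made up for illustration and have nothing to do with Kafka's code.

```scala
// Hypothetical names throughout; none of this is Kafka code.
trait Greeter {
  // Abstract member that the extending class or object must provide.
  def name: String
  // Concrete member shared by everything that mixes in Greeter.
  def greet(): String = s"Hello from $name"
}

trait Counter {
  // A trait can also carry state (a field).
  private var count = 0
  def increment(): Int = { count += 1; count }
}

// An object can mix in several traits without needing a new parent class.
object TopicTool extends Greeter with Counter {
  val name: String = "TopicTool"
}

object TraitDemo {
  def main(args: Array[String]): Unit = {
    println(TopicTool.greet())
    println(TopicTool.increment())
  }
}
```

TopicTool gains both greet() and increment() purely by listing the traits after extends, which is the same way TopicCommand gains its logging methods.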
Back to Logging: it lives in the kafka.utils package of the core module:
trait Logging {
  protected lazy val logger = Logger(LoggerFactory.getLogger(loggerName))
  protected var logIdent: String = _
  Log4jControllerRegistration
  protected def loggerName: String = getClass.getName
  protected def msgWithLogIdent(msg: String): String =
    if (logIdent == null) msg else logIdent + msg
  def trace(msg: => String): Unit = logger.trace(msgWithLogIdent(msg))
  def trace(msg: => String, e: => Throwable): Unit = logger.trace(msgWithLogIdent(msg),e)
  def isDebugEnabled: Boolean = logger.underlying.isDebugEnabled
  def isTraceEnabled: Boolean = logger.underlying.isTraceEnabled
  def debug(msg: => String): Unit = logger.debug(msgWithLogIdent(msg))
  def debug(msg: => String, e: => Throwable): Unit = logger.debug(msgWithLogIdent(msg),e)
  def info(msg: => String): Unit = logger.info(msgWithLogIdent(msg))
  def info(msg: => String,e: => Throwable): Unit = logger.info(msgWithLogIdent(msg),e)
  def warn(msg: => String): Unit = logger.warn(msgWithLogIdent(msg))
  def warn(msg: => String, e: => Throwable): Unit = logger.warn(msgWithLogIdent(msg),e)
  def error(msg: => String): Unit = logger.error(msgWithLogIdent(msg))
  def error(msg: => String, e: => Throwable): Unit = logger.error(msgWithLogIdent(msg),e)
  def fatal(msg: => String): Unit =
    logger.error(Logging.FatalMarker, msgWithLogIdent(msg))
  def fatal(msg: => String, e: => Throwable): Unit =
    logger.error(Logging.FatalMarker, msgWithLogIdent(msg), e)
}
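As we can see, the Logging trait provides logging for the rest of Kafka: it defines a uniform set of logging methods (trace, debug, info, warn, error, fatal) that any class or object mixing it in can call.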
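One detail of the trait that is easy to miss is that every message is declared as msg: => String, a by-name parameter, so the string is built only if it is actually used. The sketch below illustrates that effect with a simplified stand-in: MiniLogging, Worker and the debugEnabled switch are hypothetical, and instead of calling a real logger the debug method returns the formatted message so the behavior can be observed. In the real trait the level check is presumably performed by the underlying Logger.

```scala
// Simplified stand-in for the trait shown above; not Kafka's implementation.
trait MiniLogging {
  protected var logIdent: String = null
  // Hypothetical switch standing in for the logger's level check.
  protected def debugEnabled: Boolean

  protected def msgWithLogIdent(msg: String): String =
    if (logIdent == null) msg else logIdent + msg

  // msg is a by-name parameter: it is evaluated only if it is actually used.
  def debug(msg: => String): Option[String] =
    if (debugEnabled) Some(msgWithLogIdent(msg)) else None
}

class Worker(val debugEnabled: Boolean) extends MiniLogging {
  logIdent = "[Worker] "
  // Counts how often the (pretend) expensive message is built.
  var evaluations = 0
  def expensiveMessage(): String = { evaluations += 1; "state dump" }
}
```

With new Worker(false), calling debug(expensiveMessage()) never runs expensiveMessage, so a disabled log level costs almost nothing at the call site.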
Back to topic creation. Let's find the createTopic method:
def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  val adminZkClient = new AdminZkClient(zkClient)
  try {
    if (opts.options.has(opts.replicaAssignmentOpt)) {
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
    } else {
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}
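The method takes two parameters: zkClient, of type KafkaZkClient, which is used to talk to ZooKeeper; and opts, of type TopicCommandOptions, which holds the options and arguments for creating the topic.
Let's walk through what it does:
It creates an AdminZkClient, which performs the topic-management operations.
It inspects the options. If a replica assignment was specified, it calls adminZkClient's createOrUpdateTopicPartitionAssignmentPathInZK method to create the topic's partition assignment path (with update = false). If no replica assignment was specified, it requires the partition count and the replication factor, picks a rackAwareMode (rack-aware mode: Disabled if the disable-rack-aware option is present, Enforced otherwise), and calls adminZkClient's createTopic method to create the topic.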
The parseReplicaAssignment method used above is also defined in TopicCommand:
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
  val partitionList = replicaAssignmentList.split(",")
  val ret = new mutable.HashMap[Int, List[Int]]()
  for (i <- 0 until partitionList.size) {
    val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
    val duplicateBrokers = CoreUtils.duplicates(brokerList)
    if (duplicateBrokers.nonEmpty)
      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
    ret.put(i, brokerList.toList)
    if (ret(i).size != ret(0).size)
      throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
  }
  ret.toMap
}
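From the two if checks inside the loop, we can see that this method mainly guarantees two things: the replica list of a partition never contains the same broker twice, and every partition has the same number of replicas as partition 0, i.e. the same replication factor.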
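As a worked example of the input format and of the two checks (no duplicate broker within a partition, same replication factor for every partition), here is a stand-alone re-implementation that uses only the standard library. ReplicaAssignmentSketch and parse are hypothetical names, and IllegalArgumentException stands in for Kafka's own exception types.

```scala
object ReplicaAssignmentSketch {
  // Input format: partitions separated by ",", broker ids within a partition by ":".
  // The position of an entry in the list is its partition id.
  def parse(replicaAssignmentList: String): Map[Int, List[Int]] = {
    val partitions = replicaAssignmentList.split(",").toList.map { entry =>
      entry.split(":").toList.map(_.trim.toInt)
    }
    partitions.zipWithIndex.foreach { case (brokers, partitionId) =>
      // Check 1: a broker may appear at most once per partition.
      val duplicates = brokers.diff(brokers.distinct).distinct
      if (duplicates.nonEmpty)
        throw new IllegalArgumentException(
          s"Partition $partitionId lists duplicate brokers: ${duplicates.mkString(",")}")
      // Check 2: every partition must have as many replicas as partition 0.
      if (brokers.size != partitions.head.size)
        throw new IllegalArgumentException(
          s"Partition $partitionId has a different replication factor: $brokers")
    }
    partitions.zipWithIndex.map { case (brokers, id) => id -> brokers }.toMap
  }
}
```

For instance, parse("0:1,1:2,2:0") describes three partitions with two replicas each, whereas "0:0,1:2" fails the first check and "0:1,2" fails the second.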
As for adminZkClient.createTopic, it lives in the zk package of the core module:
import kafka.zk.{AdminZkClient, KafkaZkClient}
Here is its definition:
def createTopic(topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties,
                rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
  createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment, topicConfig)
}
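Note that a topic created without an explicit replica assignment still ends up with a replicaAssignment, generated from the given parameters, and is then handled exactly like a topic whose assignment was specified by hand. (There is an automatic assignment algorithm behind this, assignReplicasToBrokers, which will be analyzed later; we set it aside for now.)
So createOrUpdateTopicPartitionAssignmentPathInZK, the method that both paths funnel into, is what we need to look at. It is defined in AdminZkClient: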
def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false) {
  validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
  if (!update) {
    // write out the config if there is any, this isn't transactional with the partition assignments
    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
  }
  // create the partition assignment
  writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
}
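The definition is concise, and the Kafka developers even left us some precious comments (I could cry). Here is what it does:
It takes four parameters: topic is the topic name; partitionReplicaAssignment is a map from partition ID to the list of broker IDs that hold its replicas; config holds the topic-level configuration properties; update indicates whether this call updates an existing assignment.
It validates the arguments. validateCreateOrUpdateTopic is not shown here; judging by its name, it presumably checks the topic name, the assignment and the config before anything is written.
It checks whether this is a brand-new topic creation or an update. For a new topic, it calls zkClient.setOrCreateEntityConfigs to write the config to ZooKeeper. This happens before the partition assignment is written, and the comment points out that the two writes are not transactional: they are separate operations, so the config can end up written even if the assignment write later fails.
It calls writeTopicPartitionAssignment to write the topic's partition assignment to ZooKeeper.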
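The order of the steps, and the fact that the config write and the assignment write are two separate operations, can be sketched with an in-memory stand-in for ZooKeeper. FakeZk, CreateOrUpdateSketch and the path strings below are illustrative assumptions; validation is reduced to a single trivial check.

```scala
import scala.collection.mutable

// In-memory stand-in for ZooKeeper; FakeZk and its methods are hypothetical.
class FakeZk {
  // LinkedHashMap keeps insertion order, so the order of writes stays visible.
  val nodes = mutable.LinkedHashMap.empty[String, String]
  def write(path: String, data: String): Unit = { nodes.update(path, data) }
}

object CreateOrUpdateSketch {
  def createOrUpdate(zk: FakeZk,
                     topic: String,
                     assignment: Map[Int, Seq[Int]],
                     config: Map[String, String],
                     update: Boolean = false): Unit = {
    // Step 1: validate (here only a trivial check).
    require(assignment.nonEmpty, "assignment must not be empty")
    // Step 2: only a fresh creation writes the topic config.
    if (!update) zk.write(s"/config/topics/$topic", config.toString)
    // Step 3: the assignment is written in a separate, later step.
    zk.write(s"/brokers/topics/$topic", assignment.toString)
  }
}
```

If step 3 threw an exception, the entry written in step 2 would remain, which is the kind of non-transactional behavior the source comment warns about.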
We will skip the config write to ZooKeeper for now and look at writeTopicPartitionAssignment first:
private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
  try {
    val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
    if (!update) {
      info("Topic creation " + assignment)
      zkClient.createTopicAssignment(topic, assignment)
    } else {
      info("Topic update " + assignment)
      zkClient.setTopicAssignment(topic, assignment)
    }
    debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
  } catch {
    case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
    case e2: Throwable => throw new AdminOperationException(e2.toString)
  }
}
What we mainly care about here is zkClient.createTopicAssignment:
def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
  createRecursive(TopicZNode.path(topic), TopicZNode.encode(assignment))
}
This uses a method named createRecursive, which creates a ZooKeeper path recursively, creating any missing parent nodes along the way:
private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
  def parentPath(path: String): String = {
    val indexOfLastSlash = path.lastIndexOf("/")
    if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
    path.substring(0, indexOfLastSlash)
  }
  def createRecursive0(path: String): Unit = {
    val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
    var createResponse = retryRequestUntilConnected(createRequest)
    if (createResponse.resultCode == Code.NONODE) {
      createRecursive0(parentPath(path))
      createResponse = retryRequestUntilConnected(createRequest)
      if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
        throw createResponse.resultException.get
      }
    } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
      throw createResponse.resultException.get
    }
  }
  val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
  var createResponse = retryRequestUntilConnected(createRequest)
  if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
    createResponse.maybeThrow
  } else if (createResponse.resultCode == Code.NONODE) {
    createRecursive0(parentPath(path))
    createResponse = retryRequestUntilConnected(createRequest)
    if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS)
      createResponse.maybeThrow
  } else if (createResponse.resultCode != Code.NODEEXISTS)
    createResponse.maybeThrow
}
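Note that the method is declared private[zk], so it can only be accessed from within the zk package. Let's see what it does:
Inside the method, it first defines a helper named parentPath, which returns the parent of a given path.
It then defines an inner method named createRecursive0, which recursively creates the missing ancestor nodes (with no data) whenever a create fails with NONODE.
In the body of createRecursive, it first builds a CreateRequest carrying the path to create, the data, the permissions (ACL, Access Control List) and the create mode.
It sends the create request by calling retryRequestUntilConnected and gets the response back. If the response is NONODE, it creates the parents via createRecursive0 and retries; NODEEXISTS is raised as an error only when throwIfPathExists is true.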
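The try, create the parents on NONODE, then retry pattern described above can be sketched against an in-memory tree. Everything here is a simplified assumption: FakeZk, the Code values and the helper names are hypothetical, data is a plain String, and ACLs, retries and the throwIfPathExists flag are left out.

```scala
import scala.collection.mutable

object CreateRecursiveSketch {
  sealed trait Code
  case object Ok extends Code
  case object NoNode extends Code
  case object NodeExists extends Code

  // In-memory stand-in for ZooKeeper: a node can be created only if its parent exists.
  class FakeZk {
    val nodes = mutable.Map[String, String]("" -> "") // "" plays the role of the root
    def create(path: String, data: String): Code = {
      val parent = path.substring(0, path.lastIndexOf("/"))
      if (nodes.contains(path)) NodeExists
      else if (!nodes.contains(parent)) NoNode
      else { nodes(path) = data; Ok }
    }
  }

  def parentPath(path: String): String = {
    val i = path.lastIndexOf("/")
    if (i == -1) throw new IllegalArgumentException(s"Invalid path $path")
    path.substring(0, i)
  }

  // Creates missing ancestors with empty data, topmost missing ancestor first.
  private def createParents(zk: FakeZk, path: String): Unit =
    if (zk.create(path, "") == NoNode) {
      createParents(zk, parentPath(path))
      zk.create(path, "")
      ()
    }

  // First attempt; on NoNode build the parents, then retry with the real data.
  def createRecursive(zk: FakeZk, path: String, data: String): Code =
    zk.create(path, data) match {
      case NoNode =>
        createParents(zk, parentPath(path))
        zk.create(path, data)
      case other => other
    }
}
```

Creating "/brokers/topics/demo" on an empty FakeZk first creates "/brokers" and "/brokers/topics" with empty data, and only the leaf carries the payload.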
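The key piece here is retryRequestUntilConnected. The snippet below shows the beginning of the batch variant, retryRequestsUntilConnected, which takes a sequence of requests; the single-request version presumably delegates to it: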
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  val remainingRequests = ArrayBuffer(requests: _*)
  val responses = new ArrayBuffer[Req#Response]
  while (remainingRequests.nonEmpty) {
    val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
    ...
The call to focus on is zooKeeperClient.handleRequests(remainingRequests). Let's look at how it handles the requests:
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
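The inFlightRequests.acquire() call here looks familiar:
private val inFlightRequests = new Semaphore(maxInFlightRequests)
In-flight requests are requests that have been started but not yet completed. A semaphore caps how many of them may exist at once (much like outstanding requests in computer architecture).
Now for another familiar face:
private val initializationLock = new ReentrantReadWriteLock()
Aha, a reentrant read-write lock! Judging by its name, it guards the initialization of the ZooKeeper connection. Only the read lock is taken here, which raises the question of why a create request does not need the write lock. A plausible reading is that sending a request only reads the current connection, so any number of senders can hold the read lock at once, while the write lock is presumably reserved for code that (re)initializes the connection. Reentrancy also means that a thread already holding the lock can take it again without deadlocking itself.
In short, this request-handling code uses a semaphore and a lock to control concurrent access, and a CountDownLatch to wait until all requests have completed. From the context, it is the piece that talks to ZooKeeper: it sends the requests and waits for the responses.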
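The interplay of the semaphore and the latch can be tried out in isolation. The following is a minimal stand-alone sketch, not Kafka's code: requests are plain Ints, send is a hypothetical stand-in that answers on a background thread, the limit of 2 in-flight requests is arbitrary, and the read lock is left out for brevity.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Semaphore}
import scala.concurrent.{ExecutionContext, Future}

object HandleRequestsSketch {
  // Hypothetical limit; the original takes it from maxInFlightRequests.
  private val inFlight = new Semaphore(2)

  // Stand-in for an asynchronous send: the callback runs on another thread.
  private def send(request: Int)(callback: Int => Unit): Unit = {
    Future(callback(request * 10))(ExecutionContext.global)
    ()
  }

  def handleRequests(requests: Seq[Int]): Seq[Int] =
    if (requests.isEmpty) Seq.empty
    else {
      val latch = new CountDownLatch(requests.size)
      val responses = new ArrayBlockingQueue[Int](requests.size)
      requests.foreach { request =>
        inFlight.acquire() // blocks while 2 requests are already outstanding
        try {
          send(request) { response =>
            responses.add(response)
            inFlight.release() // free the slot once the response has arrived
            latch.countDown()
          }
        } catch {
          case e: Throwable =>
            inFlight.release() // do not leak the permit if sending fails
            throw e
        }
      }
      latch.await() // wait until every callback has fired
      // All responses are queued by now, so take() never blocks here.
      Seq.fill(requests.size)(responses.take())
    }
}
```

Responses arrive in whatever order the callbacks happen to run, so callers should not rely on them matching the order of the requests in this sketch.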
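Once we reach send, the request-sending part of topic creation is essentially over (what lies beyond can be explored another time). Its body is mostly calls into the ZooKeeper client API; for example, the CreateRequest case inside send: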
case CreateRequest(path, data, acl, createMode, ctx) =>
  zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
    override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
      callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs)))
  }, ctx.orNull)
However, the creation of a topic is clearly not finished yet. Next, we need to look at what happens after the topic has been written to ZooKeeper.