class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
// some code here
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
// some more code
继续查看TopicChangeHandler:
class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = TopicsZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.TopicChange)
}
找到TopicChange:
case object TopicChange extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
override def process(): Unit = {
if (!isActive) return
val topics = zkClient.getAllTopicsInCluster.toSet
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
}
class PartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
zkClient: KafkaZkClient,
partitionState: mutable.Map[TopicPartition, PartitionState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
// some code here
def handleStateChanges(...)
// more code here
private def doHandleStateChanges(...)
// many cases
对于 NewPartition , 我们有:
case NewPartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
partitionState.put(partition, NewPartition)
}
对于OnlinePartition ,我们有:
case OnlinePartition =>
val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
if (uninitializedPartitions.nonEmpty) {
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
successfulInitializations.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
partitionState.put(partition, OnlinePartition)
}
}
if (partitionsToElectLeader.nonEmpty) {
val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
successfulElections.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
partitionState.put(partition, OnlinePartition)
}
}