def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
if (Topic.hasCollisionChars(topic))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
case e: TopicExistsException => if (!ifNotExists) throw e
}
}
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
val duplicateBrokers = CoreUtils.duplicates(brokerList)
if (duplicateBrokers.nonEmpty)
throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
}
ret.toMap
}
import kafka.zk.{AdminZkClient, KafkaZkClient}
def createTopic(topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment, topicConfig)
def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
}
// create the partition assignment
writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
}
private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
if (!update) {
info("Topic creation " + assignment)
zkClient.createTopicAssignment(topic, assignment)
} else {
info("Topic update " + assignment)
zkClient.setTopicAssignment(topic, assignment)
}
debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
} catch {
case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}