Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS #18433

Merged
merged 9 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 0 additions & 243 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult}
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
Expand Down Expand Up @@ -1466,245 +1462,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
}

def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)

def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
val responseData = new CreateTopicsResponseData()
.setTopics(results)
val response = new CreateTopicsResponse(responseData)
trace(s"Sending create topics response $responseData for correlation id " +
s"${request.header.correlationId} to client ${request.header.clientId}.")
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
}

val createTopicsRequest = request.body[CreateTopicsRequest]
val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
if (!zkSupport.controller.isActive) {
createTopicsRequest.data.topics.forEach { topic =>
results.add(new CreatableTopicResult().setName(topic.name)
.setErrorCode(Errors.NOT_CONTROLLER.code))
}
sendResponseCallback(results)
} else {
createTopicsRequest.data.topics.forEach { topic =>
results.add(new CreatableTopicResult().setName(topic.name))
}
val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false)

val allowedTopicNames = {
val topicNames = createTopicsRequest
.data
.topics
.asScala
.map(_.name)
.toSet

topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
}

val authorizedTopics = if (hasClusterAuthorization) {
allowedTopicNames
} else {
authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity)
}
val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
request.context,
DESCRIBE_CONFIGS,
TOPIC,
allowedTopicNames,
logIfDenied = false
)(identity).map(name => name -> results.find(name)).toMap

results.forEach { topic =>
if (topic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME) {
topic.setErrorCode(Errors.INVALID_REQUEST.code)
topic.setErrorMessage(s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.")
} else if (results.findAll(topic.name).size > 1) {
topic.setErrorCode(Errors.INVALID_REQUEST.code)
topic.setErrorMessage("Found multiple entries for this topic.")
} else if (!authorizedTopics.contains(topic.name)) {
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
topic.setErrorMessage("Authorization failed.")
}
if (!authorizedForDescribeConfigs.contains(topic.name) && topic.name() != Topic.CLUSTER_METADATA_TOPIC_NAME) {
topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
}
}
val toCreate = mutable.Map[String, CreatableTopic]()
createTopicsRequest.data.topics.forEach { topic =>
if (results.find(topic.name).errorCode == Errors.NONE.code) {
toCreate += topic.name -> topic
}
}
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
errors.foreach { case (topicName, error) =>
val result = results.find(topicName)
result.setErrorCode(error.error.code)
.setErrorMessage(error.message)
// Reset any configs in the response if Create failed
if (error != ApiError.NONE) {
result.setConfigs(List.empty.asJava)
.setNumPartitions(-1)
.setReplicationFactor(-1)
.setTopicConfigErrorCode(Errors.NONE.code)
}
}
sendResponseCallback(results)
}
zkSupport.adminManager.createTopics(
createTopicsRequest.data.timeoutMs,
createTopicsRequest.data.validateOnly,
toCreate,
authorizedForDescribeConfigs,
controllerMutationQuota,
handleCreateTopicsResults)
}
}

def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val createPartitionsRequest = request.body[CreatePartitionsRequest]
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)

def sendResponseCallback(results: Map[String, ApiError]): Unit = {
val createPartitionsResults = results.map {
case (topic, error) => new CreatePartitionsTopicResult()
.setName(topic)
.setErrorCode(error.error.code)
.setErrorMessage(error.message)
}.toSeq
val response = new CreatePartitionsResponse(new CreatePartitionsResponseData()
.setResults(createPartitionsResults.asJava))
trace(s"Sending create partitions response $response for correlation id ${request.header.correlationId} to " +
s"client ${request.header.clientId}.")
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
}

if (!zkSupport.controller.isActive) {
val result = createPartitionsRequest.data.topics.asScala.map { topic =>
(topic.name, new ApiError(Errors.NOT_CONTROLLER, null))
}.toMap
sendResponseCallback(result)
} else {
// Special handling to add duplicate topics to the response
val topics = createPartitionsRequest.data.topics.asScala.toSeq
val dupes = topics.groupBy(_.name)
.filter { _._2.size > 1 }
.keySet
val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC,
notDuped)(_.name)

val (queuedForDeletion, valid) = authorized.partition { topic =>
zkSupport.controller.isTopicQueuedForDeletion(topic.name)
}

val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
unauthorized.map(_.name -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
queuedForDeletion.map(_.name -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))

zkSupport.adminManager.createPartitions(
createPartitionsRequest.data.timeoutMs,
valid,
createPartitionsRequest.data.validateOnly,
controllerMutationQuota,
result => sendResponseCallback(result ++ errors))
}
}

def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)

def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
val responseData = new DeleteTopicsResponseData()
.setResponses(results)
val response = new DeleteTopicsResponse(responseData)
trace(s"Sending delete topics response $response for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
}

val deleteTopicRequest = request.body[DeleteTopicsRequest]
val results = new DeletableTopicResultCollection(deleteTopicRequest.numberOfTopics())
val toDelete = mutable.Set[String]()
if (!zkSupport.controller.isActive) {
deleteTopicRequest.topics().forEach { topic =>
results.add(new DeletableTopicResult()
.setName(topic.name())
.setTopicId(topic.topicId())
.setErrorCode(Errors.NOT_CONTROLLER.code))
}
sendResponseCallback(results)
} else if (!config.deleteTopicEnable) {
val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
deleteTopicRequest.topics().forEach { topic =>
results.add(new DeletableTopicResult()
.setName(topic.name())
.setTopicId(topic.topicId())
.setErrorCode(error.code))
}
sendResponseCallback(results)
} else {
val topicIdsFromRequest = deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != Uuid.ZERO_UUID).toSet
deleteTopicRequest.topics().forEach { topic =>
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
results.add(new DeletableTopicResult()
.setName(name)
.setTopicId(topic.topicId()))
}
val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
results.forEach { topic =>
val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
if (unresolvedTopicId) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) {

// Because the client does not have Describe permission, the name should
// not be returned in the response. Note, however, that we do not consider
// the topicId itself to be sensitive, so there is no reason to obscure
// this case with `UNKNOWN_TOPIC_ID`.
topic.setName(null)
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
} else if (!authorizedDeleteTopics.contains(topic.name)) {
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
} else if (!metadataCache.contains(topic.name)) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
} else {
toDelete += topic.name
}
}
// If no authorized topics return immediately
if (toDelete.isEmpty)
sendResponseCallback(results)
else {
def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
errors.foreach {
case (topicName, error) =>
results.find(topicName)
.setErrorCode(error.code)
}
sendResponseCallback(results)
}

zkSupport.adminManager.deleteTopics(
deleteTopicRequest.data.timeoutMs,
toDelete,
controllerMutationQuota,
handleDeleteTopicsResults
)
}
}
}

def handleDeleteRecordsRequest(request: RequestChannel.Request): Unit = {
val deleteRecordsRequest = request.body[DeleteRecordsRequest]

Expand Down
Loading
Loading