From 3d422ecce3ff461be90029665ff714eb53db3ed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Mon, 4 Mar 2024 14:24:07 +0100 Subject: [PATCH 1/2] Handle message errors in schema reader Previously the errors that can surface when polling for messages have not been handled in the schema reader. This commit fixes that and specifically handles and error-logs group and topic authorization exceptions. This mirrors the similar situation in the REST proxy's consumer manager. Other errors (`KafkaError` and subclasses) raised here will be handled and logged as unexpected in the main message handler loop. Prompted by https://github.com/Aiven-Open/karapace/issues/812. --- karapace/schema_reader.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f3d7dd5b9..750992686 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -12,12 +12,14 @@ from enum import Enum from jsonschema.validators import Draft7Validator from kafka.errors import ( + GroupAuthorizationFailedError, InvalidReplicationFactorError, KafkaConfigurationError, KafkaTimeoutError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError, + TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) from karapace import constants @@ -26,6 +28,7 @@ from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator @@ -316,13 +319,23 @@ def handle_messages(self) -> None: for msg in msgs: try: message_key = msg.key() - if message_key is None: - LOG.warning("Empty message key when consuming from topic %s, error: %s", msg.topic(), msg.error()) - continue + message_error = msg.error() + if message_error is not None: + raise translate_from_kafkaerror(message_error) + + assert message_key is not None key = json_decode(message_key) except JSONDecodeError: LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset()) continue + except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc: + LOG.error( + "Kafka authorization error when consuming from %s: %s %s", + self.config["topic_name"], + exc, + msg.error(), + ) + continue assert isinstance(key, dict) msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE From e45b9f9d189f025e87a6f4ffc4388d27baf23c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Mon, 4 Mar 2024 14:27:19 +0100 Subject: [PATCH 2/2] Document schema registry ACL requirements --- README.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.rst b/README.rst index 7e7bc178f..1207f691f 100644 --- a/README.rst +++ b/README.rst @@ -568,6 +568,17 @@ Example of complete authorization file ] } +Karapace Schema Registry access to the schemas topic +==================================================== + +The principal used by the Karapace Schema Registry has to have adequate access to the schemas topic (see the ``topic_name`` configuration option above). +In addition to what is required to access the topic, as described in the Confluent Schema Registry documentation_, the unique, single-member consumer group +used by consumers in the schema registry needs ``Describe`` and ``Read`` permissions_ on the group. +These unique (per instance of the schema registry) consumer group names are prefixed by ``karapace-autogenerated-``, followed by a random string. + +.. _`documentation`: https://docs.confluent.io/platform/current/schema-registry/security/index.html#authorizing-access-to-the-schemas-topic +.. _`permissions`: https://docs.confluent.io/platform/current/kafka/authorization.html#group-resource-type-operations + OAuth2 authentication and authorization of Karapace REST proxy ===================================================================