diff --git a/README.rst b/README.rst index 7e7bc178f..1207f691f 100644 --- a/README.rst +++ b/README.rst @@ -568,6 +568,17 @@ Example of complete authorization file ] } +Karapace Schema Registry access to the schemas topic +==================================================== + +The principal used by the Karapace Schema Registry has to have adequate access to the schemas topic (see the ``topic_name`` configuration option above). +In addition to what is required to access the topic, as described in the Confluent Schema Registry documentation_, the unique, single-member consumer group +used by consumers in the schema registry needs ``Describe`` and ``Read`` permissions_ on the group. +These unique (per instance of the schema registry) consumer group names are prefixed by ``karapace-autogenerated-``, followed by a random string. + +.. _`documentation`: https://docs.confluent.io/platform/current/schema-registry/security/index.html#authorizing-access-to-the-schemas-topic +.. _`permissions`: https://docs.confluent.io/platform/current/kafka/authorization.html#group-resource-type-operations + OAuth2 authentication and authorization of Karapace REST proxy =================================================================== diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f3d7dd5b9..750992686 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -12,12 +12,14 @@ from enum import Enum from jsonschema.validators import Draft7Validator from kafka.errors import ( + GroupAuthorizationFailedError, InvalidReplicationFactorError, KafkaConfigurationError, KafkaTimeoutError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError, + TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) from karapace import constants @@ -26,6 +28,7 @@ from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator @@ -316,13 +319,23 @@ def handle_messages(self) -> None: for msg in msgs: try: message_key = msg.key() - if message_key is None: - LOG.warning("Empty message key when consuming from topic %s, error: %s", msg.topic(), msg.error()) - continue + message_error = msg.error() + if message_error is not None: + raise translate_from_kafkaerror(message_error) + + assert message_key is not None key = json_decode(message_key) except JSONDecodeError: LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset()) continue + except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc: + LOG.error( + "Kafka authorization error when consuming from %s: %s %s", + self.config["topic_name"], + exc, + msg.error(), + ) + continue assert isinstance(key, dict) msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE