Skip to content

Commit

Permalink
Merge pull request #832 from Aiven-Open/matyaskuti/schema_reader_errors
Browse files Browse the repository at this point in the history
Handle schema reader errors
  • Loading branch information
eliax1996 authored Mar 4, 2024
2 parents b80461f + e45b9f9 commit 9d889bc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
11 changes: 11 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,17 @@ Example of complete authorization file
]
}

Karapace Schema Registry access to the schemas topic
====================================================

The principal used by the Karapace Schema Registry has to have adequate access to the schemas topic (see the ``topic_name`` configuration option above).
In addition to what is required to access the topic, as described in the Confluent Schema Registry documentation_, the unique, single-member consumer group
used by consumers in the schema registry needs ``Describe`` and ``Read`` permissions_ on the group.
These unique (per instance of the schema registry) consumer group names are prefixed by ``karapace-autogenerated-``, followed by a random string.

.. _`documentation`: https://docs.confluent.io/platform/current/schema-registry/security/index.html#authorizing-access-to-the-schemas-topic
.. _`permissions`: https://docs.confluent.io/platform/current/kafka/authorization.html#group-resource-type-operations

OAuth2 authentication and authorization of Karapace REST proxy
===================================================================

Expand Down
19 changes: 16 additions & 3 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka.errors import (
GroupAuthorizationFailedError,
InvalidReplicationFactorError,
KafkaConfigurationError,
KafkaTimeoutError,
NoBrokersAvailable,
NodeNotReadyError,
TopicAlreadyExistsError,
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace import constants
Expand All @@ -26,6 +28,7 @@
from karapace.errors import InvalidReferences, InvalidSchema
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.master_coordinator import MasterCoordinator
Expand Down Expand Up @@ -316,13 +319,23 @@ def handle_messages(self) -> None:
for msg in msgs:
try:
message_key = msg.key()
if message_key is None:
LOG.warning("Empty message key when consuming from topic %s, error: %s", msg.topic(), msg.error())
continue
message_error = msg.error()
if message_error is not None:
raise translate_from_kafkaerror(message_error)

assert message_key is not None
key = json_decode(message_key)
except JSONDecodeError:
LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset())
continue
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
LOG.error(
"Kafka authorization error when consuming from %s: %s %s",
self.config["topic_name"],
exc,
msg.error(),
)
continue

assert isinstance(key, dict)
msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
Expand Down

0 comments on commit 9d889bc

Please sign in to comment.