diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 7a691a574..b117127f8 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -24,7 +24,6 @@ from enum import Enum from functools import partial from kafka import KafkaConsumer, KafkaProducer -from kafka.admin import KafkaAdminClient, NewTopic from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import KafkaError, TopicAlreadyExistsError from kafka.structs import PartitionMetadata, TopicPartition @@ -33,6 +32,7 @@ from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess from karapace.config import Config +from karapace.kafka_admin import KafkaAdminClient from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter from karapace.utils import assert_never @@ -186,7 +186,7 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa @contextlib.contextmanager -def _admin(config: Config) -> KafkaAdminClient: +def _admin(config: Config) -> Iterator[KafkaAdminClient]: """Creates an automatically closing Kafka admin client. :param config: for the client. @@ -201,10 +201,7 @@ def _admin(config: Config) -> KafkaAdminClient: retry=retry_if_exception_type(KafkaError), )(kafka_admin_from_config)(config) - try: - yield admin - finally: - admin.close() + yield admin @retry( @@ -222,26 +219,24 @@ def _maybe_create_topic( topic_configs: Mapping[str, str], ) -> bool: """Returns True if topic creation was successful, False if topic already exists""" - topic = NewTopic( - name=name, - num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, - replication_factor=replication_factor, - topic_configs=topic_configs, - ) - with _admin(config) as admin: try: - admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) + admin.new_topic( + name, + num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, + replication_factor=replication_factor, + config=dict(topic_configs), + ) except TopicAlreadyExistsError: - LOG.debug("Topic %r already exists", topic.name) + LOG.debug("Topic %r already exists", name) return False LOG.info( "Created topic %r (partition count: %s, replication factor: %s, config: %s)", - topic.name, - topic.num_partitions, - topic.replication_factor, - topic.topic_configs, + name, + constants.SCHEMA_TOPIC_NUM_PARTITIONS, + replication_factor, + topic_configs, ) return True @@ -520,7 +515,7 @@ def create_backup( topic_configurations = get_topic_configurations( admin=admin, topic_name=topic_name, - config_source_filter={ConfigSource.TOPIC_CONFIG}, + config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG}, ) # Note: It's expected that we at some point want to introduce handling of diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py index 43f507155..4a65f1bf7 100644 --- a/karapace/backup/topic_configurations.py +++ b/karapace/backup/topic_configurations.py @@ -2,25 +2,12 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from enum import Enum -from kafka import KafkaAdminClient -from kafka.admin import ConfigResource, ConfigResourceType -from kafka.errors import for_code -from kafka.protocol.admin import DescribeConfigsRequest -from typing import Container, Dict, Final +from __future__ import annotations +from karapace.kafka_admin import ConfigSource, KafkaAdminClient +from typing import Container, Final -class ConfigSource(int, Enum): - UNKNOWN = 0 - TOPIC_CONFIG = 1 - DYNAMIC_BROKER_CONFIG = 2 - DYNAMIC_DEFAULT_BROKER_CONFIG = 3 - STATIC_BROKER_CONFIG = 4 - DEFAULT_CONFIG = 5 - DYNAMIC_BROKER_LOGGER_CONFIG = 6 - - -ALL_CONFIG_SOURCES: Final = {item.value for item in ConfigSource.__members__.values()} +ALL_CONFIG_SOURCES: Final = ConfigSource.__members__.values() DEFAULT_CONFIGS: Final = [ @@ -35,7 +22,7 @@ def get_topic_configurations( admin: KafkaAdminClient, topic_name: str, config_source_filter: Container[ConfigSource] = (), -) -> Dict[str, str]: +) -> dict[str, str]: """Get configurations of the specified topic. The following configurations will be retrieved by default: - `cleanup.policy` - `min.insync.replicas` @@ -47,18 +34,4 @@ def get_topic_configurations( :param config_source_filter: returns all the configurations that match the sources specified, plus the default configurations. If empty, returns only the default configurations. """ - if admin._matching_api_version(DescribeConfigsRequest) == 0: # pylint: disable=protected-access - raise NotImplementedError("Broker version is not supported") - req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic_name)] - cfgs = admin.describe_configs(req_cfgs) - assert len(cfgs) == 1 - assert len(cfgs[0].resources) == 1 - err, _, _, _, config_values = cfgs[0].resources[0] - if err != 0: - raise for_code(err) - topic_config = {} - for cv in config_values: - name, val, _, config_source, _, _ = cv - if name in DEFAULT_CONFIGS or (config_source in config_source_filter): - topic_config[name] = val - return topic_config + return admin.get_topic_config(topic_name, DEFAULT_CONFIGS, config_source_filter=config_source_filter) diff --git a/karapace/constants.py b/karapace/constants.py index 8b108478f..c2214dc77 100644 --- a/karapace/constants.py +++ b/karapace/constants.py @@ -8,7 +8,7 @@ SCHEMA_TOPIC_NUM_PARTITIONS: Final = 1 API_VERSION_AUTO_TIMEOUT_MS: Final = 30000 -TOPIC_CREATION_TIMEOUT_MS: Final = 20000 +TOPIC_CREATION_TIMEOUT_S: Final = 20 DEFAULT_SCHEMA_TOPIC: Final = "_schemas" DEFAULT_PRODUCER_MAX_REQUEST: Final = 1048576 DEFAULT_AIOHTTP_CLIENT_MAX_SIZE: Final = 1048576 diff --git a/karapace/kafka_admin.py b/karapace/kafka_admin.py new file mode 100644 index 000000000..daef518ca --- /dev/null +++ b/karapace/kafka_admin.py @@ -0,0 +1,276 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from collections.abc import Iterable +from concurrent.futures import Future +from confluent_kafka import TopicPartition +from confluent_kafka.admin import ( + AdminClient, + BrokerMetadata, + ClusterMetadata, + ConfigResource, + ConfigSource, + NewTopic, + OffsetSpec, + ResourceType, + TopicMetadata, +) +from confluent_kafka.error import KafkaException +from kafka.errors import AuthenticationFailedError, for_code, UnknownTopicOrPartitionError +from karapace.constants import TOPIC_CREATION_TIMEOUT_S +from typing import Any, Callable, Container, NoReturn, Protocol, TypedDict, TypeVar +from typing_extensions import Unpack + +import logging + +LOG = logging.getLogger(__name__) + + +T = TypeVar("T") + + +def single_futmap_result(futmap: dict[Any, Future[T]]) -> T: + """Extract the result of a future wrapped in a dict. + + Bulk operations of the `confluent_kafka` library's Kafka clients return results + wrapped in a dictionary of futures. Most often we use these bulk operations to + operate on a single resource/entity. This function makes sure the dictionary of + futures contains a single future and returns its result. + """ + assert len(futmap) == 1 + + return next(iter(futmap.values())).result() + + +def raise_from_kafkaexception(exc: KafkaException) -> NoReturn: + """Raises a more developer-friendly error from a `KafkaException`. + + The confluent-kafka library's `KafkaException` is a wrapper around its internal + `KafkaError`. The resulting, raised exception however is coming from + `kafka-python`, due to these exceptions having human-readable names, providing + better context for error handling. + + `kafka.errors.for_code` is used to translate the original exception's error code + to a domain specific error class from `kafka-python`. + """ + raise for_code(exc.args[0].code()) from exc + + +# For now this is a bit of a trick to replace an explicit usage of +# `karapace.kafka_rest_apis.authentication.SimpleOauthTokenProvider` +# to avoid circular imports +class TokenWithExpiryProvider(Protocol): + def token_with_expiry(self, config: str | None) -> tuple[str, int | None]: + ... + + +class AdminClientParams(TypedDict, total=False): + client_id: str | None + connections_max_idle_ms: int | None + metadata_max_age_ms: int | None + sasl_mechanism: str | None + sasl_plain_password: str | None + sasl_plain_username: str | None + security_protocol: str | None + ssl_cafile: str | None + ssl_certfile: str | None + ssl_keyfile: str | None + sasl_oauth_token_provider: TokenWithExpiryProvider + + +class KafkaAdminClient(AdminClient): + def __init__(self, bootstrap_servers: Iterable[str] | str, **params: Unpack[AdminClientParams]) -> None: + super().__init__(self._get_config_from_params(bootstrap_servers, **params)) + self._activate_callbacks() + self._verify_connection() + + @staticmethod + def _get_config_from_params(bootstrap_servers: Iterable[str] | str, **params: Unpack[AdminClientParams]) -> dict: + if not isinstance(bootstrap_servers, str): + bootstrap_servers = ",".join(bootstrap_servers) + + config: dict[str, int | str | Callable | None] = { + "bootstrap.servers": bootstrap_servers, + "client.id": params.get("client_id"), + "connections.max.idle.ms": params.get("connections_max_idle_ms"), + "metadata.max.age.ms": params.get("metadata_max_age_ms"), + "sasl.mechanism": params.get("sasl_mechanism"), + "sasl.password": params.get("sasl_plain_password"), + "sasl.username": params.get("sasl_plain_username"), + "security.protocol": params.get("security_protocol"), + "ssl.ca.location": params.get("ssl_cafile"), + "ssl.certificate.location": params.get("ssl_certfile"), + "ssl.key.location": params.get("ssl_keyfile"), + } + config = {key: value for key, value in config.items() if value is not None} + + if "sasl_oauth_token_provider" in params: + config["oauth_cb"] = params["sasl_oauth_token_provider"].token_with_expiry + + return config + + def _activate_callbacks(self) -> None: + # Any client in the Confluent Kafka Python library needs poll called to + # trigger any callbacks registered (eg. for errors, OAuth tokens, etc.) + self.poll(timeout=0.0) + + def _verify_connection(self) -> None: + """Attempts to call `AdminClient.list_topics` a few times. + + The `list_topics` method is the only meaningful synchronous method of + the `AdminClient` class that can be used to verify that a connection and + authentication has been established with a Kafka cluster. + + Just instantiating and initializing the admin client doesn't result in + anything in its main thread in case of errors, only error logs from another + thread otherwise. + """ + for _ in range(3): + try: + self.list_topics(timeout=1) + except KafkaException as exc: + LOG.info("Could not establish connection due to: %s", exc) + continue + else: + break + else: + raise AuthenticationFailedError() + + def new_topic( + self, + name: str, + *, + num_partitions: int = 1, + replication_factor: int = 1, + config: dict[str, str] | None = None, + request_timeout: int = TOPIC_CREATION_TIMEOUT_S, + ) -> NewTopic: + try: + new_topic = NewTopic( + topic=name, + num_partitions=num_partitions, + replication_factor=replication_factor, + config=config if config is not None else {}, + ) + futmap: dict[str, Future] = self.create_topics([new_topic], request_timeout=request_timeout) + single_futmap_result(futmap) + return new_topic + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def update_topic_config(self, topic_name: str, config: dict[str, str]) -> None: + futmap = self.alter_configs([ConfigResource(ResourceType.TOPIC, topic_name, set_config=config)]) + try: + single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def delete_topic(self, name: str) -> None: + futmap = self.delete_topics([name]) + try: + single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def cluster_metadata(self, topics: Iterable[str] | None = None) -> dict: + """Fetch cluster metadata and topic information for given topics or all topics if not given. + + Using the `list_topics` method of the `AdminClient`, as this actually provides + metadata for the entire cluster, not just topics, as suggested by the name. + + The topics filter is only applied _after_ fetching the cluster metadata, + due to `list_topics` only accepting a single topic as a filter. + """ + cluster_metadata: ClusterMetadata = self.list_topics() + topics_metadata: dict[str, TopicMetadata] = cluster_metadata.topics + brokers_metadata: dict[str, BrokerMetadata] = cluster_metadata.brokers + + if topics is not None and any(topic not in topics_metadata.keys() for topic in topics): + raise UnknownTopicOrPartitionError() + + topics_data: dict[str, dict] = {} + for topic, topic_metadata in topics_metadata.items(): + if topics is not None and topic not in topics: + continue + + partitions_data = [] + for partition_id, partition_metadata in topic_metadata.partitions.items(): + partition_data = { + "partition": partition_id, + "leader": partition_metadata.leader, + "replicas": [ + { + "broker": replica_id, + "leader": replica_id == partition_metadata.leader, + "in_sync": replica_id in partition_metadata.isrs, + } + for replica_id in partition_metadata.replicas + ], + } + partitions_data.append(partition_data) + + topics_data[topic] = {"partitions": partitions_data} + + return {"topics": topics_data, "brokers": list(brokers_metadata.keys())} + + def get_topic_config( + self, + topic: str, + config_name_filter: Container[str] | None = None, + config_source_filter: Container[ConfigSource] | None = None, + ) -> dict[str, str]: + """Fetches, filters and returns topic configuration. + + The two filters, `config_name_filter` and `config_source_filter` work together + so if a config entry matches either of them, it'll be returned. + If a filter is not provided (ie. is `None`), it'll act as if matching all + config entries. + """ + futmap: dict[ConfigResource, Future] = self.describe_configs([ConfigResource(ResourceType.TOPIC, topic)]) + try: + topic_configs = single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + config: dict[str, str] = {} + for config_name, config_entry in topic_configs.items(): + matches_name_filter: bool = config_name_filter is None or config_name in config_name_filter + matches_source_filter: bool = ( + config_source_filter is None or ConfigSource(config_entry.source) in config_source_filter + ) + + if matches_name_filter or matches_source_filter: + config[config_name] = config_entry.value + + return config + + def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: + """Returns the beginning and end offsets for a topic partition. + + Making two separate requests for beginning and end offsets, due to the + `AdminClient.list_offsets` behaviour. + """ + try: + latest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( + { + TopicPartition(topic, partition_id): OffsetSpec.latest(), + } + ) + endoffset = single_futmap_result(latest_offset_futmap) + + earliest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( + { + TopicPartition(topic, partition_id): OffsetSpec.earliest(), + } + ) + startoffset = single_futmap_result(earliest_offset_futmap) + except KafkaException as exc: + code = exc.args[0].code() + if code < 0: + raise UnknownTopicOrPartitionError() from exc + raise_from_kafkaexception(exc) + return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index c63194e52..fd3b64264 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -2,20 +2,18 @@ from aiokafka.errors import KafkaConnectionError from binascii import Error as B64DecodeError from collections import namedtuple -from contextlib import AsyncExitStack, closing +from contextlib import AsyncExitStack from http import HTTPStatus from kafka.errors import ( AuthenticationFailedError, BrokerResponseError, KafkaTimeoutError, - NoBrokersAvailable, - NodeNotReadyError, TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema -from karapace.kafka_rest_apis.admin import KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient, KafkaException from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, @@ -300,10 +298,7 @@ async def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy": else: if self.proxies.get(key) is None: self.proxies[key] = UserRestProxy(self.config, self.kafka_timeout, self.serializer) - except (NoBrokersAvailable, AuthenticationFailedError): - # NoBrokersAvailable can be caused also due to misconfigration, but kafka-python's - # KafkaAdminClient cannot currently distinguish those two cases. - # A more expressive AuthenticationFailedError is raised in case of OAuth2 + except AuthenticationFailedError: log.exception("Failed to connect to Kafka with the credentials") self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN) proxy = self.proxies[key] @@ -620,7 +615,7 @@ async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict: self._metadata_birth = metadata_birth self._cluster_metadata = metadata self._cluster_metadata_complete = topics is None - except NodeNotReadyError: + except KafkaException: log.exception("Could not refresh cluster metadata") KafkaRest.r( body={ @@ -635,7 +630,7 @@ async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict: def init_admin_client(self): for retry in [True, True, False]: try: - self.admin_client = KafkaRestAdminClient( + self.admin_client = KafkaAdminClient( bootstrap_servers=self.config["bootstrap_uri"], security_protocol=self.config["security_protocol"], ssl_cafile=self.config["ssl_cafile"], @@ -662,9 +657,6 @@ async def aclose(self) -> None: log.info("Disposing async producer") stack.push_async_callback(self._async_producer.stop) - if self.admin_client is not None: - stack.enter_context(closing(self.admin_client)) - if self.consumer_manager is not None: stack.push_async_callback(self.consumer_manager.aclose) diff --git a/karapace/kafka_rest_apis/admin.py b/karapace/kafka_rest_apis/admin.py deleted file mode 100644 index eb49f5e8a..000000000 --- a/karapace/kafka_rest_apis/admin.py +++ /dev/null @@ -1,158 +0,0 @@ -""" -Copyright (c) 2023 Aiven Ltd -See LICENSE for details -""" - -from __future__ import annotations - -from kafka import KafkaAdminClient -from kafka.admin import ConfigResource, ConfigResourceType, NewTopic -from kafka.errors import Cancelled, for_code, UnrecognizedBrokerVersion -from kafka.future import Future -from kafka.protocol.admin import DescribeConfigsRequest -from kafka.protocol.metadata import MetadataRequest, MetadataResponse_v1 -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy - -import logging - -LOG = logging.getLogger(__name__) - - -class KafkaRestAdminClient(KafkaAdminClient): - def get_topic_config(self, topic: str) -> dict: - config_version = self._matching_api_version(DescribeConfigsRequest) - req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic)] - cfgs = self.describe_configs(req_cfgs) - assert len(cfgs) == 1 - assert len(cfgs[0].resources) == 1 - err, _, _, _, config_values = cfgs[0].resources[0] - if err != 0: - raise for_code(err) - topic_config = {} - for cv in config_values: - if config_version == 0: - name, val, _, _, _ = cv - else: - name, val, _, _, _, _ = cv - topic_config[name] = val - return topic_config - - def new_topic(self, name: str, *, num_partitions: int = 1, replication_factor: int = 1) -> None: - self.create_topics([NewTopic(name, num_partitions, replication_factor)]) - - def cluster_metadata(self, topics: list[str] | None = None, retries: int = 0) -> dict: - """Fetch cluster metadata and topic information for given topics or all topics if not given.""" - metadata_version = self._matching_api_version(MetadataRequest) - if metadata_version > 6 or metadata_version < 1: - raise UnrecognizedBrokerVersion( - f"Kafka Admin interface cannot determine the controller using MetadataRequest_v{metadata_version}." - ) - request = MetadataRequest[1](topics=topics) - future = self._send_request_to_least_loaded_node(request) - try: - self._wait_for_futures([future]) - except Cancelled: - if retries > 3: - raise - LOG.debug("Retrying metadata with %d retires", retries) - return self.cluster_metadata(topics, retries + 1) - return self._make_metadata_response(future.value) - - @staticmethod - def _make_metadata_response(metadata: MetadataResponse_v1) -> dict: - resp_brokers = metadata.brokers - brokers = set() - for b in resp_brokers: - node_id, _, _, _ = b - brokers.add(node_id) - if not metadata.topics: - return {"topics": {}, "brokers": list(brokers)} - - topics: dict[str, dict] = {} - for tup in metadata.topics: - err, topic, _, partitions = tup - if err: - raise for_code(err) - topic_data = [] - for part in partitions: - _, partition_index, leader_id, replica_nodes, isr_nodes = part - isr_nodes = set(isr_nodes) - topic_response = {"partition": partition_index, "leader": leader_id, "replicas": []} - for node in replica_nodes: - topic_response["replicas"].append( - {"broker": node, "leader": node == leader_id, "in_sync": node in isr_nodes} - ) - topic_data.append(topic_response) - topics[topic] = {"partitions": topic_data} - return {"topics": topics, "brokers": list(brokers)} - - def make_offsets_request(self, topic: str, partition_id: int, timestamp: int) -> Future: - v = self._matching_api_version(OffsetRequest) - replica_id = -1 - if v == 0: - max_offsets = 1 - partitions_v0 = [(partition_id, timestamp, max_offsets)] - topics_v0 = [(topic, partitions_v0)] - request = OffsetRequest[0](replica_id, topics_v0) - elif v == 1: - partitions_v1 = [(partition_id, timestamp)] - topics_v1 = [(topic, partitions_v1)] - request = OffsetRequest[1](replica_id, topics_v1) - else: - isolation_level = 1 - partitions = [(partition_id, timestamp)] - topics = [(topic, partitions)] - request = OffsetRequest[2](replica_id, isolation_level, topics) - - future = self.send_request_to_leader_node(request, topic, partition_id) - return future - - def send_request_to_leader_node(self, request: OffsetRequest, topic_name: str, partition_id: int) -> Future: - cluster_meta_resp = self.cluster_metadata(topics=[topic_name]) - partition = [p for p in cluster_meta_resp["topics"][topic_name]["partitions"] if p["partition"] == partition_id] - - # handle case where partition_id is not part metadata - if partition == []: - return self._send_request_to_least_loaded_node(request) - - leader_id = partition[0]["leader"] - return self._send_request_to_node(leader_id, request) - - def get_offsets(self, topic: str, partition_id: int) -> dict: - beginning_f = self.make_offsets_request(topic, partition_id, OffsetResetStrategy.EARLIEST) - end_f = self.make_offsets_request(topic, partition_id, OffsetResetStrategy.LATEST) - self._wait_for_futures([beginning_f, end_f]) - beginning_resp = beginning_f.value - end_resp = end_f.value - v = self._matching_api_version(OffsetRequest) - assert len(beginning_resp.topics) == 1 - assert len(end_resp.topics) == 1 - _, beginning_partitions = beginning_resp.topics[0] - _, end_partitions = end_resp.topics[0] - - assert len(beginning_partitions) == 1 - assert len(end_partitions) == 1 - if v == 0: - assert len(beginning_partitions[0][2]) == 1 - assert partition_id == beginning_partitions[0][0] - assert partition_id == end_partitions[0][0] - start_err = beginning_partitions[0][1] - end_err = beginning_partitions[0][1] - for e in [start_err, end_err]: - if e != 0: - raise for_code(e) - rv = { - "beginning_offset": beginning_partitions[0][2][0], - "end_offset": end_partitions[0][2][0], - } - else: - start_err = beginning_partitions[0][1] - end_err = beginning_partitions[0][1] - for e in [start_err, end_err]: - if e != 0: - raise for_code(e) - rv = { - "beginning_offset": beginning_partitions[0][3], - "end_offset": end_partitions[0][3], - } - return rv diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index 43b7a7ec4..6b79bbf36 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -2,10 +2,10 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from . import constants from .config import Config from .utils import KarapaceKafkaClient -from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer, KafkaProducer +from karapace.kafka_admin import KafkaAdminClient from typing import Iterator import contextlib @@ -13,7 +13,6 @@ def kafka_admin_from_config(config: Config) -> KafkaAdminClient: return KafkaAdminClient( - api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, bootstrap_servers=config["bootstrap_uri"], client_id=config["client_id"], security_protocol=config["security_protocol"], @@ -23,7 +22,6 @@ def kafka_admin_from_config(config: Config) -> KafkaAdminClient: ssl_cafile=config["ssl_cafile"], ssl_certfile=config["ssl_certfile"], ssl_keyfile=config["ssl_keyfile"], - kafka_client=KarapaceKafkaClient, ) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 3dec4a887..0d8a65165 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -10,7 +10,6 @@ from contextlib import closing, ExitStack from jsonschema.validators import Draft7Validator from kafka import KafkaConsumer, TopicPartition -from kafka.admin import KafkaAdminClient, NewTopic from kafka.errors import ( InvalidReplicationFactorError, KafkaConfigurationError, @@ -24,6 +23,7 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase +from karapace.kafka_admin import KafkaAdminClient from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher @@ -86,7 +86,6 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer: def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: return KafkaAdminClient( - api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, bootstrap_servers=config["bootstrap_uri"], client_id=config["client_id"], security_protocol=config["security_protocol"], @@ -99,15 +98,6 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: ) -def new_schema_topic_from_config(config: Config) -> NewTopic: - return NewTopic( - name=config["topic_name"], - num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, - replication_factor=config["replication_factor"], - topic_configs={"cleanup.policy": "compact"}, - ) - - class KafkaSchemaReader(Thread): def __init__( self, @@ -166,7 +156,6 @@ def run(self) -> None: while not self._stop.is_set() and self.admin_client is None: try: self.admin_client = _create_admin_client_from_config(self.config) - stack.enter_context(closing(self.admin_client)) except (NodeNotReadyError, NoBrokersAvailable, AssertionError): LOG.warning("[Admin Client] No Brokers available yet. Retrying") self._stop.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) @@ -198,25 +187,28 @@ def run(self) -> None: assert self.consumer is not None schema_topic_exists = False - schema_topic = new_schema_topic_from_config(self.config) - schema_topic_create = [schema_topic] while not self._stop.is_set() and not schema_topic_exists: try: - LOG.info("[Schema Topic] Creating %r", schema_topic.name) - self.admin_client.create_topics(schema_topic_create, timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) - LOG.info("[Schema Topic] Successfully created %r", schema_topic.name) + LOG.info("[Schema Topic] Creating %r", self.config["topic_name"]) + topic = self.admin_client.new_topic( + name=self.config["topic_name"], + num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, + replication_factor=self.config["replication_factor"], + config={"cleanup.policy": "compact"}, + ) + LOG.info("[Schema Topic] Successfully created %r", topic.topic) schema_topic_exists = True except TopicAlreadyExistsError: - LOG.warning("[Schema Topic] Already exists %r", schema_topic.name) + LOG.warning("[Schema Topic] Already exists %r", self.config["topic_name"]) schema_topic_exists = True except InvalidReplicationFactorError: LOG.info( "[Schema Topic] Failed to create topic %r, not enough Kafka brokers ready yet, retrying", - schema_topic.name, + topic.topic, ) self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS) except: # pylint: disable=bare-except - LOG.exception("[Schema Topic] Failed to create %r, retrying", schema_topic.name) + LOG.exception("[Schema Topic] Failed to create %r, retrying", topic.topic) self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS) while not self._stop.is_set(): diff --git a/mypy.ini b/mypy.ini index e06dfb201..69e95f788 100644 --- a/mypy.ini +++ b/mypy.ini @@ -14,6 +14,7 @@ warn_unused_ignores = True warn_no_return = True warn_unreachable = True strict_equality = True +enable_incomplete_feature = Unpack [mypy-karapace.schema_registry_apis] ignore_errors = True @@ -89,6 +90,9 @@ ignore_errors = True [mypy-aiokafka.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-kafka.*] ignore_missing_imports = True diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 5ca42f308..7e9261048 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -52,6 +52,8 @@ click==8.1.3 # via flask configargparse==1.5.3 # via locust +confluent-kafka==2.3.0 + # via -r requirements.txt exceptiongroup==1.1.3 # via # -r requirements.txt diff --git a/requirements/requirements.in b/requirements/requirements.in index 3c2613404..751572395 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -2,6 +2,7 @@ accept-types<1 aiohttp<4 aiokafka<1 +confluent-kafka==2.3.0 isodate<1 jsonschema<5 networkx<4 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index f7f31a844..2811dcb33 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -29,6 +29,8 @@ cachetools==5.3.1 # via -r requirements.in charset-normalizer==3.1.0 # via aiohttp +confluent-kafka==2.3.0 + # via -r requirements.in exceptiongroup==1.1.3 # via anyio frozenlist==1.3.3 diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py index 49ef6251f..884033cc2 100644 --- a/tests/integration/backup/test_get_topic_configurations.py +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -2,24 +2,12 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka import KafkaAdminClient -from kafka.admin import ConfigResource, ConfigResourceType, NewTopic +from __future__ import annotations + from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations -from karapace.constants import TOPIC_CREATION_TIMEOUT_MS -from typing import Dict +from karapace.kafka_admin import KafkaAdminClient, NewTopic import pytest -import secrets - - -@pytest.fixture(scope="function", name="new_topic") -def topic_fixture(admin_client: KafkaAdminClient) -> NewTopic: - new_topic = NewTopic(secrets.token_hex(4), 1, 1) - admin_client.create_topics([new_topic], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) - try: - yield new_topic - finally: - admin_client.delete_topics([new_topic.name], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) class TestTopicConfiguration: @@ -28,12 +16,12 @@ def test_get_custom_topic_configurations( self, new_topic: NewTopic, admin_client: KafkaAdminClient, - custom_topic_configs: Dict[str, str], + custom_topic_configs: dict[str, str], ) -> None: - admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + admin_client.update_topic_config(new_topic.topic, custom_topic_configs) retrieved_configs = get_topic_configurations( - admin_client, new_topic.name, config_source_filter={ConfigSource.TOPIC_CONFIG} + admin_client, new_topic.topic, config_source_filter={ConfigSource.DYNAMIC_TOPIC_CONFIG} ) # Verify that default configs are retrieved, and then remove them @@ -49,10 +37,9 @@ def test_get_only_default_topic_configurations( new_topic: NewTopic, admin_client: KafkaAdminClient, ) -> None: - custom_topic_configs = {"segment.bytes": "7890"} - admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + admin_client.update_topic_config(new_topic.topic, {"segment.bytes": "7890"}) - retrieved_configs = get_topic_configurations(admin_client, new_topic.name, config_source_filter=()) + retrieved_configs = get_topic_configurations(admin_client, new_topic.topic, config_source_filter=()) # Verify that default configs are retrieved, and then remove them for default_config in DEFAULT_CONFIGS: @@ -67,10 +54,9 @@ def test_get_all_topic_configurations( new_topic: NewTopic, admin_client: KafkaAdminClient, ) -> None: - custom_topic_configs = {"flush.ms": "999"} - admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + admin_client.update_topic_config(new_topic.topic, {"flush.ms": "999"}) - retrieved_configs = get_topic_configurations(admin_client, new_topic.name, config_source_filter=ALL_CONFIG_SOURCES) + retrieved_configs = get_topic_configurations(admin_client, new_topic.topic, config_source_filter=ALL_CONFIG_SOURCES) # Verify that default configs are retrieved, and then remove them for default_config in DEFAULT_CONFIGS: @@ -78,7 +64,7 @@ def test_get_all_topic_configurations( del retrieved_configs[default_config] # Verify that all custom topic configs are correctly retrieved, and then remove them - for custom_config_key, custom_config_value in custom_topic_configs.items(): + for custom_config_key, custom_config_value in ({"flush.ms": "999"}).items(): assert retrieved_configs[custom_config_key] == custom_config_value del retrieved_configs[custom_config_key] diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 955472783..2c6e70df4 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -12,7 +12,7 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.client import Client from karapace.config import set_config_defaults -from karapace.kafka_rest_apis import KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient from karapace.key_format import is_key_in_canonical_format from karapace.utils import Expiration from pathlib import Path @@ -80,7 +80,7 @@ async def test_backup_get( async def test_backup_restore_and_get_non_schema_topic( - kafka_servers: KafkaServers, tmp_path: Path, admin_client: KafkaRestAdminClient + kafka_servers: KafkaServers, tmp_path: Path, admin_client: KafkaAdminClient ) -> None: test_topic_name = new_random_name("non-schemas") diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 7bdcf4b40..817b5f970 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -5,8 +5,7 @@ from __future__ import annotations from dataclasses import fields -from kafka import KafkaAdminClient, KafkaProducer, TopicPartition -from kafka.admin import ConfigResource, ConfigResourceType, NewTopic +from kafka import KafkaProducer, TopicPartition from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api @@ -17,7 +16,7 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults -from karapace.constants import TOPIC_CREATION_TIMEOUT_MS +from karapace.kafka_admin import KafkaAdminClient, NewTopic from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.utils import KarapaceKafkaClient from karapace.version import __version__ @@ -33,7 +32,6 @@ import logging import os import pytest -import secrets import shutil import subprocess import textwrap @@ -86,16 +84,6 @@ def admin_fixture(karapace_config: Config) -> Iterator[KafkaAdminClient]: admin.close() -@pytest.fixture(scope="function", name="new_topic") -def topic_fixture(kafka_admin: KafkaAdminClient) -> NewTopic: - new_topic = NewTopic(secrets.token_hex(4), 1, 1) - kafka_admin.create_topics([new_topic], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) - try: - yield new_topic - finally: - kafka_admin.delete_topics([new_topic.name], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) - - @pytest.fixture(scope="function", name="producer") def producer_fixture(karapace_config: Config) -> Iterator[KafkaProducer]: with kafka_producer_from_config(karapace_config) as producer: @@ -115,26 +103,18 @@ def test_roundtrip_from_kafka_state( karapace_config: Config, ) -> None: # Configure topic. - admin_client.alter_configs( - [ - ConfigResource( - ConfigResourceType.TOPIC, - new_topic.name, - configs={"max.message.bytes": "999"}, - ) - ] - ) + admin_client.update_topic_config(new_topic.topic, {"max.message.bytes": "999"}) # Populate topic. producer.send( - new_topic.name, + new_topic.topic, key=b"bar", value=b"foo", partition=0, timestamp_ms=1683474641, ).add_errback(_raise) producer.send( - new_topic.name, + new_topic.topic, key=b"foo", value=b"bar", partition=0, @@ -146,7 +126,7 @@ def test_roundtrip_from_kafka_state( ).add_errback(_raise) producer.flush() - topic_config = get_topic_configurations(admin_client, new_topic.name, {ConfigSource.TOPIC_CONFIG}) + topic_config = get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG}) # Execute backup creation. backup_location = tmp_path / "backup" @@ -158,7 +138,7 @@ def test_roundtrip_from_kafka_state( "--config", str(config_file), "--topic", - new_topic.name, + new_topic.topic, "--replication-factor=1", "--location", str(backup_location), @@ -172,14 +152,14 @@ def test_roundtrip_from_kafka_state( (backup_directory,) = tmp_path.iterdir() assert backup_directory.name == "backup" assert sorted(path.name for path in backup_directory.iterdir()) == [ - f"{new_topic.name}.metadata", - f"{new_topic.name}:0.data", + f"{new_topic.topic}.metadata", + f"{new_topic.topic}:0.data", ] (metadata_path,) = backup_directory.glob("*.metadata") assert metadata_path.exists() # Delete the source topic. - admin_client.delete_topics([new_topic.name], timeout_ms=10_000) + admin_client.delete_topic(new_topic.topic) # todo: assert new topic uuid != old topic uuid? # Execute backup restoration. @@ -190,7 +170,7 @@ def test_roundtrip_from_kafka_state( "--config", str(config_file), "--topic", - new_topic.name, + new_topic.topic, "--location", str(backup_directory), ], @@ -199,20 +179,20 @@ def test_roundtrip_from_kafka_state( ) # Verify configuration is identical. - assert topic_config == get_topic_configurations(admin_client, new_topic.name, {ConfigSource.TOPIC_CONFIG}) + assert topic_config == get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG}) # Verify records of restored topic. - with kafka_consumer_from_config(karapace_config, new_topic.name) as consumer: - (partition,) = consumer.partitions_for_topic(new_topic.name) + with kafka_consumer_from_config(karapace_config, new_topic.topic) as consumer: + (partition,) = consumer.partitions_for_topic(new_topic.topic) first_record, second_record = _consume_records( consumer=consumer, - topic_partition=TopicPartition(new_topic.name, partition), + topic_partition=TopicPartition(new_topic.topic, partition), poll_timeout=PollTimeout.default(), ) # First record. assert isinstance(first_record, ConsumerRecord) - assert first_record.topic == new_topic.name + assert first_record.topic == new_topic.topic assert first_record.partition == partition # Note: This might be unreliable due to not using idempotent producer, i.e. we have # no guarantee against duplicates currently. @@ -225,7 +205,7 @@ def test_roundtrip_from_kafka_state( # Second record. assert isinstance(second_record, ConsumerRecord) - assert second_record.topic == new_topic.name + assert second_record.topic == new_topic.topic assert second_record.partition == partition assert second_record.offset == 1 assert second_record.timestamp == 1683474657 @@ -246,15 +226,7 @@ def test_roundtrip_empty_topic( karapace_config: Config, ) -> None: # Configure topic. - admin_client.alter_configs( - [ - ConfigResource( - ConfigResourceType.TOPIC, - new_topic.name, - configs={"max.message.bytes": "987"}, - ) - ] - ) + admin_client.update_topic_config(new_topic.topic, {"max.message.bytes": "987"}) # Execute backup creation. backup_location = tmp_path / "backup" @@ -266,7 +238,7 @@ def test_roundtrip_empty_topic( "--config", str(config_file), "--topic", - new_topic.name, + new_topic.topic, "--replication-factor=1", "--location", str(backup_location), @@ -282,7 +254,7 @@ def test_roundtrip_empty_topic( (metadata_path,) = backup_directory.iterdir() # Delete the source topic. - admin_client.delete_topics([new_topic.name], timeout_ms=10_000) + admin_client.delete_topic(new_topic.topic) # Execute backup restoration. subprocess.run( @@ -292,7 +264,7 @@ def test_roundtrip_empty_topic( "--config", str(config_file), "--topic", - new_topic.name, + new_topic.topic, "--location", str(metadata_path), ], @@ -301,7 +273,7 @@ def test_roundtrip_empty_topic( ) # Verify configuration. - assert get_topic_configurations(admin_client, new_topic.name, {ConfigSource.TOPIC_CONFIG}) == { + assert get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG}) == { "min.insync.replicas": "1", "cleanup.policy": "delete", "retention.ms": "604800000", @@ -310,12 +282,12 @@ def test_roundtrip_empty_topic( } # Verify the restored partition is empty. - consumer_ctx = kafka_consumer_from_config(karapace_config, new_topic.name) + consumer_ctx = kafka_consumer_from_config(karapace_config, new_topic.topic) with consumer_ctx as consumer, pytest.raises(EmptyPartition): - (partition,) = consumer.partitions_for_topic(new_topic.name) + (partition,) = consumer.partitions_for_topic(new_topic.topic) () = _consume_records( consumer=consumer, - topic_partition=TopicPartition(new_topic.name, partition), + topic_partition=TopicPartition(new_topic.topic, partition), poll_timeout=PollTimeout.default(), ) @@ -351,13 +323,13 @@ def test_exits_with_return_code_3_for_data_restoration_error( # Make sure topic doesn't exist beforehand. try: - admin_client.delete_topics([topic_name]) + admin_client.delete_topic(topic_name) except UnknownTopicOrPartitionError: logger.info("No previously existing topic.") else: logger.info("Deleted topic from previous run.") - admin_client.create_topics([NewTopic(topic_name, 1, 1)]) + admin_client.new_topic(topic_name) with pytest.raises(subprocess.CalledProcessError) as er: subprocess.run( [ @@ -391,7 +363,7 @@ def test_roundtrip_from_file( # Make sure topic doesn't exist beforehand. try: - admin_client.delete_topics([topic_name]) + admin_client.delete_topic(topic_name) except UnknownTopicOrPartitionError: logger.info("No previously existing topic.") else: @@ -484,16 +456,13 @@ def test_roundtrip_from_file_skipping_topic_creation( # Create topic exactly as it was stored on backup file try: - admin_client.delete_topics([topic_name]) + admin_client.delete_topic(topic_name) except UnknownTopicOrPartitionError: logger.info("No previously existing topic.") else: logger.info("Deleted topic from previous run.") - admin_client.create_topics( - [NewTopic(topic_name, 1, 1)], - timeout_ms=TOPIC_CREATION_TIMEOUT_MS, - ) + admin_client.new_topic(topic_name) # Execute backup restoration. subprocess.run( @@ -579,7 +548,7 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is # Make sure topic doesn't exist beforehand. try: - admin_client.delete_topics([topic_name]) + admin_client.delete_topic(topic_name) except UnknownTopicOrPartitionError: logger.info("No previously existing topic.") else: @@ -633,7 +602,7 @@ def test_producer_raises_exceptions( # Make sure topic doesn't exist beforehand. try: - admin_client.delete_topics([topic_name]) + admin_client.delete_topic(topic_name) except UnknownTopicOrPartitionError: logger.info("No previously existing topic.") else: @@ -866,7 +835,7 @@ def test_can_verify_file_integrity_from_large_topic( # Populate the test topic. for _ in range(100): producer.send( - new_topic.name, + new_topic.topic, key=1000 * b"a", value=1000 * b"b", partition=0, @@ -881,14 +850,14 @@ def test_can_verify_file_integrity_from_large_topic( "get", "--use-format-v3", f"--config={config_file!s}", - f"--topic={new_topic.name!s}", + f"--topic={new_topic.topic!s}", "--replication-factor=1", f"--location={backup_location!s}", ], capture_output=True, check=True, ) - metadata_path = backup_location / f"{new_topic.name}.metadata" + metadata_path = backup_location / f"{new_topic.topic}.metadata" cp = subprocess.run( [ @@ -906,7 +875,7 @@ def test_can_verify_file_integrity_from_large_topic( assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( f"""\ - Integrity of {new_topic.name}:0.data is intact. + Integrity of {new_topic.topic}:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) @@ -921,7 +890,7 @@ def test_can_verify_record_integrity_from_large_topic( # Populate the test topic. for _ in range(100): producer.send( - new_topic.name, + new_topic.topic, key=1000 * b"a", value=1000 * b"b", partition=0, @@ -936,14 +905,14 @@ def test_can_verify_record_integrity_from_large_topic( "get", "--use-format-v3", f"--config={config_file!s}", - f"--topic={new_topic.name}", + f"--topic={new_topic.topic}", "--replication-factor=1", f"--location={backup_location!s}", ], capture_output=True, check=True, ) - metadata_path = backup_location / f"{new_topic.name}.metadata" + metadata_path = backup_location / f"{new_topic.topic}.metadata" cp = subprocess.run( [ @@ -961,7 +930,7 @@ def test_can_verify_record_integrity_from_large_topic( assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( f"""\ - Integrity of {new_topic.name}:0.data is intact. + Integrity of {new_topic.topic}:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1c16b3c2b..688f24fe1 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -13,7 +13,8 @@ from kafka import KafkaProducer from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config -from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient, NewTopic +from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION from tests.integration.utils.cluster import RegistryDescription, RegistryEndpoint, start_schema_registry_cluster @@ -38,6 +39,7 @@ import pathlib import pytest import re +import secrets import string import time @@ -209,9 +211,8 @@ def fixture_producer(kafka_servers: KafkaServers) -> KafkaProducer: @pytest.fixture(scope="function", name="admin_client") -def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaRestAdminClient]: - with closing(KafkaRestAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers)) as cli: - yield cli +def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaAdminClient]: + yield KafkaAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers) @pytest.fixture(scope="function", name="rest_async") @@ -671,3 +672,12 @@ async def fixture_registry_async_auth_pair( port_range=port_range, ) as endpoints: yield [server.endpoint.to_url() for server in endpoints] + + +@pytest.fixture(scope="function", name="new_topic") +def topic_fixture(admin_client: KafkaAdminClient) -> NewTopic: + topic_name = secrets.token_hex(4) + try: + yield admin_client.new_topic(topic_name, num_partitions=1, replication_factor=1) + finally: + admin_client.delete_topic(topic_name) diff --git a/tests/integration/test_kafka_admin.py b/tests/integration/test_kafka_admin.py new file mode 100644 index 000000000..59cb90aa9 --- /dev/null +++ b/tests/integration/test_kafka_admin.py @@ -0,0 +1,146 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from kafka import KafkaProducer +from kafka.errors import UnknownTopicOrPartitionError +from karapace.kafka_admin import ConfigSource, KafkaAdminClient, NewTopic +from tests.utils import new_topic as create_new_topic + +import pytest + + +class TestClusterMetadata: + def test_cluster_metadata_brokers(self, admin_client: KafkaAdminClient) -> None: + cluster_metadata = admin_client.cluster_metadata() + + assert len(cluster_metadata) == 2, "Cluster metadata should have keys topics and brokers" + assert len(cluster_metadata["brokers"]) == 1, "Only one broker during tests" + + def test_cluster_metadata_all_topics(self, admin_client: KafkaAdminClient) -> None: + topic_names = [create_new_topic(admin_client) for _ in range(5)] + + topics_metadata = admin_client.cluster_metadata()["topics"] + + assert set(topic_names) - set(topics_metadata.keys()) == set(), "All created topics should be returned" + for topic in topic_names: + partitions_data = topics_metadata[topic]["partitions"] + + assert len(partitions_data) == 1, "Should only have data for one partition" + assert len(partitions_data[0]["replicas"]) == 1, "Should only have 1 replica" + + def test_cluster_metadata_specific_topic(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + topics_metadata = admin_client.cluster_metadata([new_topic.topic])["topics"] + + assert list(topics_metadata.keys()) == [new_topic.topic] + + def test_cluster_metadata_raises_for_unknown_topic(self, admin_client: KafkaAdminClient) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.cluster_metadata(["nonexistent_topic"]) + + +class TestGetTopicConfig: + def test_get_topic_config_no_filters(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + cfg = admin_client.get_topic_config(new_topic.topic) + assert "cleanup.policy" in cfg + + def test_get_topic_config_empty_filters(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + topic_config_filtered = admin_client.get_topic_config( + new_topic.topic, + config_name_filter=(), + config_source_filter=(), + ) + + assert topic_config_filtered == {} + + def test_get_topic_config_name_filter_only(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + topic_config_filtered = admin_client.get_topic_config( + new_topic.topic, + config_name_filter=("flush.ms"), + config_source_filter=(), + ) + + assert list(topic_config_filtered.keys()) == ["flush.ms"] + + def test_get_topic_config_source_filter_only_noresult(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + topic_config_filtered = admin_client.get_topic_config( + new_topic.topic, + config_name_filter=(), + config_source_filter=(ConfigSource.DYNAMIC_TOPIC_CONFIG,), + ) + + assert topic_config_filtered == {} + + def test_get_topic_config_source_filter_only(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + admin_client.update_topic_config(new_topic.topic, {"flush.ms": "12345"}) + + topic_config_filtered = admin_client.get_topic_config( + new_topic.topic, + config_name_filter=(), + config_source_filter=(ConfigSource.DYNAMIC_TOPIC_CONFIG,), + ) + + assert topic_config_filtered == {"flush.ms": "12345"} + + def test_get_topic_config_raises_for_unknown_topic(self, admin_client: KafkaAdminClient) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.get_topic_config("nonexistent_topic") + + +class TestUpdateTopicConfig: + def test_update_topic_config(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + admin_client.update_topic_config(new_topic.topic, {"flush.ms": "12345"}) + + topic_config_filtered = admin_client.get_topic_config( + new_topic.topic, + config_name_filter=("flush.ms",), + config_source_filter=(), + ) + assert topic_config_filtered == {"flush.ms": "12345"} + + def test_update_topic_config_raises_for_unknown_topic(self, admin_client: KafkaAdminClient) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.update_topic_config("nonexistent_topic", {"flush.ms": "12345"}) + + +class TestGetOffsets: + def test_get_offsets(self, admin_client: KafkaAdminClient, new_topic: NewTopic, producer: KafkaProducer) -> None: + topic_name = new_topic.topic + partition_id = 0 + number_of_messages = 5 + for _ in range(number_of_messages): + fut = producer.send(topic_name, value=b"test-message") + producer.flush() + fut.get() + + offsets = admin_client.get_offsets(topic_name, partition_id) + + assert offsets == {"beginning_offset": 0, "end_offset": number_of_messages} + + def test_get_offsets_raises_for_unknown_topic(self, admin_client: KafkaAdminClient) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.get_offsets("nonexistent_topic", 0) + + def test_get_offsets_raises_for_unknown_partition(self, admin_client: KafkaAdminClient, new_topic: NewTopic) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.get_offsets(new_topic.topic, 10) + + +class TestDeleteTopic: + def test_delete_topic(self, admin_client: KafkaAdminClient) -> None: + topic_name = create_new_topic(admin_client) + topics_metadata_before_delete = admin_client.cluster_metadata()["topics"] + + admin_client.delete_topic(topic_name) + + topics_metadata_after_delete = admin_client.cluster_metadata()["topics"] + + assert topic_name in topics_metadata_before_delete + assert topic_name not in topics_metadata_after_delete + + def test_delete_topic_raises_for_unknown_topic(self, admin_client: KafkaAdminClient) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + admin_client.delete_topic("nonexistent_topic") diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 9fec19285..67827bda9 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -5,11 +5,10 @@ from __future__ import annotations from kafka import KafkaProducer -from kafka.errors import UnknownTopicOrPartitionError from karapace.client import Client -from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka_rest_apis import KafkaRest from karapace.version import __version__ -from pytest import raises from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( new_random_name, @@ -54,7 +53,7 @@ async def test_health_endpoint(rest_async_client: Client) -> None: assert response["karapace_version"] == __version__ -async def test_request_body_too_large(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: +async def test_request_body_too_large(rest_async_client: KafkaAdminClient, admin_client: Client) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) pl = {"records": [{"value": 1_048_576 * "a"}]} @@ -62,7 +61,7 @@ async def test_request_body_too_large(rest_async_client: KafkaRestAdminClient, a assert res.status_code == 413 -async def test_content_types(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: +async def test_content_types(rest_async_client: KafkaAdminClient, admin_client: Client) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) valid_headers = [ @@ -134,7 +133,7 @@ async def test_content_types(rest_async_client: KafkaRestAdminClient, admin_clie assert not res.ok -async def test_avro_publish_primitive_schema(rest_async_client: KafkaRestAdminClient, admin_client: Client) -> None: +async def test_avro_publish_primitive_schema(rest_async_client: KafkaAdminClient, admin_client: Client) -> None: topic_str = new_topic(admin_client) topic_int = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_str, topic_int], timeout=NEW_TOPIC_TIMEOUT, sleep=1) @@ -159,7 +158,7 @@ async def test_avro_publish_primitive_schema(rest_async_client: KafkaRestAdminCl async def test_avro_publish( rest_async_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: tn = new_topic(admin_client) other_tn = new_topic(admin_client) @@ -204,47 +203,7 @@ async def test_avro_publish( # assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}" -async def test_admin_client(admin_client: KafkaRestAdminClient, producer: KafkaProducer) -> None: - topic_names = [new_topic(admin_client) for i in range(10, 13)] - topic_info = admin_client.cluster_metadata() - retrieved_names = list(topic_info["topics"].keys()) - assert ( - set(topic_names).difference(set(retrieved_names)) == set() - ), "Returned value {!r} differs from written one {!r}".format( - retrieved_names, - topic_names, - ) - assert len(topic_info["brokers"]) == 1, "Only one broker during tests" - for t in topic_names: - v = topic_info["topics"][t] - assert len(v["partitions"]) == 1, "Should only have data for one partition" - details = v["partitions"][0] - assert len(details["replicas"]) == 1, "Should have only 1 replica" - one_topic_info = admin_client.cluster_metadata(topic_names[:1]) - retrieved_names = list(one_topic_info["topics"].keys()) - assert len(retrieved_names) == 1 - assert retrieved_names[0] == topic_names[0], f"Returned value %r differs from expected {retrieved_names[0]}" - cfg = admin_client.get_topic_config(topic_names[0]) - assert "cleanup.policy" in cfg - for _ in range(5): - fut = producer.send(topic_names[0], value=b"foo_val") - producer.flush() - _ = fut.get() - offsets = admin_client.get_offsets(topic_names[0], 0) - assert offsets["beginning_offset"] == 0, f"Start offset should be 0 for {topic_names[0]}, partition 0" - assert offsets["end_offset"] == 5, f"End offset should be 0 for {topic_names[0]}, partition 0" - # invalid requests - with raises(UnknownTopicOrPartitionError): - admin_client.get_offsets("invalid_topic", 0) - with raises(UnknownTopicOrPartitionError): - admin_client.get_offsets(topic_names[0], 10) - with raises(UnknownTopicOrPartitionError): - admin_client.get_topic_config("another_invalid_name") - with raises(UnknownTopicOrPartitionError): - admin_client.cluster_metadata(topics=["another_invalid_name"]) - - -async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaRestAdminClient) -> None: +async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaAdminClient) -> None: topic_name = new_topic(admin_client) prepared_records = [ [b"key", b"value", 0], @@ -277,7 +236,7 @@ async def test_internal(rest_async: KafkaRest | None, admin_client: KafkaRestAdm assert rest_async_proxy.all_empty({"records": [{"value": {"foo": "bar"}}]}, "key") is True -async def test_topics(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_topics(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic_foo = "foo" tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) @@ -317,7 +276,7 @@ async def test_list_topics(rest_async_client, admin_client) -> None: assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}" -async def test_publish(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_publish(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) topic_url = f"/topics/{topic}" @@ -337,7 +296,7 @@ async def test_publish(rest_async_client: Client, admin_client: KafkaRestAdminCl # Produce messages to a topic without key and without explicit partition to verify that # partitioner assigns partition randomly -async def test_publish_random_partitioning(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_publish_random_partitioning(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic = new_topic(admin_client, num_partitions=100) await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) topic_url = f"/topics/{topic}" @@ -356,7 +315,7 @@ async def test_publish_random_partitioning(rest_async_client: Client, admin_clie assert len(partitions_seen) >= 2, "Partitioner should randomly assign to different partitions if no key given" -async def test_publish_malformed_requests(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_publish_malformed_requests(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic_name = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) for url in [f"/topics/{topic_name}", f"/topics/{topic_name}/partitions/0"]: @@ -394,7 +353,7 @@ async def test_publish_malformed_requests(rest_async_client: Client, admin_clien assert res.status_code == 422 -async def test_too_large_record(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_too_large_record(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: tn = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) # Record batch overhead is 22 bytes, reduce just above @@ -426,7 +385,7 @@ async def test_publish_to_nonexisting_topic(rest_async_client: Client) -> None: async def test_publish_with_incompatible_data( rest_async_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: topic_name = new_topic(admin_client) subject_1 = f"{topic_name}-value" @@ -470,7 +429,7 @@ async def test_publish_with_incompatible_data( assert "Object does not fit to stored schema" in res_json["message"] -async def test_publish_with_incompatible_schema(rest_async_client: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_publish_with_incompatible_schema(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic_name = new_topic(admin_client) await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) url = f"/topics/{topic_name}" @@ -518,7 +477,7 @@ async def test_publish_with_incompatible_schema(rest_async_client: Client, admin async def test_publish_with_schema_id_of_another_subject( rest_async_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: """ Karapace issue 658: https://github.com/aiven/karapace/issues/658 @@ -588,7 +547,7 @@ async def test_publish_with_schema_id_of_another_subject( async def test_publish_with_schema_id_of_another_subject_novalidation( rest_async_novalidation_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: """ Same as above but with name_strategy_validation disabled as config @@ -659,7 +618,7 @@ async def test_brokers(rest_async_client: Client) -> None: async def test_partitions( rest_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, producer: KafkaProducer, ) -> None: # TODO -> This seems to be the only combination accepted by the offsets endpoint diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index dfa7278b5..52662aeb9 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -4,7 +4,7 @@ """ from karapace.client import Client -from karapace.kafka_rest_apis import KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin from tests.integration.test_rest import NEW_TOPIC_TIMEOUT from tests.utils import ( @@ -85,7 +85,7 @@ async def test_publish_consume_protobuf_second(rest_async_client, admin_client, async def test_publish_protobuf_with_references( rest_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, registry_async_client: Client, ): topic_name = new_topic(admin_client) @@ -146,7 +146,7 @@ async def test_publish_protobuf_with_references( async def test_publish_and_consume_protobuf_with_recursive_references( rest_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, registry_async_client: Client, ): topic_name = new_topic(admin_client) diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 29847eb85..9fc482c9b 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -4,10 +4,11 @@ """ from contextlib import closing from dataclasses import dataclass -from kafka import KafkaAdminClient, KafkaProducer +from kafka import KafkaProducer from karapace.config import set_config_defaults from karapace.constants import DEFAULT_SCHEMA_TOPIC from karapace.in_memory_database import InMemoryDatabase +from karapace.kafka_admin import KafkaAdminClient from karapace.key_format import KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher diff --git a/tests/integration/test_schema_registry_auth.py b/tests/integration/test_schema_registry_auth.py index fbc3adb70..ce260f194 100644 --- a/tests/integration/test_schema_registry_auth.py +++ b/tests/integration/test_schema_registry_auth.py @@ -5,7 +5,7 @@ See LICENSE for details """ from karapace.client import Client -from karapace.kafka_rest_apis import KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient from karapace.schema_models import SchemaType, ValidatedTypedSchema from tests.utils import ( new_random_name, @@ -204,7 +204,7 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: # Test that Kafka REST API works when configured with Schema Registry requiring authorization -async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaRestAdminClient) -> None: +async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaAdminClient) -> None: client = rest_async_client_registry_auth topic = new_topic(admin_client, prefix="cave-rest-") diff --git a/tests/integration/utils/kafka_server.py b/tests/integration/utils/kafka_server.py index cfc9f9455..5b8c4b934 100644 --- a/tests/integration/utils/kafka_server.py +++ b/tests/integration/utils/kafka_server.py @@ -3,8 +3,8 @@ See LICENSE for details """ from dataclasses import dataclass -from kafka.errors import LeaderNotAvailableError, NoBrokersAvailable, UnrecognizedBrokerVersion -from karapace.kafka_rest_apis import KafkaRestAdminClient +from kafka.errors import AuthenticationFailedError +from karapace.kafka_admin import KafkaAdminClient from karapace.utils import Expiration from pathlib import Path from subprocess import Popen @@ -50,22 +50,8 @@ def wait_for_kafka( server=server, ) try: - KafkaRestAdminClient(bootstrap_servers=server).cluster_metadata() - # ValueError: - # - if the port number is invalid (i.e. not a number) - # - if the port is not bound yet - # NoBrokersAvailable: - # - if the address/port does not point to a running server - # LeaderNotAvailableError: - # - if there is no leader yet - # UnrecognizedBrokerVersion: - # - happens during start-up of dockerized Kafka - except ( - NoBrokersAvailable, - LeaderNotAvailableError, - UnrecognizedBrokerVersion, - ValueError, - ) as e: + KafkaAdminClient(bootstrap_servers=server).cluster_metadata() + except AuthenticationFailedError as e: print(f"Error checking kafka cluster: {e}") time.sleep(2.0) else: diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index b23c6d723..e952aede5 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -5,7 +5,6 @@ from __future__ import annotations from kafka import KafkaConsumer, KafkaProducer -from kafka.admin import NewTopic from kafka.errors import KafkaError, TopicAlreadyExistsError from kafka.structs import PartitionMetadata from karapace import config @@ -24,7 +23,7 @@ from karapace.backup.backends.writer import StdOut from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config -from karapace.constants import DEFAULT_SCHEMA_TOPIC, TOPIC_CREATION_TIMEOUT_MS +from karapace.constants import DEFAULT_SCHEMA_TOPIC from pathlib import Path from types import FunctionType from typing import Callable, cast, ContextManager @@ -40,13 +39,6 @@ class TestAdmin: - @patch_admin_new - def test_auto_closing(self, admin_new: MagicMock) -> None: - admin_mock = admin_new.return_value - with _admin(config.DEFAULTS) as admin: - assert admin is admin_mock - assert admin_mock.close.call_count == 1 - @mock.patch("time.sleep", autospec=True) @patch_admin_new def test_retries_on_kafka_error(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None: @@ -55,7 +47,6 @@ def test_retries_on_kafka_error(self, admin_new: MagicMock, sleep_mock: MagicMoc with _admin(config.DEFAULTS) as admin: assert admin is admin_mock assert sleep_mock.call_count == 2 # proof that we waited between retries - assert admin_mock.close.call_count == 1 @pytest.mark.parametrize("e", (KeyboardInterrupt, SystemExit, RuntimeError, MemoryError)) @mock.patch("time.sleep", autospec=True) @@ -75,52 +66,51 @@ def test_reraises_unknown_exceptions( class TestHandleRestoreTopic: @patch_admin_new def test_calls_admin_create_topics(self, admin_new: MagicMock) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics + new_topic: MagicMock = admin_new.return_value.new_topic topic_configs = {"cleanup.policy": "compact"} _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs=topic_configs) - create_topics.assert_called_once_with(mock.ANY, timeout_ms=TOPIC_CREATION_TIMEOUT_MS) - ((new_topic,),) = create_topics.call_args.args - assert isinstance(new_topic, NewTopic) - assert new_topic.name == DEFAULT_SCHEMA_TOPIC - assert new_topic.num_partitions == 1 - assert new_topic.replication_factor == config.DEFAULTS["replication_factor"] - assert new_topic.topic_configs == topic_configs + new_topic.assert_called_once_with( + DEFAULT_SCHEMA_TOPIC, + num_partitions=1, + replication_factor=config.DEFAULTS["replication_factor"], + config=topic_configs, + ) @patch_admin_new def test_gracefully_handles_topic_already_exists_error(self, admin_new: MagicMock) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics - create_topics.side_effect = TopicAlreadyExistsError() + new_topic: MagicMock = admin_new.return_value.new_topic + new_topic.side_effect = TopicAlreadyExistsError() _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) - create_topics.assert_called_once() + new_topic.assert_called_once() @patch_admin_new def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics - create_topics.side_effect = [KafkaError("1"), KafkaError("2"), None] + new_topic: MagicMock = admin_new.return_value.new_topic + new_topic.side_effect = [KafkaError("1"), KafkaError("2"), None] with mock.patch("time.sleep", autospec=True): _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) - assert create_topics.call_count == 3 + assert new_topic.call_count == 3 @patch_admin_new def test_noop_for_custom_name_on_legacy_versions( self, admin_new: MagicMock, ) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics + new_topic: MagicMock = admin_new.return_value.new_topic assert "custom-name" != DEFAULT_SCHEMA_TOPIC instruction = RestoreTopicLegacy(topic_name="custom-name", partition_count=1) _handle_restore_topic_legacy(instruction, config.DEFAULTS) - create_topics.assert_not_called() + new_topic.assert_not_called() @patch_admin_new def test_allows_custom_name_on_v3( self, admin_new: MagicMock, ) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics + new_topic: MagicMock = admin_new.return_value.new_topic topic_name = "custom-name" assert topic_name != DEFAULT_SCHEMA_TOPIC topic_configs = {"segment.bytes": "1000"} @@ -129,20 +119,14 @@ def test_allows_custom_name_on_v3( ) _handle_restore_topic(instruction, config.DEFAULTS) - create_topics.assert_called_once_with(mock.ANY, timeout_ms=TOPIC_CREATION_TIMEOUT_MS) - ((new_topic,),) = create_topics.call_args.args - assert isinstance(new_topic, NewTopic) - assert new_topic.name == topic_name - assert new_topic.num_partitions == 1 - assert new_topic.replication_factor == 2 - assert new_topic.topic_configs == topic_configs + new_topic.assert_called_once_with(topic_name, num_partitions=1, replication_factor=2, config=topic_configs) @patch_admin_new def test_skip_topic_creation( self, admin_new: MagicMock, ) -> None: - create_topics: MagicMock = admin_new.return_value.create_topics + new_topic: MagicMock = admin_new.return_value.new_topic _handle_restore_topic( RestoreTopic(topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs={}), config.DEFAULTS, @@ -157,7 +141,7 @@ def test_skip_topic_creation( skip_topic_creation=True, ) - create_topics.assert_not_called() + new_topic.assert_not_called() class TestClients: diff --git a/tests/utils.py b/tests/utils.py index 2e575b1f4..24e3e8bcf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,7 +5,7 @@ from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError from kafka.errors import TopicAlreadyExistsError from karapace.client import Client -from karapace.kafka_rest_apis import KafkaRestAdminClient +from karapace.kafka_admin import KafkaAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin from karapace.utils import Expiration from pathlib import Path @@ -224,7 +224,7 @@ def create_name() -> str: return create_name -def new_topic(admin_client: KafkaRestAdminClient, prefix: str = "topic", *, num_partitions: int = 1) -> str: +def new_topic(admin_client: KafkaAdminClient, prefix: str = "topic", *, num_partitions: int = 1) -> str: topic_name = f"{new_random_name(prefix)}" try: admin_client.new_topic(topic_name, num_partitions=num_partitions)