diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 10ae7d6c1..927af69cb 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -63,6 +63,8 @@ LOG = logging.getLogger(__name__) +MAX_KAFKA_SEND_RETRIES = 10 + B = TypeVar("B", str, bytes) F = TypeVar("F") @@ -386,17 +388,30 @@ def _handle_producer_send( instruction.key, instruction.value, ) - try: - producer.send( - instruction.topic_name, - key=instruction.key, - value=instruction.value, - partition=instruction.partition_index, - headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], - timestamp=instruction.timestamp, - ).add_done_callback(producer_callback) - except (KafkaError, AssertionError) as exc: - raise BackupDataRestorationError("Error while calling send on restoring messages") from exc + try_sending = True + tries = 0 + while try_sending: + tries += 1 + try: + send_future = producer.send( + instruction.topic_name, + key=instruction.key, + value=instruction.value, + partition=instruction.partition_index, + headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], + timestamp=instruction.timestamp, + ) + except (KafkaError, AssertionError) as exc: + raise BackupDataRestorationError("Error while calling send on restoring messages") from exc + except BufferError as exc: + producer.poll(timeout=0.1) + try_sending = tries < MAX_KAFKA_SEND_RETRIES + if not try_sending: + raise BackupDataRestorationError("Kafka producer buffer is full") from exc + else: + # Record is in the send buffer + send_future.add_done_callback(producer_callback) + break def restore_backup( diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index b0a6d9a0d..535e6492c 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -586,7 +586,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) -def test_backup_restoration_fails_when_producer_send_fails( +def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_partition( admin_client: KafkaAdminClient, kafka_servers: KafkaServers, ) -> None: @@ -641,6 +641,64 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) +def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error( + admin_client: KafkaAdminClient, + kafka_servers: KafkaServers, +) -> None: + topic_name = "296ddf62" + backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name + metadata_path = backup_directory / f"{topic_name}.metadata" + + # Make sure topic doesn't exist beforehand. + try: + admin_client.delete_topic(topic_name) + except UnknownTopicOrPartitionError: + logger.info("No previously existing topic.") + else: + logger.info("Deleted topic from previous run.") + + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + } + ) + + class FailToSendProducer(KafkaProducer): + def send(self, *args, **kwargs): + raise BufferError() + + def poll(self, timeout: float) -> None: # pylint: disable=unused-argument + return + + class FailToSendProducerContext: + def __init__(self): + self._producer = FailToSendProducer( + bootstrap_servers=config["bootstrap_uri"], + security_protocol=config["security_protocol"], + ssl_cafile=config["ssl_cafile"], + ssl_certfile=config["ssl_certfile"], + ssl_keyfile=config["ssl_keyfile"], + sasl_mechanism=config["sasl_mechanism"], + sasl_plain_username=config["sasl_plain_username"], + sasl_plain_password=config["sasl_plain_password"], + ) + + def __enter__(self): + return self._producer + + def __exit__(self, exc_type, exc_value, exc_traceback): + self._producer.flush() + + with patch("karapace.backup.api._producer") as p: + p.return_value = FailToSendProducerContext() + with pytest.raises(BackupDataRestorationError, match="Kafka producer buffer is full"): + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(topic_name), + ) + + def no_color_env() -> dict[str, str]: env = os.environ.copy() try: diff --git a/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62.metadata b/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62.metadata new file mode 100644 index 000000000..e7340ac26 Binary files /dev/null and b/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62.metadata differ diff --git a/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62:0.data b/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62:0.data new file mode 100644 index 000000000..b33b5998b Binary files /dev/null and b/tests/integration/test_data/backup_v3_single_partition/296ddf62/296ddf62:0.data differ