Skip to content

Commit

Permalink
Merge pull request #817 from Aiven-Open/jjaakola-aiven-handle-backup-restore-producer-buffer-full
Browse files Browse the repository at this point in the history

fix: handle BufferError from Kafka producer
  • Loading branch information
giuseppelillo authored Feb 19, 2024
2 parents 3a3cfbf + 36b4a17 commit 2e520f6
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 12 deletions.
37 changes: 26 additions & 11 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

LOG = logging.getLogger(__name__)

MAX_KAFKA_SEND_RETRIES = 10

B = TypeVar("B", str, bytes)
F = TypeVar("F")

Expand Down Expand Up @@ -386,17 +388,30 @@ def _handle_producer_send(
instruction.key,
instruction.value,
)
try:
producer.send(
instruction.topic_name,
key=instruction.key,
value=instruction.value,
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp=instruction.timestamp,
).add_done_callback(producer_callback)
except (KafkaError, AssertionError) as exc:
raise BackupDataRestorationError("Error while calling send on restoring messages") from exc
try_sending = True
tries = 0
while try_sending:
tries += 1
try:
send_future = producer.send(
instruction.topic_name,
key=instruction.key,
value=instruction.value,
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp=instruction.timestamp,
)
except (KafkaError, AssertionError) as exc:
raise BackupDataRestorationError("Error while calling send on restoring messages") from exc
except BufferError as exc:
producer.poll(timeout=0.1)
try_sending = tries < MAX_KAFKA_SEND_RETRIES
if not try_sending:
raise BackupDataRestorationError("Kafka producer buffer is full") from exc
else:
# Record is in the send buffer
send_future.add_done_callback(producer_callback)
break


def restore_backup(
Expand Down
60 changes: 59 additions & 1 deletion tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_fails_when_producer_send_fails(
def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_partition(
admin_client: KafkaAdminClient,
kafka_servers: KafkaServers,
) -> None:
Expand Down Expand Up @@ -641,6 +641,64 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error(
    admin_client: KafkaAdminClient,
    kafka_servers: KafkaServers,
) -> None:
    """Restoring a backup raises ``BackupDataRestorationError`` when the
    producer's send buffer stays full for every retry attempt.
    """
    topic_name = "296ddf62"
    backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name
    metadata_path = backup_directory / f"{topic_name}.metadata"

    # Ensure a clean slate: the topic must not exist before restoration runs.
    try:
        admin_client.delete_topic(topic_name)
    except UnknownTopicOrPartitionError:
        logger.info("No previously existing topic.")
    else:
        logger.info("Deleted topic from previous run.")

    config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers})

    class AlwaysFullBufferProducer(KafkaProducer):
        """Producer double whose internal buffer is permanently full."""

        def send(self, *args, **kwargs):
            # Every send attempt reports an exhausted buffer.
            raise BufferError()

        def poll(self, timeout: float) -> None:  # pylint: disable=unused-argument
            # Draining never helps — the buffer remains full.
            return

    class AlwaysFullBufferProducerContext:
        """Context manager handing out the stubbed producer; flushes on exit."""

        def __init__(self):
            self._producer = AlwaysFullBufferProducer(
                bootstrap_servers=config["bootstrap_uri"],
                security_protocol=config["security_protocol"],
                ssl_cafile=config["ssl_cafile"],
                ssl_certfile=config["ssl_certfile"],
                ssl_keyfile=config["ssl_keyfile"],
                sasl_mechanism=config["sasl_mechanism"],
                sasl_plain_username=config["sasl_plain_username"],
                sasl_plain_password=config["sasl_plain_password"],
            )

        def __enter__(self):
            return self._producer

        def __exit__(self, exc_type, exc_value, exc_traceback):
            self._producer.flush()

    with patch("karapace.backup.api._producer") as producer_factory:
        producer_factory.return_value = AlwaysFullBufferProducerContext()
        # The restore must give up after the retry budget is spent.
        with pytest.raises(BackupDataRestorationError, match="Kafka producer buffer is full"):
            api.restore_backup(
                config=config,
                backup_location=metadata_path,
                topic_name=TopicName(topic_name),
            )


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
Expand Down
Binary file not shown.
Binary file not shown.

0 comments on commit 2e520f6

Please sign in to comment.