Commit
fix: handle BufferError from Kafka producer
Retry up to ten times when a BufferError occurs, and give up if no progress can be made.
Call the Kafka producer's poll to handle the pending requests, then continue.
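
For context, here is a minimal standalone sketch of the retry pattern this commit introduces, written against confluent-kafka's `Producer` directly (Karapace's own `KafkaProducer` wrapper is what the diff below uses); the broker address, topic, and helper name are illustrative assumptions, not Karapace code:

```python
# Sketch of retry-on-BufferError, assuming a plain confluent-kafka Producer;
# broker address and helper name are illustrative.
from confluent_kafka import Producer

MAX_SEND_RETRIES = 10  # mirrors MAX_KAFKA_SEND_RETRIES in the diff below

producer = Producer({"bootstrap.servers": "localhost:9092"})


def send_with_retry(topic: str, key: bytes, value: bytes) -> None:
    for _ in range(MAX_SEND_RETRIES):
        try:
            # produce() raises BufferError when the local queue is full.
            producer.produce(topic, key=key, value=value)
            return  # record accepted into the local send buffer
        except BufferError:
            # Serve delivery callbacks so in-flight requests complete and
            # buffer space is freed, then retry.
            producer.poll(0.1)
    raise RuntimeError("Kafka producer buffer is full after retries")
```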
jjaakola-aiven committed Feb 19, 2024
1 parent 3a3cfbf commit 36b4a17
Showing 4 changed files with 85 additions and 12 deletions.
37 changes: 26 additions & 11 deletions karapace/backup/api.py
@@ -63,6 +63,8 @@
 
 LOG = logging.getLogger(__name__)
 
+MAX_KAFKA_SEND_RETRIES = 10
+
 B = TypeVar("B", str, bytes)
 F = TypeVar("F")
 
@@ -386,17 +388,30 @@ def _handle_producer_send(
         instruction.key,
         instruction.value,
     )
-    try:
-        producer.send(
-            instruction.topic_name,
-            key=instruction.key,
-            value=instruction.value,
-            partition=instruction.partition_index,
-            headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
-            timestamp=instruction.timestamp,
-        ).add_done_callback(producer_callback)
-    except (KafkaError, AssertionError) as exc:
-        raise BackupDataRestorationError("Error while calling send on restoring messages") from exc
+    try_sending = True
+    tries = 0
+    while try_sending:
+        tries += 1
+        try:
+            send_future = producer.send(
+                instruction.topic_name,
+                key=instruction.key,
+                value=instruction.value,
+                partition=instruction.partition_index,
+                headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
+                timestamp=instruction.timestamp,
+            )
+        except (KafkaError, AssertionError) as exc:
+            raise BackupDataRestorationError("Error while calling send on restoring messages") from exc
+        except BufferError as exc:
+            producer.poll(timeout=0.1)
+            try_sending = tries < MAX_KAFKA_SEND_RETRIES
+            if not try_sending:
+                raise BackupDataRestorationError("Kafka producer buffer is full") from exc
+        else:
+            # Record is in the send buffer
+            send_future.add_done_callback(producer_callback)
+            break
 
 
 def restore_backup(
60 changes: 59 additions & 1 deletion tests/integration/backup/test_v3_backup.py
@@ -586,7 +586,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
 )
 
 
-def test_backup_restoration_fails_when_producer_send_fails(
+def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_partition(
     admin_client: KafkaAdminClient,
     kafka_servers: KafkaServers,
 ) -> None:
@@ -641,6 +641,64 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
     )
 
 
+def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error(
+    admin_client: KafkaAdminClient,
+    kafka_servers: KafkaServers,
+) -> None:
+    topic_name = "296ddf62"
+    backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name
+    metadata_path = backup_directory / f"{topic_name}.metadata"
+
+    # Make sure topic doesn't exist beforehand.
+    try:
+        admin_client.delete_topic(topic_name)
+    except UnknownTopicOrPartitionError:
+        logger.info("No previously existing topic.")
+    else:
+        logger.info("Deleted topic from previous run.")
+
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+        }
+    )
+
+    class FailToSendProducer(KafkaProducer):
+        def send(self, *args, **kwargs):
+            raise BufferError()
+
+        def poll(self, timeout: float) -> None:  # pylint: disable=unused-argument
+            return
+
+    class FailToSendProducerContext:
+        def __init__(self):
+            self._producer = FailToSendProducer(
+                bootstrap_servers=config["bootstrap_uri"],
+                security_protocol=config["security_protocol"],
+                ssl_cafile=config["ssl_cafile"],
+                ssl_certfile=config["ssl_certfile"],
+                ssl_keyfile=config["ssl_keyfile"],
+                sasl_mechanism=config["sasl_mechanism"],
+                sasl_plain_username=config["sasl_plain_username"],
+                sasl_plain_password=config["sasl_plain_password"],
+            )
+
+        def __enter__(self):
+            return self._producer
+
+        def __exit__(self, exc_type, exc_value, exc_traceback):
+            self._producer.flush()
+
+    with patch("karapace.backup.api._producer") as p:
+        p.return_value = FailToSendProducerContext()
+        with pytest.raises(BackupDataRestorationError, match="Kafka producer buffer is full"):
+            api.restore_backup(
+                config=config,
+                backup_location=metadata_path,
+                topic_name=TopicName(topic_name),
+            )
+
+
 def no_color_env() -> dict[str, str]:
     env = os.environ.copy()
     try:
Binary file not shown.
Binary file not shown.
