
feat: enable the customization of the kafka properties
eliax1996 committed Jul 29, 2024
1 parent 8c50eb0 commit 04c1ce0
Showing 4 changed files with 101 additions and 1 deletion.
1 change: 1 addition & 0 deletions karapace/kafka_utils.py
@@ -40,6 +40,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
        sasl_plain_username=config["sasl_plain_username"],
        sasl_plain_password=config["sasl_plain_password"],
        auto_offset_reset="earliest",
        session_timeout_ms=config["session_timeout_ms"],
        metadata_max_age_ms=config["metadata_max_age_ms"],
    )
    try:
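With this change the consumer created by kafka_consumer_from_config honours the session_timeout_ms key of the Karapace config instead of the client library default. A minimal usage sketch, assuming kafka_consumer_from_config is used as a context manager like the kafka_producer_from_config seen in the test below; the bootstrap address, topic name and timeout value are illustrative only:

from karapace.config import Config, set_config_defaults
from karapace.kafka_utils import kafka_consumer_from_config

# The broker only accepts the timeout if it falls between its
# group.min.session.timeout.ms and group.max.session.timeout.ms settings.
config = set_config_defaults(Config(
    bootstrap_uri="127.0.0.1:9092",
    session_timeout_ms=65000,
))
with kafka_consumer_from_config(config, "some-topic") as consumer:
    ...  # the consumer joins its group using the configured session timeout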
81 changes: 81 additions & 0 deletions tests/integration/backup/test_session_timeout.py
@@ -0,0 +1,81 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from confluent_kafka.admin import NewTopic
from karapace.backup.api import BackupVersion, create_backup
from karapace.config import Config, set_config_defaults
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka_utils import kafka_producer_from_config
from pathlib import Path
from tests.integration.conftest import create_kafka_server
from tests.integration.utils.config import KafkaDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive

import pytest


SESSION_TIMEOUT_MS = 65000
GROUP_MIN_SESSION_TIMEOUT_MS = 60000
GROUP_MAX_SESSION_TIMEOUT_MS = 70000


# use a dedicated kafka server with specific values for
# group.min.session.timeout.ms and group.max.session.timeout.ms
@pytest.fixture(scope="function", name="kafka_server_session_timeout")
def fixture_kafka_server(
    kafka_description: KafkaDescription,
    port_range: PortRangeInclusive,
    tmp_path_factory: pytest.TempPathFactory,
):
    # use custom data and log dir to avoid conflict with other kafka servers
    session_datadir = tmp_path_factory.mktemp("kafka_server_min_data")
    session_logdir = tmp_path_factory.mktemp("kafka_server_min_log")
    kafka_config_extra = {
        "group.min.session.timeout.ms": GROUP_MIN_SESSION_TIMEOUT_MS,
        "group.max.session.timeout.ms": GROUP_MAX_SESSION_TIMEOUT_MS,
    }
    yield from create_kafka_server(
        session_datadir,
        session_logdir,
        kafka_description,
        port_range,
        kafka_config_extra,
    )


def test_generic(
    kafka_server_session_timeout: KafkaServers,
    new_topic: NewTopic,
    tmp_path: Path,
) -> None:
    config = set_config_defaults(Config(
        bootstrap_uri=kafka_server_session_timeout.bootstrap_servers,
        session_timeout_ms=SESSION_TIMEOUT_MS,
    ))
    # create topic
    admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
    admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
    # write data into topic
    with kafka_producer_from_config(config) as producer:
        producer.send(
            new_topic.topic,
            key=b"foo",
            value=b"bar",
            partition=0,
            headers=[
                ("some-header", b"some header value"),
                ("other-header", b"some other header value"),
            ],
            timestamp=1683474657,
        )
        producer.flush()
    # backup the topic
    create_backup(
        config=config,
        backup_location=tmp_path / "backup",
        topic_name=new_topic.topic,
        version=BackupVersion.V3,
        replication_factor=1,
    )
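The test relies on SESSION_TIMEOUT_MS (65 000 ms) lying between the broker's group.min.session.timeout.ms (60 000 ms) and group.max.session.timeout.ms (70 000 ms): the backup consumer can therefore join its group, whereas a session timeout outside that window would be rejected by the broker.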
18 changes: 17 additions & 1 deletion tests/integration/conftest.py
@@ -32,7 +32,7 @@
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from typing import Any, AsyncIterator, Iterator, List, Optional
from urllib.parse import urlparse

import asyncio
@@ -120,6 +120,21 @@ def fixture_kafka_server(
        yield kafka_servers
        return

    yield from create_kafka_server(
        session_datadir,
        session_logdir,
        kafka_description,
        port_range,
    )

def create_kafka_server(
    session_datadir: Path,
    session_logdir: Path,
    kafka_description: KafkaDescription,
    port_range: PortRangeInclusive,
    kafka_properties: dict[str, Any] = {},
) -> Iterator[KafkaServers]:

    zk_dir = session_logdir / "zk"

    # File used to share data among test runners, including the dynamic
@@ -170,6 +185,7 @@ def fixture_kafka_server(
            kafka_config=kafka_config,
            kafka_description=kafka_description,
            log4j_config=KAFKA_LOG4J,
            kafka_properties=kafka_properties,
        )
        stack.callback(stop_process, kafka_proc)

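Since fixture_kafka_server now delegates to create_kafka_server, any integration test can start a dedicated broker with arbitrary server.properties overrides. A hypothetical fixture sketch mirroring the one in tests/integration/backup/test_session_timeout.py and using the same imports; the fixture name and the overridden property are illustrative only:

@pytest.fixture(scope="function", name="kafka_server_no_auto_create")
def fixture_kafka_server_no_auto_create(
    kafka_description: KafkaDescription,
    port_range: PortRangeInclusive,
    tmp_path_factory: pytest.TempPathFactory,
):
    # dedicated data and log dirs so this broker does not clash with the shared one
    yield from create_kafka_server(
        tmp_path_factory.mktemp("kafka_no_auto_create_data"),
        tmp_path_factory.mktemp("kafka_no_auto_create_log"),
        kafka_description,
        port_range,
        {"auto.create.topics.enable": "false"},
    )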
2 changes: 2 additions & 0 deletions tests/integration/utils/kafka_server.py
@@ -121,6 +121,7 @@ def configure_and_start_kafka(
    kafka_config: KafkaConfig,
    kafka_description: KafkaDescription,
    log4j_config: str,
    kafka_properties: dict[str, str | int],
) -> Popen:
    config_path = Path(kafka_config.logdir) / "server.properties"

@@ -167,6 +168,7 @@
"zookeeper.connection.timeout.ms": 6000,
"zookeeper.connect": f"127.0.0.1:{zk_config.client_port}",
}
kafka_ini.update(kafka_properties)

write_ini(config_path, kafka_ini)

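Because the overrides are merged with dict.update after the defaults are assembled, caller-supplied properties can both add new keys and replace the built-in test defaults before write_ini produces server.properties. A minimal illustration of the merge semantics, with made-up values:

kafka_ini = {"zookeeper.connection.timeout.ms": 6000}
kafka_ini.update({"group.max.session.timeout.ms": 70000, "zookeeper.connection.timeout.ms": 10000})
# kafka_ini == {"zookeeper.connection.timeout.ms": 10000, "group.max.session.timeout.ms": 70000}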
