From 6c6eab92911875e04afc48cb9cb19c516c3e3b08 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Tue, 14 Nov 2023 05:52:57 -0600 Subject: [PATCH] refactor: enable access from the host (#445) Signed-off-by: Curtis Wan --- README.md | 20 +++++---------- config/kraft/broker.properties | 3 ++- docker/docker-compose.yaml | 46 +++++++++++++++++++++++----------- docker/scripts/start.sh | 38 ++++++++++++++++++++-------- 4 files changed, 67 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 3d3a2512bc..45197cd347 100644 --- a/README.md +++ b/README.md @@ -41,24 +41,19 @@ Launch an AutoMQ for Kafka cluster locally using Docker Compose. This cluster comprises 1 Controller node, 2 Broker nodes, and an additional LocalStack container to simulate S3 services locally. ``` bash -cd docker # launch AutoMQ for Kafka cluster -docker compose up -d +docker compose -f docker/docker-compose.yaml up -d ``` #### Run a console producer and consumer 1. Create a topic to store your events: ``` bash -# login to broker1 -docker exec -it broker1 bash -# go to kafka directory -cd /opt/kafka/kafka # create quickstart-events topic -bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server $(hostname -I | awk '{print $1}'):9092 +bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9094 ``` 2. Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic. ``` bash -bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server $(hostname -I | awk '{print $1}'):9092 +bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9094 ``` You may input some messages like: ``` text @@ -68,17 +63,14 @@ This is my second event 3. Run the console consumer client to read the events you just created: ``` bash -# CRTL-C to exit the producer +# CRTL-C to exit the consumer # run console consumer -bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server $(hostname -I | awk '{print $1}'):9092 +bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9094 ``` 4. Clean up the cluster ``` bash -# exit from the broker1 container -exit -# clean up the cluster -docker compose down -v +docker compose -f docker/docker-compose.yaml down -v ``` [Explore more](https://docs.automq.com/zh/docs/automq-s3kafka/VKpxwOPvciZmjGkHk5hcTz43nde): Second-level partition migration and automatic traffic rebalancing. diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index cbde0c97c8..737a860aa8 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -197,4 +197,5 @@ autobalancer.reporter.network.in.capacity=102400 autobalancer.reporter.network.out.capacity=102400 # The reporter interval of Auto Balancer metric reporter, default 10s -autobalancer.reporter.metrics.reporting.interval.ms=10000 \ No newline at end of file +autobalancer.reporter.metrics.reporting.interval.ms=10000 + diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index a61c5b8f64..57afdeaf24 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -30,7 +30,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock # use a static ip networks: - kos_net: + afk_net: ipv4_address: 10.6.0.2 # create needed buckets @@ -47,21 +47,19 @@ services: localstack: condition: service_healthy networks: - - kos_net + - afk_net controller: container_name: "${CONTROLLER_DOCKER_NAME-controller}" hostname: "${CONTROLLER_DOCKER_NAME-controller}" - image: automqinc/kafka:0.6.6 + image: automqinc/kafka:0.7.0 environment: - KAFKA_S3_ACCESS_KEY=test - KAFKA_S3_SECRET_KEY=test - KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m - command: bash /opt/kafka/scripts/start.sh up --process.roles controller --node.id 0 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 - # use a static ip + command: bash /opt/kafka/scripts/start.sh up --process.roles controller --node.id 0 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 networks: - kos_net: - ipv4_address: 10.6.0.4 + - afk_net depends_on: - localstack - aws-cli @@ -69,14 +67,23 @@ services: broker1: container_name: "${BROKER1_DOCKER_NAME-broker1}" hostname: "${BROKER1_DOCKER_NAME-broker1}" - image: automqinc/kafka:0.6.6 + image: automqinc/kafka:0.7.0 + ports: + - "9094:9094" environment: - KAFKA_S3_ACCESS_KEY=test - KAFKA_S3_SECRET_KEY=test - KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m -XX:MaxDirectMemorySize=1G - command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 1 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 --override autobalancer.reporter.network.in.capacity=5120 --override autobalancer.reporter.network.out.capacity=5120 --override autobalancer.reporter.metrics.reporting.interval.ms=5000 + - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_IN_CAPACITY=5120 + - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_OUT_CAPACITY=5120 + - KAFKA_CFG_AUTOBALANCER_REPORTER_METRICS_REPORTING_INTERVAL_MS=5000 + # override listener settings + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker1:9092,EXTERNAL://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 1 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 networks: - - kos_net + - afk_net depends_on: - localstack - aws-cli @@ -85,14 +92,23 @@ services: broker2: container_name: "${BROKER2_DOCKER_NAME-broker2}" hostname: "${BROKER2_DOCKER_NAME-broker2}" - image: automqinc/kafka:0.6.6 + image: automqinc/kafka:0.7.0 + ports: + - "9095:9095" environment: - KAFKA_S3_ACCESS_KEY=test - KAFKA_S3_SECRET_KEY=test - KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m -XX:MaxDirectMemorySize=1G - command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 2 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 --override autobalancer.reporter.network.in.capacity=5120 --override autobalancer.reporter.network.out.capacity=5120 --override autobalancer.reporter.metrics.reporting.interval.ms=5000 + - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_IN_CAPACITY=5120 + - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_OUT_CAPACITY=5120 + - KAFKA_CFG_AUTOBALANCER_REPORTER_METRICS_REPORTING_INTERVAL_MS=5000 + # override listener settings + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9095 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker2:9092,EXTERNAL://localhost:9095 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 2 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 networks: - - kos_net + - afk_net depends_on: - localstack - aws-cli @@ -104,8 +120,8 @@ volumes: driver: local networks: - kos_net: - name: kos_net + afk_net: + name: afk_net driver: bridge ipam: driver: default diff --git a/docker/scripts/start.sh b/docker/scripts/start.sh index 5d36783ca2..8adbd25708 100644 --- a/docker/scripts/start.sh +++ b/docker/scripts/start.sh @@ -80,7 +80,7 @@ help|-h|--help Display this help message up [--process.roles ROLE] [--node.id NODE_ID] [--controller.quorum.voters VOTERS] [--s3.region REGION] [--s3.bucket BUCKET] [--s3.endpoint ENDPOINT] - [--s3.access.key ACCESS_KEY] [--s3.secret.key SECRET_KEY] [--override KEY=VALUE] + [--s3.access.key ACCESS_KEY] [--s3.secret.key SECRET_KEY] start node. EOF exit "${exit_status}" @@ -189,7 +189,7 @@ add_settings_for_s3() { add_or_setup_value "s3.stream.object.split.size" "16777216" "${file_name}" add_or_setup_value "s3.object.block.size" "16777216" "${file_name}" add_or_setup_value "s3.object.part.size" "33554432" "${file_name}" - add_or_setup_value "s3.cache.size" "1073741824" "${file_name}" + add_or_setup_value "s3.block.cache.size" "1073741824" "${file_name}" add_or_setup_value "stream.set.object.compaction.cache.size" "536870912" "${file_name}" fi } @@ -220,9 +220,32 @@ kafka_monitor_ip() { fi } +configure_from_environment_variables() { + file_name=$1 + # List of special cases to apply to the variables + local -r exception_regexps=( + "s/sasl\.ssl/sasl_ssl/g" + "s/sasl\.plaintext/sasl_plaintext/g" + ) + # Map environment variables to config properties + for var in "${!KAFKA_CFG_@}"; do + key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')" + + # Exception for the camel case in this environment variable + [[ "$var" == "KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET" ]] && key="zookeeper.clientCnxnSocket" + + # Apply exception regexps + for regex in "${exception_regexps[@]}"; do + key="$(echo "$key" | sed "$regex")" + done + + value="${!var}" + add_or_setup_value "${key}" "${value}" "${file_name}" + done +} + kafka_up() { echo "kafka_up: start" - override_settings=() while [[ $# -ge 1 ]]; do case "${1}" in @@ -235,7 +258,6 @@ kafka_up() { --s3.access.key) set_once s3_access_key "${2}" "s3 access key"; shift 2;; --s3.secret.key) set_once s3_secret_key "${2}" "s3 secret key"; shift 2;; --s3.endpoint) set_once s3_endpoint "${2}" "s3 endpoint"; shift 2;; - --override) [[ -n "${2}" ]] && override_settings+=("${2}"); shift 2;; esac done @@ -294,12 +316,8 @@ kafka_up() { kafka_monitor_ip echo "kafka_up: ip settings changed" - # override settings - for element in "${override_settings[@]}"; do - key=$(echo "${element}" | awk -F= '{print $1}') - value=$(echo "${element}" | awk -F= '{print $2}') - add_or_setup_value "${key}" "${value}" "${kafka_dir}/config/kraft/${process_role}.properties" - done + # override settings from env + configure_from_environment_variables "${kafka_dir}/config/kraft/${process_role}.properties" # format the data path must_do -v "${kafka_dir}/bin/kafka-storage.sh format -g -t ${cluster_id} -c ${kafka_dir}/config/kraft/${process_role}.properties"