From 5979da13081dbb8c54340a90fb13b88d6bac4e5c Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 23 Dec 2024 18:15:18 +0800 Subject: [PATCH] feat: use the internal partitioner to choose the partition to send msg to Signed-off-by: Ning Yu --- .../apache/kafka/tools/automq/perf/ProducerService.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java index c4024c36f9..410a11072a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java @@ -97,6 +97,7 @@ private Producer createProducer(Topic topic, ProducersConfig config, ProducerCal properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + properties.put(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, true); KafkaProducer kafkaProducer = new KafkaProducer<>(properties); return new Producer(kafkaProducer, topic, callback); @@ -265,8 +266,6 @@ private static class Producer implements AutoCloseable { private final Topic topic; private final ProducerCallback callback; - private int partitionIndex = 0; - public Producer(KafkaProducer producer, Topic topic, ProducerCallback callback) { this.producer = producer; this.topic = topic; @@ -278,11 +277,7 @@ public Producer(KafkaProducer producer, Topic topic, ProducerCal * NOT thread-safe. */ public CompletableFuture sendAsync(byte[] payload) { - return sendAsync(nextKey(), payload, nextPartition()); - } - - private int nextPartition() { - return partitionIndex++ % topic.partitions; + return sendAsync(nextKey(), payload, null); } private String nextKey() {