From 3c0177ded3a52893fcce2b65989947b27e30a481 Mon Sep 17 00:00:00 2001 From: Kartikey-Mishra1 Date: Mon, 20 Jan 2025 15:18:32 +0530 Subject: [PATCH 1/2] fix: Fetch and initialize additional ConfigEntry properties Signed-off-by: Kartikey-Mishra1 --- .../io/vertx/kafka/admin/ConfigEntry.java | 20 +++ .../kafka/client/common/impl/Helper.java | 13 +- .../tests/AdminClientConfigEntryTest.java | 118 ++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java diff --git a/src/main/java/io/vertx/kafka/admin/ConfigEntry.java b/src/main/java/io/vertx/kafka/admin/ConfigEntry.java index cb0df6a0..08fef062 100644 --- a/src/main/java/io/vertx/kafka/admin/ConfigEntry.java +++ b/src/main/java/io/vertx/kafka/admin/ConfigEntry.java @@ -56,6 +56,26 @@ public ConfigEntry(String name, String value) { this.value = value; } + /** + * Constructor + * + * @param name the non-null config name + * @param value the config value or null + * @param source the source of this config entry + * @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive + * @param isReadOnly whether the config is read-only and cannot be updated + * @param synonyms Synonym configs in order of precedence + */ + public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, + boolean isReadOnly, List synonyms) { + this.name = name; + this.value = value; + this.source = source; + this.isSensitive = isSensitive; + this.isReadOnly = isReadOnly; + this.synonyms = synonyms; + } + /** * Constructor (from JSON representation) * diff --git a/src/main/java/io/vertx/kafka/client/common/impl/Helper.java b/src/main/java/io/vertx/kafka/client/common/impl/Helper.java index 06ddf087..9f456429 100644 --- a/src/main/java/io/vertx/kafka/client/common/impl/Helper.java +++ b/src/main/java/io/vertx/kafka/client/common/impl/Helper.java @@ -19,6 +19,7 @@ import io.vertx.core.Handler; import io.vertx.kafka.admin.Config; import io.vertx.kafka.admin.ConfigEntry; +import io.vertx.kafka.admin.ConfigSynonym; import io.vertx.kafka.admin.ConsumerGroupListing; import io.vertx.kafka.admin.DescribeClusterOptions; import io.vertx.kafka.admin.DescribeConsumerGroupsOptions; @@ -197,13 +198,23 @@ public static Map fromConfigEntries(Collection configEntries) { return configEntries.stream().map(Helper::from).collect(Collectors.toList()); } + public static List fromConfigSynonyms(Collection configSynonyms) { + return configSynonyms.stream().map(Helper::from).collect(Collectors.toList()); + } + + public static ConfigSynonym from(org.apache.kafka.clients.admin.ConfigEntry.ConfigSynonym configSynonym) { + return new ConfigSynonym(configSynonym.name(), configSynonym.value(), configSynonym.source()); + } + public static ConsumerGroupListing from(org.apache.kafka.clients.admin.ConsumerGroupListing consumerGroupListing) { return new ConsumerGroupListing(consumerGroupListing.groupId(), consumerGroupListing.isSimpleConsumerGroup()); } diff --git a/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java b/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java new file mode 100644 index 00000000..cef64a95 --- /dev/null +++ b/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java @@ -0,0 +1,118 @@ +/* + * Copyright 2025 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.vertx.kafka.client.tests; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.TestContext; +import io.vertx.kafka.admin.ConfigEntry; +import io.vertx.kafka.admin.KafkaAdminClient; +import io.vertx.kafka.admin.NewTopic; +import io.vertx.kafka.client.common.ConfigResource; + +public class AdminClientConfigEntryTest extends KafkaClusterTestBase { + private static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; + private Vertx vertx; + private Properties config; + + @Before + public void beforeTest() { + this.vertx = Vertx.vertx(); + this.config = new Properties(); + this.config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + } + + @After + public void afterTest(TestContext ctx) { + this.vertx.close().onComplete(ctx.asyncAssertSuccess()); + } + + @BeforeClass + public static void setUp() throws IOException { + kafkaCluster = kafkaCluster(true).deleteDataPriorToStartup(true).addBrokers(2).startup(); + } + + @Test + public void testPropertiesOfEntryNotConfiguredExplicitly(TestContext ctx) { + KafkaAdminClient adminClient = KafkaAdminClient.create(this.vertx, config); + + String topicName = "topic-default-min-isr"; + NewTopic topic = new NewTopic(topicName, 1, (short)1); + + adminClient.createTopics(Collections.singletonList(topic)).onComplete(ctx.asyncAssertSuccess(v -> { + + ConfigResource topicResource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, topicName); + adminClient.describeConfigs(Collections.singletonList(topicResource)).onComplete(ctx.asyncAssertSuccess(desc -> { + + ConfigEntry minISREntry = desc.get(topicResource) + .getEntries() + .stream() + .filter(entry -> MIN_INSYNC_REPLICAS.equals(entry.getName())) + .findFirst() + .get(); + + ctx.assertTrue(minISREntry.isDefault()); + ctx.assertEquals(minISREntry.getSource(), org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG); + + adminClient.deleteTopics(Collections.singletonList(topicName)).onComplete(ctx.asyncAssertSuccess(r -> { + adminClient.close(); + })); + })); + })); + } + + @Test + public void testPropertiesOfEntryConfiguredExplicitly(TestContext ctx) { + KafkaAdminClient adminClient = KafkaAdminClient.create(this.vertx, config); + + String topicName = "topic-custom-min-isr"; + NewTopic topic = new NewTopic(topicName, 1, (short)1); + topic.setConfig(Collections.singletonMap(MIN_INSYNC_REPLICAS, "1")); + + adminClient.createTopics(Collections.singletonList(topic)).onComplete(ctx.asyncAssertSuccess(v -> { + + ConfigResource topicResource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, topicName); + adminClient.describeConfigs(Collections.singletonList(topicResource)).onComplete(ctx.asyncAssertSuccess(desc -> { + + ConfigEntry minISREntry = desc.get(topicResource) + .getEntries() + .stream() + .filter(entry -> MIN_INSYNC_REPLICAS.equals(entry.getName())) + .findFirst() + .get(); + + ctx.assertFalse(minISREntry.isDefault()); + ctx.assertEquals(minISREntry.getSource(), org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG); + + adminClient.deleteTopics(Collections.singletonList(topicName)).onComplete(ctx.asyncAssertSuccess(r -> { + adminClient.close(); + })); + })); + })); + } + + +} From a64c8bffdd53606bbfcf3dbd5a237abd53be5f14 Mon Sep 17 00:00:00 2001 From: Kartikey-Mishra1 Date: Tue, 21 Jan 2025 16:44:07 +0530 Subject: [PATCH 2/2] style: format using .editorconfig Signed-off-by: Kartikey-Mishra1 --- .../java/io/vertx/kafka/admin/ConfigEntry.java | 8 ++++---- .../client/tests/AdminClientConfigEntryTest.java | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/vertx/kafka/admin/ConfigEntry.java b/src/main/java/io/vertx/kafka/admin/ConfigEntry.java index 08fef062..03c2e4a3 100644 --- a/src/main/java/io/vertx/kafka/admin/ConfigEntry.java +++ b/src/main/java/io/vertx/kafka/admin/ConfigEntry.java @@ -58,15 +58,15 @@ public ConfigEntry(String name, String value) { /** * Constructor - * - * @param name the non-null config name + * + * @param name the non-null config name * @param value the config value or null - * @param source the source of this config entry + * @param source the source of this config entry * @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive * @param isReadOnly whether the config is read-only and cannot be updated * @param synonyms Synonym configs in order of precedence */ - public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, + public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly, List synonyms) { this.name = name; this.value = value; diff --git a/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java b/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java index cef64a95..6428208e 100644 --- a/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java +++ b/src/test/java/io/vertx/kafka/client/tests/AdminClientConfigEntryTest.java @@ -34,7 +34,7 @@ import io.vertx.kafka.client.common.ConfigResource; public class AdminClientConfigEntryTest extends KafkaClusterTestBase { - private static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; + private static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; private Vertx vertx; private Properties config; @@ -58,15 +58,15 @@ public static void setUp() throws IOException { @Test public void testPropertiesOfEntryNotConfiguredExplicitly(TestContext ctx) { KafkaAdminClient adminClient = KafkaAdminClient.create(this.vertx, config); - + String topicName = "topic-default-min-isr"; NewTopic topic = new NewTopic(topicName, 1, (short)1); adminClient.createTopics(Collections.singletonList(topic)).onComplete(ctx.asyncAssertSuccess(v -> { - + ConfigResource topicResource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, topicName); adminClient.describeConfigs(Collections.singletonList(topicResource)).onComplete(ctx.asyncAssertSuccess(desc -> { - + ConfigEntry minISREntry = desc.get(topicResource) .getEntries() .stream() @@ -87,16 +87,16 @@ public void testPropertiesOfEntryNotConfiguredExplicitly(TestContext ctx) { @Test public void testPropertiesOfEntryConfiguredExplicitly(TestContext ctx) { KafkaAdminClient adminClient = KafkaAdminClient.create(this.vertx, config); - + String topicName = "topic-custom-min-isr"; NewTopic topic = new NewTopic(topicName, 1, (short)1); topic.setConfig(Collections.singletonMap(MIN_INSYNC_REPLICAS, "1")); adminClient.createTopics(Collections.singletonList(topic)).onComplete(ctx.asyncAssertSuccess(v -> { - + ConfigResource topicResource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, topicName); adminClient.describeConfigs(Collections.singletonList(topicResource)).onComplete(ctx.asyncAssertSuccess(desc -> { - + ConfigEntry minISREntry = desc.get(topicResource) .getEntries() .stream()