From 370de29ab6146e4af413012dd1d9f8d0a2c48623 Mon Sep 17 00:00:00 2001 From: g7ed6e <681739+g7ed6e@users.noreply.github.com> Date: Tue, 12 Dec 2023 07:41:45 +0100 Subject: [PATCH 1/4] Allow multiple calls to SetLogHandler / SetErrorHandler / SetStatisticsHandler --- src/Confluent.Kafka/ConsumerBuilder.cs | 6 +++--- src/Confluent.Kafka/ProducerBuilder.cs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Confluent.Kafka/ConsumerBuilder.cs b/src/Confluent.Kafka/ConsumerBuilder.cs index 56549e783..f8d79338d 100644 --- a/src/Confluent.Kafka/ConsumerBuilder.cs +++ b/src/Confluent.Kafka/ConsumerBuilder.cs @@ -159,7 +159,7 @@ public ConsumerBuilder SetStatisticsHandler( { if (this.StatisticsHandler != null) { - throw new InvalidOperationException("Statistics handler may not be specified more than once."); + this.StatisticsHandler += statisticsHandler; } this.StatisticsHandler = statisticsHandler; return this; @@ -182,7 +182,7 @@ public ConsumerBuilder SetErrorHandler( { if (this.ErrorHandler != null) { - throw new InvalidOperationException("Error handler may not be specified more than once."); + this.ErrorHandler += errorHandler; } this.ErrorHandler = errorHandler; return this; @@ -212,7 +212,7 @@ public ConsumerBuilder SetLogHandler( { if (this.LogHandler != null) { - throw new InvalidOperationException("Log handler may not be specified more than once."); + this.LogHandler += logHandler; } this.LogHandler = logHandler; return this; diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index e2a3b488b..a82bf2a8d 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -169,7 +169,7 @@ public ProducerBuilder SetStatisticsHandler(Action SetErrorHandler(Action SetLogHandler(Action Date: Tue, 12 Dec 2023 07:41:56 +0100 Subject: [PATCH 2/4] Add Jetbrains Rider files to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index de02846e6..84deaf8b0 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ UpgradeLog*.htm *~ \#* core +.idea/ From 056bce2f50cde5e92027ef70f886f0d9089237a7 Mon Sep 17 00:00:00 2001 From: g7ed6e <681739+g7ed6e@users.noreply.github.com> Date: Thu, 14 Dec 2023 21:35:05 +0100 Subject: [PATCH 3/4] Add unit tests --- src/Confluent.Kafka/ConsumerBuilder.cs | 18 +- src/Confluent.Kafka/ProducerBuilder.cs | 18 +- .../SkipWhenCITheory.cs | 16 ++ .../Tests/Builder_Handlers.cs | 235 ++++++++++++++++++ 4 files changed, 257 insertions(+), 30 deletions(-) create mode 100644 test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs create mode 100644 test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs diff --git a/src/Confluent.Kafka/ConsumerBuilder.cs b/src/Confluent.Kafka/ConsumerBuilder.cs index f8d79338d..d27307b5a 100644 --- a/src/Confluent.Kafka/ConsumerBuilder.cs +++ b/src/Confluent.Kafka/ConsumerBuilder.cs @@ -157,11 +157,7 @@ public ConsumerBuilder(IEnumerable> config) public ConsumerBuilder SetStatisticsHandler( Action, string> statisticsHandler) { - if (this.StatisticsHandler != null) - { - this.StatisticsHandler += statisticsHandler; - } - this.StatisticsHandler = statisticsHandler; + this.StatisticsHandler += statisticsHandler; return this; } @@ -180,11 +176,7 @@ public ConsumerBuilder SetStatisticsHandler( public ConsumerBuilder SetErrorHandler( Action, Error> errorHandler) { - if (this.ErrorHandler != null) - { - this.ErrorHandler += errorHandler; - } - this.ErrorHandler = errorHandler; + this.ErrorHandler += errorHandler; return this; } @@ -210,11 +202,7 @@ public ConsumerBuilder SetErrorHandler( public ConsumerBuilder SetLogHandler( Action, LogMessage> logHandler) { - if (this.LogHandler != null) - { - this.LogHandler += logHandler; - } - this.LogHandler = logHandler; + this.LogHandler += logHandler; return this; } diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index a82bf2a8d..bd9f439f8 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -167,11 +167,7 @@ public ProducerBuilder(IEnumerable> config) /// public ProducerBuilder SetStatisticsHandler(Action, string> statisticsHandler) { - if (this.StatisticsHandler != null) - { - this.StatisticsHandler += statisticsHandler; - } - this.StatisticsHandler = statisticsHandler; + this.StatisticsHandler += statisticsHandler; return this; } @@ -222,11 +218,7 @@ public ProducerBuilder SetDefaultPartitioner(PartitionerDelegate p /// public ProducerBuilder SetErrorHandler(Action, Error> errorHandler) { - if (this.ErrorHandler != null) - { - this.ErrorHandler += errorHandler; - } - this.ErrorHandler = errorHandler; + this.ErrorHandler += errorHandler; return this; } @@ -251,11 +243,7 @@ public ProducerBuilder SetErrorHandler(Action public ProducerBuilder SetLogHandler(Action, LogMessage> logHandler) { - if (this.LogHandler != null) - { - this.LogHandler += logHandler; - } - this.LogHandler = logHandler; + this.LogHandler += logHandler; return this; } diff --git a/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs b/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs new file mode 100644 index 000000000..8d49b435b --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs @@ -0,0 +1,16 @@ +using System; +using Xunit; + +namespace Confluent.Kafka.IntegrationTests; + +public sealed class SkipWhenCITheory : TheoryAttribute +{ + private const string JenkinsBuildIdEnvVarName = "BUILD_ID"; + + public SkipWhenCITheory(string reason) + { + Skip = Environment.GetEnvironmentVariables().Contains(JenkinsBuildIdEnvVarName) + ? reason + : null; + } +} \ No newline at end of file diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs new file mode 100644 index 000000000..053475a83 --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs @@ -0,0 +1,235 @@ +// Copyright 2016-2017 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +#pragma warning disable xUnit1026 + +using System; +using System.Linq; +using System.Threading; +using Xunit; + + +namespace Confluent.Kafka.IntegrationTests +{ + /// + /// Test multiple calls to SetLogHandler, SetStatisticsHandler and SetErrorHandler + /// + public partial class Tests + { + private const string UnreachableBootstrapServers = "localhost:9000"; + + [Theory, MemberData(nameof(KafkaParameters))] + public void ProducerBuilder_SetLogHandler(string bootstrapServers) + { + LogToFile("start ProducerBuilder_SetLogHandler"); + + var producerConfig = new ProducerConfig + { + BootstrapServers = bootstrapServers, + Debug = "all" + }; + + ManualResetEventSlim mres1 = new(), mres2 = new(); + + using var _ = new ProducerBuilder(producerConfig) + .SetLogHandler((_, _) => mres1.Set()) + .SetLogHandler((_, _) => mres2.Set()) + .Build(); + + Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); + Assert.True(mres2.Wait(TimeSpan.FromSeconds(5))); + + LogToFile("end ProducerBuilder_SetLogHandler"); + } + + [Theory, MemberData(nameof(KafkaParameters))] + public void ProducerBuilder_SetStatisticsHandler(string bootstrapServers) + { + LogToFile("start ProducerBuilder_SetStatisticsHandler"); + + var producerConfig = new ProducerConfig + { + BootstrapServers = bootstrapServers, + StatisticsIntervalMs = 100 + }; + + ManualResetEventSlim mres1 = new(), mres2 = new(); + + using var _ = new ProducerBuilder(producerConfig) + .SetStatisticsHandler((_, _) => mres1.Set()) + .SetStatisticsHandler((_, _) => mres2.Set()) + .Build(); + + Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); + Assert.True(mres2.Wait(TimeSpan.FromSeconds(5))); + + LogToFile("end ProducerBuilder_SetStatisticsHandler"); + } + + [Theory, InlineData(UnreachableBootstrapServers)] + public void ProducerBuilder_SetErrorHandler(string bootstrapServers) + { + LogToFile("start ProducerBuilder_SetErrorHandler"); + + var producerConfig = new ProducerConfig + { + BootstrapServers = bootstrapServers + }; + + ManualResetEventSlim mres1 = new(), mres2 = new(); + + using var _ = new ProducerBuilder(producerConfig) + .SetErrorHandler((_, _) => mres1.Set()) + .SetErrorHandler((_, _) => mres2.Set()) + .Build(); + + Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); + Assert.True(mres2.Wait(TimeSpan.FromSeconds(5))); + + LogToFile("end ProducerBuilder_SetErrorHandler"); + } + + [Theory, MemberData(nameof(KafkaParameters))] + public void ConsumerBuilder_SetLogHandler(string bootstrapServers) + { + LogToFile("start ConsumerBuilder_SetLogHandler"); + + int N = 2; + var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N); + + var consumerConfig = new ConsumerConfig + { + GroupId = Guid.NewGuid().ToString(), + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + EnablePartitionEof = true, + Debug = "all" + }; + + ManualResetEventSlim mres1 = new(), mres2 = new(); + + using var consumer = new ConsumerBuilder(consumerConfig) + .SetPartitionsAssignedHandler((c, partitions) => + { + Assert.Single(partitions); + Assert.Equal(firstProduced.TopicPartition, partitions[0]); + return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset)); + }) + .SetLogHandler((_, _) => mres1.Set()) + .SetLogHandler((_, _) => mres2.Set()) + .Build(); + consumer.Subscribe(singlePartitionTopic); + + Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); + Assert.True(mres2.Wait(TimeSpan.FromSeconds(5))); + + LogToFile("end ConsumerBuilder_SetLogHandler"); + } + + [Theory, MemberData(nameof(KafkaParameters))] + public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers) + { + LogToFile("start ConsumerBuilder_SetStatisticsHandler"); + + int N = 2; + var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N); + + var consumerConfig = new ConsumerConfig + { + GroupId = Guid.NewGuid().ToString(), + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + EnablePartitionEof = true, + StatisticsIntervalMs = 100 + }; + + ManualResetEventSlim mres1 = new(), mres2 = new(); + + using (var consumer = new ConsumerBuilder(consumerConfig) + .SetPartitionsAssignedHandler((c, partitions) => + { + Assert.Single(partitions); + Assert.Equal(firstProduced.TopicPartition, partitions[0]); + return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset)); + }) + .SetStatisticsHandler((_, _) => mres1.Set()) + .SetStatisticsHandler((_, _) => mres2.Set()) + .Build()) + { + consumer.Subscribe(singlePartitionTopic); + + int msgCnt = 0; + while (true) + { + var record = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (record == null) { continue; } + if (record.IsPartitionEOF) { break; } + msgCnt += 1; + } + + Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); + Assert.True(mres2.Wait(TimeSpan.FromSeconds(5))); + consumer.Close(); + } + + LogToFile("end ConsumerBuilder_SetStatisticsHandler"); + } + + [SkipWhenCITheory("Requires to stop the broker in the while loop to simulate broker is down."), MemberData(nameof(KafkaParameters))] + public void ConsumerBuilder_SetErrorHandler(string bootstrapServers) + { + LogToFile("start ConsumerBuilder_SetErrorHandler"); + + int N = 2; + var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N); + + var consumerConfig = new ConsumerConfig + { + GroupId = Guid.NewGuid().ToString(), + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000 + }; + + bool errorHandler1Called = false, errorHandler2Called = false; + + using (var consumer = new ConsumerBuilder(consumerConfig) + .SetPartitionsAssignedHandler((c, partitions) => + { + Assert.Single(partitions); + Assert.Equal(firstProduced.TopicPartition, partitions[0]); + return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset)); + }) + .SetErrorHandler((_, _) => errorHandler1Called = true) + .SetErrorHandler((_, _) => errorHandler2Called = true) + .Build()) + { + consumer.Subscribe(singlePartitionTopic); + + int msgCnt = 0; + while (!errorHandler1Called && !errorHandler2Called) + { + var record = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (record == null) { continue; } + msgCnt += 1; + } + + consumer.Close(); + } + + LogToFile("end ConsumerBuilder_SetErrorHandler"); + } + } +} From e5552df3ad8387a820f98df7ac86acf2e0eb7d8a Mon Sep 17 00:00:00 2001 From: g7ed6e <681739+g7ed6e@users.noreply.github.com> Date: Mon, 8 Jan 2024 11:33:50 +0100 Subject: [PATCH 4/4] Drop useless msgCnt variables --- .../Tests/Builder_Handlers.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs index 053475a83..5c4651f56 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs @@ -171,13 +171,11 @@ public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers) { consumer.Subscribe(singlePartitionTopic); - int msgCnt = 0; while (true) { var record = consumer.Consume(TimeSpan.FromMilliseconds(100)); if (record == null) { continue; } if (record.IsPartitionEOF) { break; } - msgCnt += 1; } Assert.True(mres1.Wait(TimeSpan.FromSeconds(5))); @@ -218,12 +216,9 @@ public void ConsumerBuilder_SetErrorHandler(string bootstrapServers) { consumer.Subscribe(singlePartitionTopic); - int msgCnt = 0; while (!errorHandler1Called && !errorHandler2Called) { - var record = consumer.Consume(TimeSpan.FromMilliseconds(100)); - if (record == null) { continue; } - msgCnt += 1; + consumer.Consume(TimeSpan.FromMilliseconds(100)); } consumer.Close();