From 9f2d4c19bb01553622146105e95eed071161fa73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillaume=20B=C3=A9can?= Date: Tue, 7 Feb 2023 15:15:44 +0100 Subject: [PATCH] Encapsulate errors from SslHelper.validateEndpoint in KafkaException (#153) --- .../test/scala/zio/kafka/OOMSpecXmx300m.scala | 47 ++++++++++++++----- .../scala/zio/kafka/admin/AdminClient.scala | 15 ++++-- .../scala/zio/kafka/consumer/Consumer.scala | 8 +++- .../scala/zio/kafka/producer/Producer.scala | 8 +++- .../scala/zio/kafka/utils/SslHelper.scala | 5 +- 5 files changed, 61 insertions(+), 22 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala index 6456d581d..fd5539c82 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala @@ -1,10 +1,12 @@ package zio.kafka import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.KafkaException import zio.kafka.consumer.{ Consumer, Subscription } import zio.kafka.embedded.Kafka import zio.kafka.producer.Producer import zio.kafka.serde.Serde +import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ @@ -22,12 +24,19 @@ object OOMSpecXmx300m extends ZIOSpecWithSslKafka { topic <- randomTopic _ <- Producer.produce(new ProducerRecord(topic, "boo", "baa"), Serde.string, Serde.string) } yield ()).provideSomeLayer(KafkaTestUtils.producer).exit - } yield assertTrue(result.isFailure) && - assertTrue( - result.toEither.left.map(_.getMessage()) == Left( - "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + } yield assert(result)( + fails( + isSubtype[KafkaException]( + hasField( + "cause message", + _.getCause.getMessage, + equalTo( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) ) ) + ) }, test("consumer should fail with ssl check") { for { @@ -35,24 +44,38 @@ object OOMSpecXmx300m extends ZIOSpecWithSslKafka { topic <- randomTopic _ <- Consumer.subscribe(Subscription.Topics(Set(topic))) } yield ()).provideSomeLayer(KafkaTestUtils.consumer("test")).exit - } yield assertTrue(result.isFailure) && - assertTrue( - result.toEither.left.map(_.getMessage()) == Left( - "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + } yield assert(result)( + fails( + isSubtype[KafkaException]( + hasField( + "cause message", + _.getCause.getMessage, + equalTo( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) ) ) + ) }, test("admin client should fail with ssl check") { for { result <- (KafkaTestUtils.withAdmin { client => client.listTopics() }).exit - } yield assertTrue(result.isFailure) && - assertTrue( - result.toEither.left.map(_.getMessage()) == Left( - "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + } yield assert(result)( + fails( + isSubtype[KafkaException]( + hasField( + "cause message", + _.getCause.getMessage, + equalTo( + "Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled" + ) + ) ) ) + ) } ) @@ withLiveClock @@ sequential } diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index f1341b808..85c4d5b1e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.{ ConsumerGroupState => JConsumerGroupState, IsolationLevel => JIsolationLevel, + KafkaException, KafkaFuture, Metric => JMetric, MetricName => JMetricName, @@ -1500,11 +1501,15 @@ object AdminClient { } def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] = - ZIO.acquireRelease( - SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) *> ZIO.attempt( - JAdmin.create(settings.driverSettings.asJava) - ) - )(client => ZIO.succeed(client.close(settings.closeTimeout))) + ZIO.acquireRelease { + val endpointCheck = SslHelper + .validateEndpoint(settings.bootstrapServers, settings.properties) + .mapError(e => + new KafkaException("Failed to create new KafkaAdminClient", e) + ) // Mimic behaviour of KafkaAdminClient.createInternal + + endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)) + }(client => ZIO.succeed(client.close(settings.closeTimeout))) implicit final class MapOps[K1, V1](val v: Map[K1, V1]) extends AnyVal { def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map(kv => fk(kv._1) -> fv(kv._2)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 5c8d05340..acc5f5956 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -1,7 +1,7 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp } -import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition } +import org.apache.kafka.common.{ KafkaException, Metric, MetricName, PartitionInfo, TopicPartition } import zio._ import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop } @@ -350,7 +350,11 @@ object Consumer { diagnostics: Diagnostics = Diagnostics.NoOp ): ZIO[Scope, Throwable, Consumer] = for { - _ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) + _ <- SslHelper + .validateEndpoint(settings.bootstrapServers, settings.properties) + .mapError(e => + new KafkaException("Failed to construct kafka consumer", e) + ) // Mimic behaviour of KafkaConsumer constructor wrapper <- ConsumerAccess.make(settings) runloop <- Runloop( settings.hasGroupId, diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index 050aed1d5..980757e1e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -2,7 +2,7 @@ package zio.kafka.producer import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata } import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.{ Metric, MetricName } +import org.apache.kafka.common.{ KafkaException, Metric, MetricName } import zio._ import zio.kafka.serde.Serializer import zio.kafka.utils.SslHelper @@ -248,7 +248,11 @@ object Producer { def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] = for { props <- ZIO.attempt(settings.driverSettings) - _ <- SslHelper.validateEndpoint(settings.bootstrapServers, props) + _ <- SslHelper + .validateEndpoint(settings.bootstrapServers, props) + .mapError(e => + new KafkaException("Failed to construct kafka producer", e) + ) // Mimic behaviour of KafkaProducer constructor rawProducer <- ZIO.attempt( new KafkaProducer[Array[Byte], Array[Byte]]( props.asJava, diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala index 7dc865b46..d684ac52a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala +++ b/zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala @@ -40,7 +40,7 @@ object SslHelper { ZIO.attempt(SocketChannel.open(addr)) )(channel => ZIO.attempt(channel.close()).orDie) tls <- ZIO.attempt { - // make a simple request here and validate a server response + // Send a simple request and read the TLS record type from the answer val buf = ByteBuffer.allocate(5) channel.write(buf) buf.position(0) @@ -64,6 +64,9 @@ object SslHelper { } .unit + /** + * Check if first byte of buffer corresponds to a record type from a TLS server + */ private def isTls(buf: ByteBuffer): Boolean = { val tlsMessageType = buf.get() tlsMessageType match {