Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Encapsulate errors from SslHelper.validateEndpoint in KafkaException (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gbecan authored and guizmaii committed Feb 9, 2023
1 parent da45131 commit 9f2d4c1
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 22 deletions.
47 changes: 35 additions & 12 deletions zio-kafka-test/src/test/scala/zio/kafka/OOMSpecXmx300m.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package zio.kafka

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.KafkaException
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.embedded.Kafka
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

Expand All @@ -22,37 +24,58 @@ object OOMSpecXmx300m extends ZIOSpecWithSslKafka {
topic <- randomTopic
_ <- Producer.produce(new ProducerRecord(topic, "boo", "baa"), Serde.string, Serde.string)
} yield ()).provideSomeLayer(KafkaTestUtils.producer).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
} yield assert(result)(
fails(
isSubtype[KafkaException](
hasField(
"cause message",
_.getCause.getMessage,
equalTo(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
)
)
)
},
test("consumer should fail with ssl check") {
for {
result <- (for {
topic <- randomTopic
_ <- Consumer.subscribe(Subscription.Topics(Set(topic)))
} yield ()).provideSomeLayer(KafkaTestUtils.consumer("test")).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
} yield assert(result)(
fails(
isSubtype[KafkaException](
hasField(
"cause message",
_.getCause.getMessage,
equalTo(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
)
)
)
},
test("admin client should fail with ssl check") {
for {
result <- (KafkaTestUtils.withAdmin { client =>
client.listTopics()
}).exit
} yield assertTrue(result.isFailure) &&
assertTrue(
result.toEither.left.map(_.getMessage()) == Left(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
} yield assert(result)(
fails(
isSubtype[KafkaException](
hasField(
"cause message",
_.getCause.getMessage,
equalTo(
"Received an unexpected SSL packet from the server. Please ensure the client is properly configured with SSL enabled"
)
)
)
)
)
}
) @@ withLiveClock @@ sequential
}
15 changes: 10 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.{
ConsumerGroupState => JConsumerGroupState,
IsolationLevel => JIsolationLevel,
KafkaException,
KafkaFuture,
Metric => JMetric,
MetricName => JMetricName,
Expand Down Expand Up @@ -1500,11 +1501,15 @@ object AdminClient {
}

def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] =
ZIO.acquireRelease(
SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) *> ZIO.attempt(
JAdmin.create(settings.driverSettings.asJava)
)
)(client => ZIO.succeed(client.close(settings.closeTimeout)))
ZIO.acquireRelease {
val endpointCheck = SslHelper
.validateEndpoint(settings.bootstrapServers, settings.properties)
.mapError(e =>
new KafkaException("Failed to create new KafkaAdminClient", e)
) // Mimic behaviour of KafkaAdminClient.createInternal

endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava))
}(client => ZIO.succeed(client.close(settings.closeTimeout)))

implicit final class MapOps[K1, V1](val v: Map[K1, V1]) extends AnyVal {
def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map(kv => fk(kv._1) -> fv(kv._2))
Expand Down
8 changes: 6 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
import org.apache.kafka.common.{ KafkaException, Metric, MetricName, PartitionInfo, TopicPartition }
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop }
Expand Down Expand Up @@ -350,7 +350,11 @@ object Consumer {
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties)
_ <- SslHelper
.validateEndpoint(settings.bootstrapServers, settings.properties)
.mapError(e =>
new KafkaException("Failed to construct kafka consumer", e)
) // Mimic behaviour of KafkaConsumer constructor
wrapper <- ConsumerAccess.make(settings)
runloop <- Runloop(
settings.hasGroupId,
Expand Down
8 changes: 6 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.producer

import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName }
import org.apache.kafka.common.{ KafkaException, Metric, MetricName }
import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
Expand Down Expand Up @@ -248,7 +248,11 @@ object Producer {
def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] =
for {
props <- ZIO.attempt(settings.driverSettings)
_ <- SslHelper.validateEndpoint(settings.bootstrapServers, props)
_ <- SslHelper
.validateEndpoint(settings.bootstrapServers, props)
.mapError(e =>
new KafkaException("Failed to construct kafka producer", e)
) // Mimic behaviour of KafkaProducer constructor
rawProducer <- ZIO.attempt(
new KafkaProducer[Array[Byte], Array[Byte]](
props.asJava,
Expand Down
5 changes: 4 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/utils/SslHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object SslHelper {
ZIO.attempt(SocketChannel.open(addr))
)(channel => ZIO.attempt(channel.close()).orDie)
tls <- ZIO.attempt {
// make a simple request here and validate a server response
// Send a simple request and read the TLS record type from the answer
val buf = ByteBuffer.allocate(5)
channel.write(buf)
buf.position(0)
Expand All @@ -64,6 +64,9 @@ object SslHelper {
}
.unit

/**
* Check if first byte of buffer corresponds to a record type from a TLS server
*/
private def isTls(buf: ByteBuffer): Boolean = {
val tlsMessageType = buf.get()
tlsMessageType match {
Expand Down

0 comments on commit 9f2d4c1

Please sign in to comment.