Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

Commit

Permalink
Add ACLs to AdminClient (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
azhur authored Nov 18, 2022
1 parent 30665ef commit 4d0f672
Show file tree
Hide file tree
Showing 22 changed files with 586 additions and 60 deletions.
35 changes: 30 additions & 5 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,38 @@ object KafkaTestUtils {
def adminSettings: ZIO[Kafka, Nothing, AdminClientSettings] =
ZIO.serviceWith[Kafka](_.bootstrapServers).map(AdminClientSettings(_))

def saslAdminSettings(username: String, password: String): ZIO[Kafka.Sasl, Nothing, AdminClientSettings] =
ZIO
.serviceWith[Kafka.Sasl](_.value.bootstrapServers)
.map(
AdminClientSettings(_).withProperties(
"sasl.mechanism" -> "PLAIN",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.jaas.config" -> s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";"""
)
)

def withAdmin[T](f: AdminClient => RIO[Kafka, T]) =
for {
settings <- adminSettings
fRes <- ZIO.scoped {
AdminClient
.make(settings)
.flatMap(client => f(client))
}
fRes <- withAdminClient(settings)(f)
} yield fRes

def withSaslAdmin[T](
username: String = "admin",
password: String = "admin-secret"
)(
f: AdminClient => RIO[Kafka.Sasl, T]
) =
for {
settings <- saslAdminSettings(username, password)
fRes <- withAdminClient(settings)(f)
} yield fRes

private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) =
ZIO.scoped[R] {
AdminClient
.make(settings)
.flatMap(client => f(client))
}
}
25 changes: 25 additions & 0 deletions zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ trait Kafka {

object Kafka {

final case class Sasl(value: Kafka) extends AnyVal

final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka {
override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true))
Expand All @@ -22,10 +24,33 @@ object Kafka {

val embedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped {
implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
zooKeeperPort = 6000,
kafkaPort = 6001,
customBrokerProperties = Map("group.min.session.timeout.ms" -> "500", "group.initial.rebalance.delay.ms" -> "0")
)
ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop())
}

val saslEmbedded: ZLayer[Any, Throwable, Kafka.Sasl] = ZLayer.scoped {
val kafkaPort = 6003
implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
zooKeeperPort = 6002,
kafkaPort = kafkaPort,
customBrokerProperties = Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"sasl.enabled.mechanisms" -> "PLAIN",
"sasl.mechanism.inter.broker.protocol" -> "PLAIN",
"inter.broker.listener.name" -> "SASL_PLAINTEXT",
"listeners" -> s"SASL_PLAINTEXT://localhost:$kafkaPort",
"advertised.listeners" -> s"SASL_PLAINTEXT://localhost:$kafkaPort",
"super.users" -> "User:admin",
"listener.name.sasl_plaintext.plain.sasl.jaas.config" -> """org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_kafkabroker1="kafkabroker1-secret";"""
)
)
ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop())
}

val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal)
}
64 changes: 64 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSaslSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zio.kafka

import zio.Scope
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
import zio._
import zio.kafka.admin.acl.AclBindingFilter
import zio.kafka.admin.resource.ResourcePatternFilter
import zio.kafka.admin.acl.AccessControlEntryFilter
import zio.kafka.admin.acl.AclBinding
import zio.kafka.admin.resource.ResourcePattern
import zio.kafka.admin.acl.AccessControlEntry
import zio.kafka.admin.acl.AclPermissionType
import zio.kafka.admin.acl.AclOperation
import zio.kafka.admin.resource.ResourceType
import zio.kafka.admin.resource.PatternType
import java.util.concurrent.TimeoutException

object AdminSaslSpec extends ZIOSpecWithSaslKafka {

override def kafkaPrefix: String = "adminsaslspec"

override def spec: Spec[Environment with TestEnvironment with Scope, Any] =
suite("client sasl admin test")(
test("ACLs") {
KafkaTestUtils.withSaslAdmin() { client =>
for {
topic <- randomTopic
bindings =
Set(
AclBinding(
ResourcePattern(ResourceType.Topic, name = topic, patternType = PatternType.Literal),
AccessControlEntry(
principal = "User:*",
host = "*",
operation = AclOperation.Write,
permissionType = AclPermissionType.Allow
)
)
)
_ <- client.createAcls(bindings)
createdAcls <-
client
.describeAcls(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any))
.repeatWhile(_.isEmpty) // because the createAcls is executed async by the broker
.timeoutFail(new TimeoutException())(100.millis)
deletedAcls <-
client
.deleteAcls(Set(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any)))
remainingAcls <-
client
.describeAcls(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any))
.repeatWhile(_.nonEmpty) // because the deleteAcls is executed async by the broker
.timeoutFail(new TimeoutException())(100.millis)

} yield assert(createdAcls)(equalTo(bindings)) &&
assert(deletedAcls)(equalTo(bindings)) &&
assert(remainingAcls)(equalTo(Set.empty[AclBinding]))
}
}
) @@ withLiveClock @@ sequential

}
3 changes: 2 additions & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import zio.kafka.admin.AdminClient.{
OffsetSpec,
TopicPartition
}
import zio.kafka.admin.acl._
import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
import zio.kafka.embedded.Kafka
import zio.kafka.serde.Serde
import zio.stream.ZSink
import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
import zio.{ Chunk, Duration, Schedule, ZIO }
import zio._

import java.util.UUID

Expand Down
19 changes: 19 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/KafkaRandom.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package zio.kafka

import zio.{ Task, ZIO }

import java.util.UUID

trait KafkaRandom {

def kafkaPrefix: String

def randomThing(prefix: String): Task[String] =
ZIO.attempt(UUID.randomUUID()).map(uuid => s"$prefix-$uuid")

def randomTopic: Task[String] = randomThing(s"$kafkaPrefix-topic")

def randomGroup: Task[String] = randomThing(s"$kafkaPrefix-group")

def randomClient: Task[String] = randomThing(s"$kafkaPrefix-client")
}
18 changes: 2 additions & 16 deletions zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithKafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,9 @@ package zio.kafka

import zio.kafka.embedded.Kafka
import zio.test._
import zio.{ Scope, Task, ZIO, ZLayer }

import java.util.UUID

trait ZIOSpecWithKafka extends ZIOSpec[TestEnvironment with Kafka] {

def kafkaPrefix: String
import zio.{ Scope, ZLayer }

trait ZIOSpecWithKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom {
override val bootstrap: ZLayer[Scope, Any, TestEnvironment with Kafka] =
testEnvironment ++ Kafka.embedded

def randomThing(prefix: String): Task[String] =
ZIO.attempt(UUID.randomUUID()).map(uuid => s"$prefix-$uuid")

def randomTopic: Task[String] = randomThing(s"$kafkaPrefix-topic")

def randomGroup: Task[String] = randomThing(s"$kafkaPrefix-group")

def randomClient: Task[String] = randomThing(s"$kafkaPrefix-client")
}
10 changes: 10 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithSaslKafka.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zio.kafka

import zio.kafka.embedded.Kafka
import zio.test._
import zio.{ Scope, ZLayer }

trait ZIOSpecWithSaslKafka extends ZIOSpec[TestEnvironment with Kafka.Sasl] with KafkaRandom {
override val bootstrap: ZLayer[Scope, Any, TestEnvironment with Kafka.Sasl] =
testEnvironment ++ Kafka.saslEmbedded
}
38 changes: 0 additions & 38 deletions zio-kafka/src/main/scala/zio/kafka/admin/AclOperation.scala

This file was deleted.

Loading

0 comments on commit 4d0f672

Please sign in to comment.