diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala index 7816ae2d7..41414659e 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala @@ -140,13 +140,38 @@ object KafkaTestUtils { def adminSettings: ZIO[Kafka, Nothing, AdminClientSettings] = ZIO.serviceWith[Kafka](_.bootstrapServers).map(AdminClientSettings(_)) + def saslAdminSettings(username: String, password: String): ZIO[Kafka.Sasl, Nothing, AdminClientSettings] = + ZIO + .serviceWith[Kafka.Sasl](_.value.bootstrapServers) + .map( + AdminClientSettings(_).withProperties( + "sasl.mechanism" -> "PLAIN", + "security.protocol" -> "SASL_PLAINTEXT", + "sasl.jaas.config" -> s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";""" + ) + ) + def withAdmin[T](f: AdminClient => RIO[Kafka, T]) = for { settings <- adminSettings - fRes <- ZIO.scoped { - AdminClient - .make(settings) - .flatMap(client => f(client)) - } + fRes <- withAdminClient(settings)(f) } yield fRes + + def withSaslAdmin[T]( + username: String = "admin", + password: String = "admin-secret" + )( + f: AdminClient => RIO[Kafka.Sasl, T] + ) = + for { + settings <- saslAdminSettings(username, password) + fRes <- withAdminClient(settings)(f) + } yield fRes + + private def withAdminClient[R, T](settings: AdminClientSettings)(f: AdminClient => RIO[R, T]) = + ZIO.scoped[R] { + AdminClient + .make(settings) + .flatMap(client => f(client)) + } } diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala index c83bff105..fa9ed2eab 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/embedded/Kafka.scala @@ -10,6 +10,8 @@ trait Kafka { object Kafka { + final case class Sasl(value: Kafka) extends AnyVal + final case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka { override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}") override def stop(): UIO[Unit] = ZIO.succeed(embeddedK.stop(true)) @@ -22,10 +24,33 @@ object Kafka { val embedded: ZLayer[Any, Throwable, Kafka] = ZLayer.scoped { implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig( + zooKeeperPort = 6000, + kafkaPort = 6001, customBrokerProperties = Map("group.min.session.timeout.ms" -> "500", "group.initial.rebalance.delay.ms" -> "0") ) ZIO.acquireRelease(ZIO.attempt(EmbeddedKafkaService(EmbeddedKafka.start())))(_.stop()) } + val saslEmbedded: ZLayer[Any, Throwable, Kafka.Sasl] = ZLayer.scoped { + val kafkaPort = 6003 + implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig( + zooKeeperPort = 6002, + kafkaPort = kafkaPort, + customBrokerProperties = Map( + "group.min.session.timeout.ms" -> "500", + "group.initial.rebalance.delay.ms" -> "0", + "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer", + "sasl.enabled.mechanisms" -> "PLAIN", + "sasl.mechanism.inter.broker.protocol" -> "PLAIN", + "inter.broker.listener.name" -> "SASL_PLAINTEXT", + "listeners" -> s"SASL_PLAINTEXT://localhost:$kafkaPort", + "advertised.listeners" -> s"SASL_PLAINTEXT://localhost:$kafkaPort", + "super.users" -> "User:admin", + "listener.name.sasl_plaintext.plain.sasl.jaas.config" -> """org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_kafkabroker1="kafkabroker1-secret";""" + ) + ) + ZIO.acquireRelease(ZIO.attempt(Kafka.Sasl(EmbeddedKafkaService(EmbeddedKafka.start()))))(_.value.stop()) + } + val local: ZLayer[Any, Nothing, Kafka] = ZLayer.succeed(DefaultLocal) } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSaslSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSaslSpec.scala new file mode 100644 index 000000000..4f9382f76 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSaslSpec.scala @@ -0,0 +1,64 @@ +package zio.kafka + +import zio.Scope +import zio.test.Assertion._ +import zio.test.TestAspect._ +import zio.test._ +import zio._ +import zio.kafka.admin.acl.AclBindingFilter +import zio.kafka.admin.resource.ResourcePatternFilter +import zio.kafka.admin.acl.AccessControlEntryFilter +import zio.kafka.admin.acl.AclBinding +import zio.kafka.admin.resource.ResourcePattern +import zio.kafka.admin.acl.AccessControlEntry +import zio.kafka.admin.acl.AclPermissionType +import zio.kafka.admin.acl.AclOperation +import zio.kafka.admin.resource.ResourceType +import zio.kafka.admin.resource.PatternType +import java.util.concurrent.TimeoutException + +object AdminSaslSpec extends ZIOSpecWithSaslKafka { + + override def kafkaPrefix: String = "adminsaslspec" + + override def spec: Spec[Environment with TestEnvironment with Scope, Any] = + suite("client sasl admin test")( + test("ACLs") { + KafkaTestUtils.withSaslAdmin() { client => + for { + topic <- randomTopic + bindings = + Set( + AclBinding( + ResourcePattern(ResourceType.Topic, name = topic, patternType = PatternType.Literal), + AccessControlEntry( + principal = "User:*", + host = "*", + operation = AclOperation.Write, + permissionType = AclPermissionType.Allow + ) + ) + ) + _ <- client.createAcls(bindings) + createdAcls <- + client + .describeAcls(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any)) + .repeatWhile(_.isEmpty) // because the createAcls is executed async by the broker + .timeoutFail(new TimeoutException())(100.millis) + deletedAcls <- + client + .deleteAcls(Set(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any))) + remainingAcls <- + client + .describeAcls(AclBindingFilter(ResourcePatternFilter.Any, AccessControlEntryFilter.Any)) + .repeatWhile(_.nonEmpty) // because the deleteAcls is executed async by the broker + .timeoutFail(new TimeoutException())(100.millis) + + } yield assert(createdAcls)(equalTo(bindings)) && + assert(deletedAcls)(equalTo(bindings)) && + assert(remainingAcls)(equalTo(Set.empty[AclBinding])) + } + } + ) @@ withLiveClock @@ sequential + +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala index b5b20c8bb..7fd49acde 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala @@ -21,6 +21,7 @@ import zio.kafka.admin.AdminClient.{ OffsetSpec, TopicPartition } +import zio.kafka.admin.acl._ import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription } import zio.kafka.embedded.Kafka import zio.kafka.serde.Serde @@ -28,7 +29,7 @@ import zio.stream.ZSink import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import zio.{ Chunk, Duration, Schedule, ZIO } +import zio._ import java.util.UUID diff --git a/zio-kafka-test/src/test/scala/zio/kafka/KafkaRandom.scala b/zio-kafka-test/src/test/scala/zio/kafka/KafkaRandom.scala new file mode 100644 index 000000000..5c637fa37 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/KafkaRandom.scala @@ -0,0 +1,19 @@ +package zio.kafka + +import zio.{ Task, ZIO } + +import java.util.UUID + +trait KafkaRandom { + + def kafkaPrefix: String + + def randomThing(prefix: String): Task[String] = + ZIO.attempt(UUID.randomUUID()).map(uuid => s"$prefix-$uuid") + + def randomTopic: Task[String] = randomThing(s"$kafkaPrefix-topic") + + def randomGroup: Task[String] = randomThing(s"$kafkaPrefix-group") + + def randomClient: Task[String] = randomThing(s"$kafkaPrefix-client") +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithKafka.scala b/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithKafka.scala index eb1cbdbc3..30097b74a 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithKafka.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithKafka.scala @@ -2,23 +2,9 @@ package zio.kafka import zio.kafka.embedded.Kafka import zio.test._ -import zio.{ Scope, Task, ZIO, ZLayer } - -import java.util.UUID - -trait ZIOSpecWithKafka extends ZIOSpec[TestEnvironment with Kafka] { - - def kafkaPrefix: String +import zio.{ Scope, ZLayer } +trait ZIOSpecWithKafka extends ZIOSpec[TestEnvironment with Kafka] with KafkaRandom { override val bootstrap: ZLayer[Scope, Any, TestEnvironment with Kafka] = testEnvironment ++ Kafka.embedded - - def randomThing(prefix: String): Task[String] = - ZIO.attempt(UUID.randomUUID()).map(uuid => s"$prefix-$uuid") - - def randomTopic: Task[String] = randomThing(s"$kafkaPrefix-topic") - - def randomGroup: Task[String] = randomThing(s"$kafkaPrefix-group") - - def randomClient: Task[String] = randomThing(s"$kafkaPrefix-client") } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithSaslKafka.scala b/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithSaslKafka.scala new file mode 100644 index 000000000..d791377fb --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/ZIOSpecWithSaslKafka.scala @@ -0,0 +1,10 @@ +package zio.kafka + +import zio.kafka.embedded.Kafka +import zio.test._ +import zio.{ Scope, ZLayer } + +trait ZIOSpecWithSaslKafka extends ZIOSpec[TestEnvironment with Kafka.Sasl] with KafkaRandom { + override val bootstrap: ZLayer[Scope, Any, TestEnvironment with Kafka.Sasl] = + testEnvironment ++ Kafka.saslEmbedded +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AclOperation.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AclOperation.scala deleted file mode 100644 index 56e518f51..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AclOperation.scala +++ /dev/null @@ -1,38 +0,0 @@ -package zio.kafka.admin - -import org.apache.kafka.common.acl.{ AclOperation => JAclOperation } - -sealed trait AclOperation - -object AclOperation { - case object Unknown extends AclOperation - case object Any extends AclOperation - case object All extends AclOperation - case object Read extends AclOperation - case object Write extends AclOperation - case object Create extends AclOperation - case object Delete extends AclOperation - case object Alter extends AclOperation - case object Describe extends AclOperation - case object ClusterAction extends AclOperation - case object DescribeConfigs extends AclOperation - case object AlterConfigs extends AclOperation - case object IdempotentWrite extends AclOperation - - def apply(jAclOperation: JAclOperation): AclOperation = - jAclOperation match { - case JAclOperation.UNKNOWN => Unknown - case JAclOperation.ANY => Any - case JAclOperation.ALL => All - case JAclOperation.READ => Read - case JAclOperation.WRITE => Write - case JAclOperation.CREATE => Create - case JAclOperation.DELETE => Delete - case JAclOperation.ALTER => Alter - case JAclOperation.DESCRIBE => Describe - case JAclOperation.CLUSTER_ACTION => ClusterAction - case JAclOperation.DESCRIBE_CONFIGS => DescribeConfigs - case JAclOperation.ALTER_CONFIGS => AlterConfigs - case JAclOperation.IDEMPOTENT_WRITE => IdempotentWrite - } -} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index a128b84fd..fe0bc8b60 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -11,6 +11,7 @@ import org.apache.kafka.clients.admin.{ ConsumerGroupListing => JConsumerGroupListing, CreatePartitionsOptions => JCreatePartitionsOptions, CreateTopicsOptions => JCreateTopicsOptions, + DeleteAclsOptions => _, DeleteConsumerGroupsOptions => JDeleteConsumerGroupsOptions, DeleteRecordsOptions => JDeleteRecordsOptions, DeleteTopicsOptions => JDeleteTopicsOptions, @@ -48,6 +49,8 @@ import org.apache.kafka.common.{ } import zio._ +import zio.kafka.admin.acl._ + import java.util.Optional import scala.annotation.{ nowarn, tailrec } import scala.collection.mutable.ListBuffer @@ -268,6 +271,41 @@ trait AdminClient { configs: Map[ConfigResource, KafkaConfig], options: AlterConfigsOptions ): Task[Map[ConfigResource, Task[Unit]]] + + /* + * Lists access control lists (ACLs) according to the supplied filter. + * + * Note: it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of + * describeAcls. + */ + def describeAcls(filter: AclBindingFilter, options: Option[DescribeAclOptions] = None): Task[Set[AclBinding]] + + /** + * Creates access control lists (ACLs) which are bound to specific resources. + */ + def createAcls(acls: Set[AclBinding], options: Option[CreateAclOptions] = None): Task[Unit] + + /** + * Creates access control lists (ACLs) which are bound to specific resources async. + */ + def createAclsAsync( + acls: Set[AclBinding], + options: Option[CreateAclOptions] = None + ): Task[Map[AclBinding, Task[Unit]]] + + /** + * Deletes access control lists (ACLs) according to the supplied filters. + */ + def deleteAcls(filters: Set[AclBindingFilter], options: Option[DeleteAclsOptions] = None): Task[Set[AclBinding]] + + /** + * Deletes access control lists (ACLs) according to the supplied filters async. + */ + def deleteAclsAsync( + filters: Set[AclBindingFilter], + options: Option[DeleteAclsOptions] = None + ): Task[Map[AclBindingFilter, Task[Map[AclBinding, Option[Throwable]]]]] + } object AdminClient { @@ -754,6 +792,90 @@ object AdminClient { .map(_.asScala.map { case (configResource, kf) => (ConfigResource(configResource), ZIO.fromCompletionStage(kf.toCompletionStage).unit) }.toMap) + + override def describeAcls( + filter: AclBindingFilter, + options: Option[DescribeAclOptions] + ): Task[Set[AclBinding]] = + fromKafkaFuture( + ZIO + .attemptBlocking( + options + .fold(adminClient.describeAcls(filter.asJava))(opt => adminClient.describeAcls(filter.asJava, opt.asJava)) + .values() + ) + ).map(_.asScala.view.map(AclBinding(_)).toSet) + + override def createAcls(acls: Set[AclBinding], options: Option[CreateAclOptions]): Task[Unit] = + fromKafkaFutureVoid( + ZIO + .attemptBlocking( + options + .fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt => + adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava) + ) + .all() + ) + ) + + override def createAclsAsync( + acls: Set[AclBinding], + options: Option[CreateAclOptions] + ): Task[Map[AclBinding, Task[Unit]]] = + ZIO + .attemptBlocking( + options + .fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt => + adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava) + ) + .values() + ) + .map(_.asScala.view.map { case (k, v) => + (AclBinding(k), ZIO.fromCompletionStage(v.toCompletionStage).unit) + }.toMap) + + override def deleteAcls(filters: Set[AclBindingFilter], options: Option[DeleteAclsOptions]): Task[Set[AclBinding]] = + fromKafkaFuture( + ZIO + .attemptBlocking( + options + .fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt => + adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava) + ) + .all() + ) + ).map(_.asScala.view.map(AclBinding(_)).toSet) + + override def deleteAclsAsync( + filters: Set[AclBindingFilter], + options: Option[DeleteAclsOptions] + ): Task[Map[AclBindingFilter, Task[Map[AclBinding, Option[Throwable]]]]] = + ZIO + .attemptBlocking( + options + .fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt => + adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava) + ) + .values() + ) + .map(_.asScala.view.map { case (k, v) => + ( + AclBindingFilter(k), + ZIO + .fromCompletionStage(v.toCompletionStage) + .map( + _.values().asScala.view.map { filterRes => + /** + * [[org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult.binding()]] is claimed to be + * nullable in javadoc but in fact it is always set, see + * [[org.apache.kafka.common.requests.DeleteAclsResponse.aclBinding]] + */ + AclBinding(filterRes.binding()) -> Option(filterRes.exception()) + }.toMap + ) + ) + }.toMap) + } val live: ZLayer[AdminClientSettings, Throwable, AdminClient] = diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala new file mode 100644 index 000000000..2459c3545 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala @@ -0,0 +1,21 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AccessControlEntry => JAccessControlEntry } + +final case class AccessControlEntry( + principal: String, + host: String, + operation: AclOperation, + permissionType: AclPermissionType +) { + def asJava: JAccessControlEntry = new JAccessControlEntry(principal, host, operation.asJava, permissionType.asJava) +} + +object AccessControlEntry { + def apply(jAccessControlEntry: JAccessControlEntry): AccessControlEntry = AccessControlEntry( + principal = jAccessControlEntry.principal(), + host = jAccessControlEntry.host(), + operation = AclOperation(jAccessControlEntry.operation()), + permissionType = AclPermissionType(jAccessControlEntry.permissionType()) + ) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala new file mode 100644 index 000000000..f7995afc5 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala @@ -0,0 +1,24 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AccessControlEntryFilter => JAccessControlEntryFilter } + +final case class AccessControlEntryFilter( + principal: String, + host: String, + operation: AclOperation, + permissionType: AclPermissionType +) { + def asJava: JAccessControlEntryFilter = + new JAccessControlEntryFilter(principal, host, operation.asJava, permissionType.asJava) +} + +object AccessControlEntryFilter { + val Any: AccessControlEntryFilter = AccessControlEntryFilter(null, null, AclOperation.Any, AclPermissionType.Any) + + def apply(jAccessControlEntryFilter: JAccessControlEntryFilter): AccessControlEntryFilter = AccessControlEntryFilter( + principal = jAccessControlEntryFilter.principal(), + host = jAccessControlEntryFilter.host(), + operation = AclOperation(jAccessControlEntryFilter.operation()), + permissionType = AclPermissionType(jAccessControlEntryFilter.permissionType()) + ) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBinding.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBinding.scala new file mode 100644 index 000000000..13982bdb8 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBinding.scala @@ -0,0 +1,13 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AclBinding => JAclBinding } +import zio.kafka.admin.resource.ResourcePattern + +final case class AclBinding(pattern: ResourcePattern, entry: AccessControlEntry) { + def asJava: JAclBinding = new JAclBinding(pattern.asJava, entry.asJava) +} + +object AclBinding { + def apply(jAclBinding: JAclBinding): AclBinding = + AclBinding(pattern = ResourcePattern(jAclBinding.pattern()), entry = AccessControlEntry(jAclBinding.entry())) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBindingFilter.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBindingFilter.scala new file mode 100644 index 000000000..ee4d6eef2 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclBindingFilter.scala @@ -0,0 +1,15 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AclBindingFilter => JAclBindingFilter } +import zio.kafka.admin.resource.ResourcePatternFilter + +final case class AclBindingFilter(patternFilter: ResourcePatternFilter, entryFilter: AccessControlEntryFilter) { + def asJava: JAclBindingFilter = new JAclBindingFilter(patternFilter.asJava, entryFilter.asJava) +} + +object AclBindingFilter { + def apply(jAclBindingFilter: JAclBindingFilter): AclBindingFilter = AclBindingFilter( + patternFilter = ResourcePatternFilter(jAclBindingFilter.patternFilter()), + entryFilter = AccessControlEntryFilter(jAclBindingFilter.entryFilter()) + ) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala new file mode 100644 index 000000000..6d00360f6 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala @@ -0,0 +1,66 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AclOperation => JAclOperation } + +sealed trait AclOperation { + def asJava: JAclOperation +} + +object AclOperation { + case object Unknown extends AclOperation { + def asJava: JAclOperation = JAclOperation.UNKNOWN + } + case object Any extends AclOperation { + def asJava: JAclOperation = JAclOperation.ANY + } + case object All extends AclOperation { + def asJava: JAclOperation = JAclOperation.ALL + } + case object Read extends AclOperation { + def asJava: JAclOperation = JAclOperation.READ + } + case object Write extends AclOperation { + def asJava: JAclOperation = JAclOperation.WRITE + } + case object Create extends AclOperation { + def asJava: JAclOperation = JAclOperation.CREATE + } + case object Delete extends AclOperation { + def asJava: JAclOperation = JAclOperation.DELETE + } + case object Alter extends AclOperation { + def asJava: JAclOperation = JAclOperation.ALTER + } + case object Describe extends AclOperation { + def asJava: JAclOperation = JAclOperation.DESCRIBE + } + case object ClusterAction extends AclOperation { + def asJava: JAclOperation = JAclOperation.CLUSTER_ACTION + } + case object DescribeConfigs extends AclOperation { + def asJava: JAclOperation = JAclOperation.DESCRIBE_CONFIGS + } + case object AlterConfigs extends AclOperation { + def asJava: JAclOperation = JAclOperation.ALTER_CONFIGS + } + case object IdempotentWrite extends AclOperation { + def asJava: JAclOperation = JAclOperation.IDEMPOTENT_WRITE + } + + def apply(jAclOperation: JAclOperation): AclOperation = + jAclOperation match { + case JAclOperation.UNKNOWN => Unknown + case JAclOperation.ANY => Any + case JAclOperation.ALL => All + case JAclOperation.READ => Read + case JAclOperation.WRITE => Write + case JAclOperation.CREATE => Create + case JAclOperation.DELETE => Delete + case JAclOperation.ALTER => Alter + case JAclOperation.DESCRIBE => Describe + case JAclOperation.CLUSTER_ACTION => ClusterAction + case JAclOperation.DESCRIBE_CONFIGS => DescribeConfigs + case JAclOperation.ALTER_CONFIGS => AlterConfigs + case JAclOperation.IDEMPOTENT_WRITE => IdempotentWrite + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclPermissionType.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclPermissionType.scala new file mode 100644 index 000000000..4ce2e8cb1 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclPermissionType.scala @@ -0,0 +1,29 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.common.acl.{ AclPermissionType => JAclPermissionType } + +sealed trait AclPermissionType { + def asJava: JAclPermissionType +} + +object AclPermissionType { + case object Unknown extends AclPermissionType { + override def asJava: JAclPermissionType = JAclPermissionType.UNKNOWN + } + case object Any extends AclPermissionType { + override def asJava: JAclPermissionType = JAclPermissionType.ANY + } + case object Deny extends AclPermissionType { + override def asJava: JAclPermissionType = JAclPermissionType.DENY + } + case object Allow extends AclPermissionType { + override def asJava: JAclPermissionType = JAclPermissionType.ALLOW + } + + def apply(jAclPermissionType: JAclPermissionType): AclPermissionType = jAclPermissionType match { + case JAclPermissionType.UNKNOWN => Unknown + case JAclPermissionType.ANY => Any + case JAclPermissionType.DENY => Deny + case JAclPermissionType.ALLOW => Allow + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/CreateAclOptions.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/CreateAclOptions.scala new file mode 100644 index 000000000..900a56a7c --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/CreateAclOptions.scala @@ -0,0 +1,12 @@ +package zio.kafka.admin.acl + +import zio._ +import org.apache.kafka.clients.admin.{ CreateAclsOptions => JCreateAclOptions } + +final case class CreateAclOptions(timeout: Option[Duration]) { + def asJava: JCreateAclOptions = { + val jopts = new JCreateAclOptions() + + timeout.fold(jopts)(timeout => jopts.timeoutMs(timeout.toMillis.toInt)) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/DeleteAclOptions.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/DeleteAclOptions.scala new file mode 100644 index 000000000..bc2c80bac --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/DeleteAclOptions.scala @@ -0,0 +1,12 @@ +package zio.kafka.admin.acl + +import zio._ +import org.apache.kafka.clients.admin.{ DeleteAclsOptions => JDeleteAclsOptions } + +final case class DeleteAclsOptions(timeout: Option[Duration]) { + def asJava: JDeleteAclsOptions = { + val jopts = new JDeleteAclsOptions() + + timeout.fold(jopts)(timeout => jopts.timeoutMs(timeout.toMillis.toInt)) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/DescribeAclOptions.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/DescribeAclOptions.scala new file mode 100644 index 000000000..730de02ed --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/DescribeAclOptions.scala @@ -0,0 +1,12 @@ +package zio.kafka.admin.acl + +import org.apache.kafka.clients.admin.{ DescribeAclsOptions => JDescribeAclsOptions } +import zio._ + +final case class DescribeAclOptions(timeout: Option[Duration]) { + def asJava: JDescribeAclsOptions = { + val jopts = new JDescribeAclsOptions() + + timeout.fold(jopts)(timeout => jopts.timeoutMs(timeout.toMillis.toInt)) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/resource/PatternType.scala b/zio-kafka/src/main/scala/zio/kafka/admin/resource/PatternType.scala new file mode 100644 index 000000000..c5f0ee917 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/resource/PatternType.scala @@ -0,0 +1,33 @@ +package zio.kafka.admin.resource + +import org.apache.kafka.common.resource.{ PatternType => JPatternType } + +sealed trait PatternType { + def asJava: JPatternType +} + +object PatternType { + case object Literal extends PatternType { + def asJava: JPatternType = JPatternType.LITERAL + } + case object Unknown extends PatternType { + def asJava: JPatternType = JPatternType.UNKNOWN + } + case object Any extends PatternType { + def asJava: JPatternType = JPatternType.ANY + } + case object Prefixed extends PatternType { + def asJava: JPatternType = JPatternType.PREFIXED + } + case object Match extends PatternType { + def asJava: JPatternType = JPatternType.MATCH + } + + def apply(jPatternType: JPatternType): PatternType = jPatternType match { + case JPatternType.LITERAL => Literal + case JPatternType.UNKNOWN => Unknown + case JPatternType.ANY => Any + case JPatternType.PREFIXED => Prefixed + case JPatternType.MATCH => Match + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePattern.scala b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePattern.scala new file mode 100644 index 000000000..a492f411e --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePattern.scala @@ -0,0 +1,16 @@ +package zio.kafka.admin.resource + +import org.apache.kafka.common.resource.{ ResourcePattern => JResourcePattern } + +final case class ResourcePattern(resourceType: ResourceType, name: String, patternType: PatternType) { + def asJava: JResourcePattern = new JResourcePattern(resourceType.asJava, name, patternType.asJava) +} + +object ResourcePattern { + def apply(jResourcePattern: JResourcePattern): ResourcePattern = + ResourcePattern( + ResourceType(jResourcePattern.resourceType()), + jResourcePattern.name(), + PatternType(jResourcePattern.patternType()) + ) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePatternFilter.scala b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePatternFilter.scala new file mode 100644 index 000000000..ccebbee23 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourcePatternFilter.scala @@ -0,0 +1,18 @@ +package zio.kafka.admin.resource + +import org.apache.kafka.common.resource.{ ResourcePatternFilter => JResourcePatternFilter } + +final case class ResourcePatternFilter(resourceType: ResourceType, name: String, patternType: PatternType) { + def asJava: JResourcePatternFilter = new JResourcePatternFilter(resourceType.asJava, name, patternType.asJava) +} + +object ResourcePatternFilter { + val Any: ResourcePatternFilter = ResourcePatternFilter(ResourceType.Any, null, PatternType.Any) + + def apply(jResourcePatternFilter: JResourcePatternFilter): ResourcePatternFilter = + ResourcePatternFilter( + ResourceType(jResourcePatternFilter.resourceType()), + jResourcePatternFilter.name(), + PatternType(jResourcePatternFilter.patternType()) + ) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourceType.scala b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourceType.scala new file mode 100644 index 000000000..075a07e89 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/resource/ResourceType.scala @@ -0,0 +1,41 @@ +package zio.kafka.admin.resource + +import org.apache.kafka.common.resource.{ ResourceType => JResourceType } + +sealed trait ResourceType { + def asJava: JResourceType +} + +object ResourceType { + case object TransactionalId extends ResourceType { + def asJava: JResourceType = JResourceType.TRANSACTIONAL_ID + } + case object Unknown extends ResourceType { + def asJava: JResourceType = JResourceType.UNKNOWN + } + case object Topic extends ResourceType { + def asJava: JResourceType = JResourceType.TOPIC + } + case object Cluster extends ResourceType { + def asJava: JResourceType = JResourceType.CLUSTER + } + case object Any extends ResourceType { + def asJava: JResourceType = JResourceType.ANY + } + case object Group extends ResourceType { + def asJava: JResourceType = JResourceType.GROUP + } + case object DelegationToken extends ResourceType { + def asJava: JResourceType = JResourceType.DELEGATION_TOKEN + } + + def apply(jResourceType: JResourceType): ResourceType = jResourceType match { + case JResourceType.TRANSACTIONAL_ID => TransactionalId + case JResourceType.UNKNOWN => Unknown + case JResourceType.TOPIC => Topic + case JResourceType.CLUSTER => Cluster + case JResourceType.ANY => Any + case JResourceType.GROUP => Group + case JResourceType.DELEGATION_TOKEN => DelegationToken + } +}