Skip to content

Commit

Permalink
Add unique Generators that work with parallel tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jan 3, 2023
1 parent 8e5ecec commit 369c281
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.softwaremill.diffx.scalatest.DiffMustMatcher._
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.KafkaClusterTest
import io.aiven.guardian.kafka.TestUtils._
import io.aiven.guardian.kafka.UniqueGenerators
import io.aiven.guardian.kafka.Utils
import io.aiven.guardian.kafka.backup.BackupClientControlWrapper
import io.aiven.guardian.kafka.backup.KafkaConsumer
Expand All @@ -22,9 +23,10 @@ import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.Gzip
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
import io.aiven.guardian.kafka.s3.UniqueGenerators.uniqueS3ConfigGen
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import org.scalatest.WorkStealingParallelExecution
import org.scalatest.propspec.AnyPropSpecLike

import scala.concurrent.Future
Expand All @@ -33,7 +35,12 @@ import scala.language.postfixOps

import java.time.temporal.ChronoUnit

trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with BackupClientSpec {
trait RealS3BackupClientTest
extends AnyPropSpecLike
with KafkaClusterTest
with BackupClientSpec
with UniqueGenerators
with WorkStealingParallelExecution {
def compression: Option[CompressionConfig]

override lazy val s3Settings: S3Settings = S3Settings()
Expand Down Expand Up @@ -95,9 +102,10 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
}

property("basic flow without interruptions using PeriodFromFirst works correctly", RealS3Available) {
forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen
forAll(
kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen()
) {
(kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod,
s3Config: S3Config,
Expand Down Expand Up @@ -172,9 +180,10 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
}

property("suspend/resume using PeriodFromFirst creates separate object after resume point", RealS3Available) {
forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen
forAll(
kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen()
) {
(kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod,
s3Config: S3Config,
Expand Down Expand Up @@ -291,9 +300,10 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
}

property("suspend/resume for same object using ChronoUnitSlice works correctly", RealS3Available) {
forAll(kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen
forAll(
kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen()
) {
(kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod,
s3Config: S3Config,
Expand Down Expand Up @@ -390,8 +400,8 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
RealS3Available
) {
forAll(kafkaDataWithTimePeriodsGen(min = 30000, max = 30000),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen()
) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config, kafkaConsumerGroup: String) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")
val data = kafkaDataWithTimePeriod.data
Expand Down Expand Up @@ -474,10 +484,10 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
) {
forAll(
kafkaDataWithMinByteSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen,
kafkaConsumerGroupGen
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen(),
uniqueKafkaConsumerGroupGen()
) {
(kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod,
firstS3Config: S3Config,
Expand Down Expand Up @@ -599,10 +609,10 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with
) {
forAll(
kafkaDataWithTimePeriodsGen(min = 30000, max = 30000),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
s3ConfigGen(useVirtualDotHost, bucketPrefix),
kafkaConsumerGroupGen,
kafkaConsumerGroupGen
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueS3ConfigGen(useVirtualDotHost, bucketPrefix),
uniqueKafkaConsumerGroupGen(),
uniqueKafkaConsumerGroupGen()
) {
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod,
firstS3Config: S3Config,
Expand Down
15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ val organizeImportsVersion = "0.6.0"
// See https://github.com/akka/akka-http/pull/3995 and https://github.com/akka/akka-http/pull/3995#issuecomment-1026978593
ThisBuild / libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % "always"

concurrentRestrictions in Global += Tags.limit(Tags.ForkedTestGroup, EvaluateTask.SystemProcessors)
ThisBuild / Test / testForkedParallel := true

val flagsFor12 = Seq(
"-Xlint:_",
"-Ywarn-infer-any",
Expand Down Expand Up @@ -83,6 +86,9 @@ val cliSettings = Seq(

val baseName = "guardian-for-apache-kafka"

ThisBuild / Test / testForkedParallel := true
ThisBuild / concurrentRestrictions := Seq(Tags.limitAll(10))

lazy val guardian = project
.in(file("."))
.enablePlugins(ScalaUnidocPlugin)
Expand All @@ -106,6 +112,9 @@ lazy val guardian = project
cliRestore
)
.settings(
commands += Command.command("testUntilFailed") { state =>
"test" :: "testUntilFailed" :: state
},
publish / skip := true,
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)
Expand Down Expand Up @@ -286,7 +295,8 @@ lazy val restoreS3 = project
.in(file("restore-s3"))
.settings(
librarySettings,
name := s"$baseName-restore-s3"
Test / fork := true,
name := s"$baseName-restore-s3"
)
.dependsOn(coreRestore % "compile->compile;test->test", coreS3 % "compile->compile;test->test")
.dependsOn(backupS3 % "test->compile")
Expand All @@ -295,7 +305,8 @@ lazy val restoreGCS = project
.in(file("restore-gcs"))
.settings(
librarySettings,
name := s"$baseName-restore-gcs"
Test / fork := true,
name := s"$baseName-restore-gcs"
)
.dependsOn(coreRestore % "compile->compile;test->test", coreGCS % "compile->compile;test->test")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.aiven.guardian.kafka.s3

import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.scaladsl.S3
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.scalacheck.Gen

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object UniqueGenerators {

def uniqueS3ConfigGen(useVirtualDotHost: Boolean, prefix: Option[String] = None)(implicit
system: ActorSystem,
s3Attrs: Attributes
): Gen[S3Config] =
for {
s3Config <- Generators.s3ConfigGen(useVirtualDotHost, prefix)
check = Await.result(S3.checkIfBucketExists(s3Config.dataBucket), Duration.Inf)
bucket <- check match {
case BucketAccess.AccessDenied => uniqueS3ConfigGen(useVirtualDotHost, prefix)
case BucketAccess.AccessGranted => uniqueS3ConfigGen(useVirtualDotHost, prefix)
case BucketAccess.NotExists => Gen.const(s3Config)
}
} yield bucket

}
14 changes: 10 additions & 4 deletions core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,17 @@ trait KafkaClusterTest extends AkkaStreamTestKit { this: Suite =>
} yield ()

def cleanTopics(topics: Set[String])(implicit executionContext: ExecutionContext): Future[Unit] =
Future.successful(())
// for {
// currentTopics <- adminClient.listTopics().names().toCompletableFuture.asScala
// topicsToDelete = topics.intersect(currentTopics.asScala.toSet)
// _ <- adminClient.deleteTopics(topicsToDelete.asJava).all().toCompletableFuture.asScala
// } yield ()

def checkConsumerGroupExists(consumerGroup: String)(implicit executionContext: ExecutionContext): Future[Boolean] =
for {
currentTopics <- adminClient.listTopics().names().toCompletableFuture.asScala
topicsToDelete = topics.intersect(currentTopics.asScala.toSet)
_ <- adminClient.deleteTopics(topicsToDelete.asJava).all().toCompletableFuture.asScala
} yield ()
consumerGroups <- adminClient.listConsumerGroups().valid().toCompletableFuture.asScala
} yield consumerGroups.asScala.toList.map(_.groupId()).contains(consumerGroup)

case object TerminationException extends Exception("termination-exception")
}
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/scala/io/aiven/guardian/kafka/UniqueGenerators.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.aiven.guardian.kafka

import org.scalacheck.Gen
import org.scalatest.Suite

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

trait UniqueGenerators extends KafkaClusterTest { this: Suite =>
private def uniqueKafkaConsumerGroupGenRec()(implicit executionContext: ExecutionContext): Gen[String] =
for {
kafkaConsumerGroup <- Generators.kafkaConsumerGroupGen
exists = Await.result(checkConsumerGroupExists(kafkaConsumerGroup), Duration.Inf)
result <- if (exists)
uniqueKafkaConsumerGroupGenRec()
else
Gen.const(kafkaConsumerGroup)
} yield result

def uniqueKafkaConsumerGroupGen()(implicit executionContext: ExecutionContext): Gen[String] = {
// Its possible for a container to not be started so just referenced the variable causes the lazy value
// to initialize.
container
uniqueKafkaConsumerGroupGenRec()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.scalatest

import org.scalatest.tools.ConcurrentDistributor

trait WorkStealingParallelExecution extends SuiteMixin with ParallelTestExecution { this: Suite =>

lazy val parallelism: Int = 1 max testNames.size

abstract override def run(testName: Option[String], args: Args): Status =
super.run(testName,
args.copy(
distributor = Some(
new ConcurrentDistributor(
args,
java.util.concurrent.Executors.newWorkStealingPool(parallelism)
)
)
)
)
}

0 comments on commit 369c281

Please sign in to comment.