Skip to content

Commit

Permalink
Run tests concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 6, 2022
1 parent 13939d3 commit 9e04c58
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import org.scalatest.FixedThreadPoolParallelExecution

import scala.concurrent.Future
import scala.concurrent.duration._
Expand All @@ -35,6 +36,7 @@ class RealS3BackupClientSpec
extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec"))
with KafkaClusterTest
with BackupClientSpec {

override lazy val s3Settings: S3Settings = S3Settings()

/** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ val organizeImportsVersion = "0.6.0"
// See https://github.com/akka/akka-http/pull/3995 and https://github.com/akka/akka-http/pull/3995#issuecomment-1026978593
ThisBuild / libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % "always"

concurrentRestrictions in Global += Tags.limit(Tags.ForkedTestGroup, EvaluateTask.SystemProcessors)
ThisBuild / Test / testForkedParallel := true

val flagsFor12 = Seq(
"-Xlint:_",
"-Ywarn-infer-any",
Expand Down
37 changes: 18 additions & 19 deletions core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.kafka.ConsumerSettings
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.KafkaContainer
import io.aiven.guardian.akka.AkkaStreamTestKit
import io.aiven.guardian.kafka.TestUtils.KafkaFutureToCompletableFuture
Expand All @@ -25,13 +24,13 @@ import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.language.postfixOps

trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this: Suite =>
trait KafkaClusterTest extends AkkaStreamTestKit { this: Suite =>

/** Timeout constant to wait for both Akka Streams plus initialization of consumer/kafka cluster
*/
val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds)

override lazy val container: KafkaContainer = new KafkaContainer()
lazy val container: KafkaContainer = KafkaClusterTest.container

def baseKafkaConfig: Some[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] =
Some(
Expand Down Expand Up @@ -80,22 +79,7 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this
).runWith(Producer.plainSink(producerSettings))
}

protected var adminClient: AdminClient = _

override def afterStart(): Unit = {
super.afterStart()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)
}

override def beforeStop(): Unit = {
adminClient.close()
super.beforeStop()
}

import KafkaClusterTest.adminClient
def createTopics(topics: Set[String])(implicit executionContext: ExecutionContext): Future[Unit] =
for {
currentTopics <- adminClient.listTopics().names().toCompletableFuture.asScala
Expand All @@ -118,3 +102,18 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this

case object TerminationException extends Exception("termination-exception")
}

object KafkaClusterTest {
protected var adminClient: AdminClient = _

protected lazy val container: KafkaContainer = {
val con = new KafkaContainer()
con.start()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> con.bootstrapServers
).asJava
)
con
}
}

0 comments on commit 9e04c58

Please sign in to comment.