Skip to content

Commit

Permalink
Run tests concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 18, 2022
1 parent ec5f978 commit 42f3b6d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ val organizeImportsVersion = "0.6.0"
// See https://github.com/akka/akka-http/pull/3995 and https://github.com/akka/akka-http/pull/3995#issuecomment-1026978593
ThisBuild / libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % "always"

concurrentRestrictions in Global += Tags.limit(Tags.ForkedTestGroup, EvaluateTask.SystemProcessors)
ThisBuild / Test / testForkedParallel := true

val flagsFor12 = Seq(
"-Xlint:_",
"-Ywarn-infer-any",
Expand Down
37 changes: 18 additions & 19 deletions core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.kafka.ConsumerSettings
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.KafkaContainer
import io.aiven.guardian.akka.AkkaStreamTestKit
import io.aiven.guardian.kafka.TestUtils.KafkaFutureToCompletableFuture
Expand All @@ -25,13 +24,13 @@ import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.language.postfixOps

trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this: Suite =>
trait KafkaClusterTest extends AkkaStreamTestKit { this: Suite =>

/** Timeout constant to wait for both Akka Streams plus initialization of consumer/kafka cluster
*/
val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds)

override lazy val container: KafkaContainer = new KafkaContainer()
lazy val container: KafkaContainer = KafkaClusterTest.container

def baseKafkaConfig: Some[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] =
Some(
Expand Down Expand Up @@ -80,22 +79,7 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this
).runWith(Producer.plainSink(producerSettings))
}

protected var adminClient: AdminClient = _

override def afterStart(): Unit = {
super.afterStart()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)
}

override def beforeStop(): Unit = {
adminClient.close()
super.beforeStop()
}

import KafkaClusterTest.adminClient
def createTopics(topics: Set[String])(implicit executionContext: ExecutionContext): Future[Unit] =
for {
currentTopics <- adminClient.listTopics().names().toCompletableFuture.asScala
Expand All @@ -118,3 +102,18 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this

case object TerminationException extends Exception("termination-exception")
}

object KafkaClusterTest {
protected var adminClient: AdminClient = _

protected lazy val container: KafkaContainer = {
val con = new KafkaContainer()
con.start()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> con.bootstrapServers
).asJava
)
con
}
}

0 comments on commit 42f3b6d

Please sign in to comment.