Skip to content

Commit

Permalink
Run tests concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Oct 19, 2022
1 parent a223b59 commit 24ba242
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import org.scalatest.WorkStealingParallelExecution
import org.scalatest.propspec.AnyPropSpecLike

import scala.concurrent.Future
Expand All @@ -33,7 +34,11 @@ import scala.language.postfixOps

import java.time.temporal.ChronoUnit

trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with BackupClientSpec {
trait RealS3BackupClientTest
extends AnyPropSpecLike
with KafkaClusterTest
with BackupClientSpec
with WorkStealingParallelExecution {
def compression: Option[CompressionConfig]

override lazy val s3Settings: S3Settings = S3Settings()
Expand Down
15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ val organizeImportsVersion = "0.6.0"
// See https://github.com/akka/akka-http/pull/3995 and https://github.com/akka/akka-http/pull/3995#issuecomment-1026978593
ThisBuild / libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % "always"

concurrentRestrictions in Global += Tags.limit(Tags.ForkedTestGroup, EvaluateTask.SystemProcessors)
ThisBuild / Test / testForkedParallel := true

val flagsFor12 = Seq(
"-Xlint:_",
"-Ywarn-infer-any",
Expand Down Expand Up @@ -83,6 +86,9 @@ val cliSettings = Seq(

val baseName = "guardian-for-apache-kafka"

ThisBuild / Test / testForkedParallel := true
ThisBuild / concurrentRestrictions := Seq(Tags.limitAll(10))

lazy val guardian = project
.in(file("."))
.enablePlugins(ScalaUnidocPlugin)
Expand All @@ -106,6 +112,9 @@ lazy val guardian = project
cliRestore
)
.settings(
commands += Command.command("testUntilFailed") { state =>
"test" :: "testUntilFailed" :: state
},
publish / skip := true,
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)
Expand Down Expand Up @@ -286,7 +295,8 @@ lazy val restoreS3 = project
.in(file("restore-s3"))
.settings(
librarySettings,
name := s"$baseName-restore-s3"
Test / fork := true,
name := s"$baseName-restore-s3"
)
.dependsOn(coreRestore % "compile->compile;test->test", coreS3 % "compile->compile;test->test")
.dependsOn(backupS3 % "test->compile")
Expand All @@ -295,7 +305,8 @@ lazy val restoreGCS = project
.in(file("restore-gcs"))
.settings(
librarySettings,
name := s"$baseName-restore-gcs"
Test / fork := true,
name := s"$baseName-restore-gcs"
)
.dependsOn(coreRestore % "compile->compile;test->test", coreGCS % "compile->compile;test->test")

Expand Down
37 changes: 18 additions & 19 deletions core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.kafka.ConsumerSettings
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.KafkaContainer
import io.aiven.guardian.akka.AkkaStreamTestKit
import io.aiven.guardian.kafka.TestUtils.KafkaFutureToCompletableFuture
Expand All @@ -25,13 +24,13 @@ import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.language.postfixOps

trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this: Suite =>
trait KafkaClusterTest extends AkkaStreamTestKit { this: Suite =>

/** Timeout constant to wait for both Akka Streams plus initialization of consumer/kafka cluster
*/
val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds)

override lazy val container: KafkaContainer = new KafkaContainer()
lazy val container: KafkaContainer = KafkaClusterTest.container

def baseKafkaConfig: Some[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] =
Some(
Expand Down Expand Up @@ -80,22 +79,7 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this
).runWith(Producer.plainSink(producerSettings))
}

protected var adminClient: AdminClient = _

override def afterStart(): Unit = {
super.afterStart()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)
}

override def beforeStop(): Unit = {
adminClient.close()
super.beforeStop()
}

import KafkaClusterTest.adminClient
def createTopics(topics: Set[String])(implicit executionContext: ExecutionContext): Future[Unit] =
for {
currentTopics <- adminClient.listTopics().names().toCompletableFuture.asScala
Expand All @@ -118,3 +102,18 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this

case object TerminationException extends Exception("termination-exception")
}

object KafkaClusterTest {
protected var adminClient: AdminClient = _

protected lazy val container: KafkaContainer = {
val con = new KafkaContainer()
con.start()
adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> con.bootstrapServers
).asJava
)
con
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.scalatest

import org.scalatest.tools.ConcurrentDistributor

trait WorkStealingParallelExecution extends SuiteMixin with ParallelTestExecution { this: Suite =>

lazy val parallelism: Int = 1 max testNames.size

abstract override def run(testName: Option[String], args: Args): Status =
super.run(testName,
args.copy(
distributor = Some(
new ConcurrentDistributor(
args,
java.util.concurrent.Executors.newWorkStealingPool(parallelism)
)
)
)
)
}

0 comments on commit 24ba242

Please sign in to comment.