Use Range for padTimestampsMillis to fix test flakiness
mdedetrich committed Sep 18, 2022
1 parent 8186243 commit ec5f978
Showing 3 changed files with 30 additions and 15 deletions.
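In short: the generators' `padTimestampsMillis` parameter changes from a bare `Int` upper bound to a `scala.collection.immutable.Range`, so call sites now control both ends of the random gap between consecutive record timestamps; a degenerate range such as `Range.inclusive(1000, 1000)` pins the gap to a constant, which is what removes the flakiness. The commit also fixes the `tailingSentinelValue` typo, renaming it to `trailingSentinelValue`. A minimal sketch of the before/after sampling (illustrative names, not code from the commit):

```scala
import org.scalacheck.Gen

// Before: a bare Int upper bound; padding was drawn from [1, padTimestampsMillis + 1].
def paddingGenOld(padTimestampsMillis: Int): Gen[Long] =
  Gen.chooseNum[Long](1, padTimestampsMillis + 1)

// After: the caller supplies both bounds; Range.inclusive(1000, 1000)
// collapses the draw to the constant 1000.
def paddingGenNew(padTimestampsMillis: Range): Gen[Long] =
  Gen.chooseNum[Long](padTimestampsMillis.start, padTimestampsMillis.last)
```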
MockedKafkaClientBackupClientSpec (7 additions, 2 deletions)

```diff
@@ -44,8 +44,13 @@ class MockedKafkaClientBackupClientSpec
     "Creating many objects in a small period of time works despite S3's in progress multipart upload eventual consistency issues",
     RealS3Available
   ) {
-    forAll(kafkaDataWithTimePeriodsGen(100, 100, 1000, tailingSentinelValue = true),
-           s3ConfigGen(useVirtualDotHost, bucketPrefix)
+    forAll(
+      kafkaDataWithTimePeriodsGen(20,
+                                  20,
+                                  padTimestampsMillis = Range.inclusive(1000, 1000),
+                                  trailingSentinelValue = true
+      ),
+      s3ConfigGen(useVirtualDotHost, bucketPrefix)
     ) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
       logger.info(s"Data bucket is ${s3Config.dataBucket}")
      val data = kafkaDataWithTimePeriod.data
```
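Note the degenerate range above: with `Range.inclusive(1000, 1000)` every pair of consecutive generated records is exactly one second apart, and the record count is pinned to 20, so this S3 test no longer depends on a randomly sized or randomly spaced timeline. A quick sketch (not commit code) of why the range collapses to a constant:

```scala
import org.scalacheck.Gen

// A single-point range removes the randomness entirely:
val r = Range.inclusive(1000, 1000)
val fixedPadding: Gen[Long] = Gen.chooseNum[Long](r.start, r.last)
// fixedPadding.sample always yields Some(1000L)
```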
RestoreClientInterfaceSpec (4 additions, 2 deletions)

```diff
@@ -39,7 +39,8 @@ class RestoreClientInterfaceSpec
 
   property("Calculating finalKeys contains correct keys with fromWhen filter") {
     forAll(
-      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = ChronoUnit.HOURS.getDuration.toMillis.toInt,
+      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis =
+                                                   Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt),
                                                  min = 1000,
                                                  max = 1000
       )
@@ -103,7 +104,8 @@ class RestoreClientInterfaceSpec
 
   property("Round-trip with backup and restore works using fromWhen filter") {
     forAll(
-      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = ChronoUnit.HOURS.getDuration.toMillis.toInt,
+      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis =
+                                                   Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt),
                                                  min = 1000,
                                                  max = 1000
       )
```
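`ChronoUnit.HOURS.getDuration.toMillis` is 3,600,000, so these two properties now pad consecutive timestamps by anything from 1 ms up to one hour instead of a fixed value. For reference (a sketch, assuming `java.time` on the classpath as in the spec):

```scala
import java.time.temporal.ChronoUnit

// One hour in milliseconds (3600000), which fits comfortably in an Int.
val oneHourMillis: Int = ChronoUnit.HOURS.getDuration.toMillis.toInt

// The padding range both properties above pass to the generator.
val padding: Range = Range.inclusive(1, oneHourMillis)
```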
core/src/test/scala/io/aiven/guardian/kafka/Generators.scala (19 additions, 11 deletions)

```diff
@@ -62,7 +62,7 @@ object Generators {
   def kafkaReducedConsumerRecordsGen(topic: String,
                                      min: Int,
                                      max: Int,
-                                     padTimestampsMillis: Int
+                                     padTimestampsMillis: Range = Range.inclusive(1, 10)
   ): Gen[List[ReducedConsumerRecord]] = for {
     t <- Gen.const(topic)
     numberOfTotalReducedRecords <- Gen.chooseNum[Int](min, max)
@@ -87,7 +87,7 @@ object Generators {
       .flatten
     timestampsWithPadding <- Gen
       .sequence((1 to reducedConsumerRecordsWithoutTimestamp.size).map { _ =>
-        Gen.chooseNum[Long](1, padTimestampsMillis + 1)
+        Gen.chooseNum[Long](padTimestampsMillis.start, padTimestampsMillis.last)
       })
       .map(_.asScala.toList)
     timestamps = timestampsWithPadding.foldLeft(ListBuffer(1L)) { case (timestamps, padding) =>
```
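Two things change in this hunk: the sampling bounds now come straight from the caller's `Range` (the old code drew from `[1, padTimestampsMillis + 1]`, so a nominal pad of 10 could actually produce 11), and each sampled padding is then folded into a running sum to produce increasing timestamps. A condensed sketch of that pipeline (simplified, with `scanLeft` standing in for the `ListBuffer` fold above):

```scala
import org.scalacheck.Gen

// n paddings drawn from the range, accumulated into n + 1 monotonically
// increasing timestamps starting at 1.
def timestampsGen(n: Int, pad: Range): Gen[List[Long]] =
  Gen
    .listOfN(n, Gen.chooseNum[Long](pad.start, pad.last))
    .map(_.scanLeft(1L)(_ + _))

// With pad = Range.inclusive(1000, 1000): List(1, 1001, 2001, 3001, ...)
```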
```diff
@@ -118,6 +118,8 @@ object Generators {
     def topics: Set[String] = data.map(_.topic).toSet
   }
 
+  /** Generates a random `FiniteDuration` that is between the time periods within the `reducedConsumerRecords`.
+    */
   def randomPeriodSliceBetweenMinMax(reducedConsumerRecords: List[ReducedConsumerRecord]): Gen[FiniteDuration] = {
     val head = reducedConsumerRecords.head
     val last = reducedConsumerRecords.last
@@ -126,7 +128,7 @@ object Generators {
 
   def kafkaDateGen(min: Int = 2,
                    max: Int = 100,
-                   padTimestampsMillis: Int = 10,
+                   padTimestampsMillis: Range = Range.inclusive(1, 10),
                    condition: Option[List[ReducedConsumerRecord] => Boolean] = None
   ): Gen[List[ReducedConsumerRecord]] = for {
     topic <- kafkaTopic
@@ -141,25 +143,30 @@ object Generators {
     * The minimum number of `ReducedConsumerRecord`'s to generate. Defaults to 2.
     * @param max
     * The maximum number of `ReducedConsumerRecord`'s to generate. Defaults to 100.
+    * @param periodSliceFunction
+    * A provided function that allows you to specify a generated `FiniteDuration` which is handy for
+    * `TimeConfiguration` configuration. The function has both the generated records and `trailingSentinelValue` as an
+    * argument giving you the necessary input to generate the `FiniteDuration` as you wish. Note that if
+    * `trailingSentinelValue` is true then the sentinel value won't be provided to this function.
     * @param padTimestampsMillis
-    * The amount of padding (in milliseconds) between consecutive timestamps. If set to 0 then all timestamps will
-    * differ by a single millisecond. Defaults to 10 millis.
-    * @param tailingSentinelValue
+    * The amount of padding (in milliseconds) specified as a [[scala.collection.immutable.Range]] between consecutive
+    * timestamps. If set to 0 then all timestamps will differ by a single millisecond. Defaults to 10 millis.
+    * @param trailingSentinelValue
     * Whether to add a value at the end of the ReducedConsumerRecords that will have a massively large timestamp so
     * that it will typically be skipped when backing up data
     */
   def kafkaDataWithTimePeriodsGen(min: Int = 2,
                                   max: Int = 100,
-                                  padTimestampsMillis: Int = 10,
+                                  padTimestampsMillis: Range = Range.inclusive(1, 10),
                                   periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] =
                                     randomPeriodSliceBetweenMinMax,
                                   condition: Option[List[ReducedConsumerRecord] => Boolean] = None,
-                                  tailingSentinelValue: Boolean = false
+                                  trailingSentinelValue: Boolean = false
   ): Gen[KafkaDataWithTimePeriod] = for {
     records <- kafkaDateGen(min, max, padTimestampsMillis, condition)
     duration <- periodSliceFunction(records)
   } yield {
-    val finalRecords = if (tailingSentinelValue) {
+    val finalRecords = if (trailingSentinelValue) {
       val last = records.last
       records ::: List(
         ReducedConsumerRecord(last.topic,
```
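With the new signature, the first spec's call site reads naturally as keyword arguments. Something like the following (mirroring the MockedKafkaClientBackupClientSpec hunk; assumes the `Generators` members and `KafkaDataWithTimePeriod` are in scope):

```scala
import org.scalacheck.Gen

// Deterministic timeline: exactly 20 records, constant 1 s gaps, plus a
// far-future sentinel record appended at the end.
val gen: Gen[KafkaDataWithTimePeriod] =
  kafkaDataWithTimePeriodsGen(
    min = 20,
    max = 20,
    padTimestampsMillis = Range.inclusive(1000, 1000),
    trailingSentinelValue = true
  )
```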
```diff
@@ -178,7 +185,7 @@ object Generators {
   def kafkaDataWithTimePeriodsAndPickedRecordGen(
       min: Int = 2,
       max: Int = 100,
-      padTimestampsMillis: Int = 10,
+      padTimestampsMillis: Range = Range.inclusive(1, 10),
       periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] = randomPeriodSliceBetweenMinMax,
       condition: Option[List[ReducedConsumerRecord] => Boolean] = None
   ): Gen[KafkaDataWithTimePeriodAndPickedRecord] = for {
@@ -216,7 +223,8 @@ object Generators {
       amount: Int,
       toBytesFunc: List[ReducedConsumerRecord] => Array[Byte]
   ): Gen[KafkaDataInChunksWithTimePeriod] = {
-    val single = kafkaDateGen(1000, 10000, 10, Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
+    val single =
+      kafkaDateGen(1000, 10000, Range.inclusive(1, 10), Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
     for {
       recordsSplitBySize <- Gen.sequence(List.fill(amount)(single)).map(_.asScala.toList)
       duration <- timePeriodAlwaysGreaterThanAllMessages(recordsSplitBySize.flatten)
```
