diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala
index 75b5c3e4..124c61ab 100644
--- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala
+++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupClientSpec.scala
@@ -44,8 +44,13 @@ class MockedKafkaClientBackupClientSpec
     "Creating many objects in a small period of time works despite S3's in progress multipart upload eventual consistency issues",
     RealS3Available
   ) {
-    forAll(kafkaDataWithTimePeriodsGen(100, 100, 1000, tailingSentinelValue = true),
-           s3ConfigGen(useVirtualDotHost, bucketPrefix)
+    forAll(
+      kafkaDataWithTimePeriodsGen(20,
+                                  20,
+                                  padTimestampsMillis = Range.inclusive(1000, 1000),
+                                  trailingSentinelValue = true
+      ),
+      s3ConfigGen(useVirtualDotHost, bucketPrefix)
     ) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
       logger.info(s"Data bucket is ${s3Config.dataBucket}")
       val data = kafkaDataWithTimePeriod.data
diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala
index bfd8d26a..cce3c822 100644
--- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala
+++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala
@@ -39,7 +39,8 @@ class RestoreClientInterfaceSpec
 
   property("Calculating finalKeys contains correct keys with fromWhen filter") {
     forAll(
-      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = ChronoUnit.HOURS.getDuration.toMillis.toInt,
+      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis =
+                                                   Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt),
                                                  min = 1000,
                                                  max = 1000
       )
@@ -103,7 +104,8 @@ class RestoreClientInterfaceSpec
 
   property("Round-trip with backup and restore works using fromWhen filter") {
     forAll(
-      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis = ChronoUnit.HOURS.getDuration.toMillis.toInt,
+      kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis =
+                                                   Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt),
                                                  min = 1000,
                                                  max = 1000
       )
diff --git a/core/src/test/scala/io/aiven/guardian/kafka/Generators.scala b/core/src/test/scala/io/aiven/guardian/kafka/Generators.scala
index e8a685ea..e19c8a4c 100644
--- a/core/src/test/scala/io/aiven/guardian/kafka/Generators.scala
+++ b/core/src/test/scala/io/aiven/guardian/kafka/Generators.scala
@@ -62,7 +62,7 @@ object Generators {
   def kafkaReducedConsumerRecordsGen(topic: String,
                                      min: Int,
                                      max: Int,
-                                     padTimestampsMillis: Int
+                                     padTimestampsMillis: Range = Range.inclusive(1, 10)
   ): Gen[List[ReducedConsumerRecord]] = for {
     t                           <- Gen.const(topic)
     numberOfTotalReducedRecords <- Gen.chooseNum[Int](min, max)
@@ -87,7 +87,7 @@
                                     .flatten
     timestampsWithPadding <- Gen
                                .sequence((1 to reducedConsumerRecordsWithoutTimestamp.size).map { _ =>
-                                 Gen.chooseNum[Long](1, padTimestampsMillis + 1)
+                                 Gen.chooseNum[Long](padTimestampsMillis.start, padTimestampsMillis.last)
                                })
                                .map(_.asScala.toList)
     timestamps = timestampsWithPadding.foldLeft(ListBuffer(1L)) { case (timestamps, padding) =>
@@ -118,6 +118,8 @@
     def topics: Set[String] = data.map(_.topic).toSet
   }
 
+  /** Generates a random `FiniteDuration` that falls within the time period spanned by the `reducedConsumerRecords`.
+    */
   def randomPeriodSliceBetweenMinMax(reducedConsumerRecords: List[ReducedConsumerRecord]): Gen[FiniteDuration] = {
     val head = reducedConsumerRecords.head
     val last = reducedConsumerRecords.last
@@ -126,7 +128,7 @@
 
   def kafkaDateGen(min: Int = 2,
                    max: Int = 100,
-                   padTimestampsMillis: Int = 10,
+                   padTimestampsMillis: Range = Range.inclusive(1, 10),
                    condition: Option[List[ReducedConsumerRecord] => Boolean] = None
   ): Gen[List[ReducedConsumerRecord]] = for {
     topic <- kafkaTopic
@@ -141,25 +143,30 @@
     *   The minimum number of `ReducedConsumerRecord`'s to generate. Defaults to 2.
     * @param max
     *   The maximum number of `ReducedConsumerRecord`'s to generate. Defaults to 100.
+    * @param periodSliceFunction
+    *   A function that, given the generated records, produces a `Gen[FiniteDuration]`, which is handy for
+    *   `TimeConfiguration` configuration. The generated records give you the necessary input to produce the
+    *   `FiniteDuration` as you wish. Note that if `trailingSentinelValue` is true then the sentinel value is appended
+    *   after this function has run, so it is never provided to this function.
     * @param padTimestampsMillis
-    *   The amount of padding (in milliseconds) between consecutive timestamps. If set to 0 then all timestamps will
-    *   differ by a single millisecond. Defaults to 10 millis.
-    * @param tailingSentinelValue
+    *   The amount of padding (in milliseconds) between consecutive timestamps, specified as a
+    *   [[scala.collection.immutable.Range]] from which each padding value is drawn. Defaults to `Range.inclusive(1, 10)`.
+    * @param trailingSentinelValue
     *   Whether to add a value at the end of the ReducedConsumerRecords that will have a massively large timestamp so
     *   that it will typically be skipped when backing up data
     */
   def kafkaDataWithTimePeriodsGen(min: Int = 2,
                                   max: Int = 100,
-                                  padTimestampsMillis: Int = 10,
+                                  padTimestampsMillis: Range = Range.inclusive(1, 10),
                                   periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] =
                                     randomPeriodSliceBetweenMinMax,
                                   condition: Option[List[ReducedConsumerRecord] => Boolean] = None,
-                                  tailingSentinelValue: Boolean = false
+                                  trailingSentinelValue: Boolean = false
   ): Gen[KafkaDataWithTimePeriod] = for {
     records  <- kafkaDateGen(min, max, padTimestampsMillis, condition)
     duration <- periodSliceFunction(records)
   } yield {
-    val finalRecords = if (tailingSentinelValue) {
+    val finalRecords = if (trailingSentinelValue) {
       val last = records.last
       records ::: List(
         ReducedConsumerRecord(last.topic,
@@ -178,7 +185,7 @@
   def kafkaDataWithTimePeriodsAndPickedRecordGen(
       min: Int = 2,
       max: Int = 100,
-      padTimestampsMillis: Int = 10,
+      padTimestampsMillis: Range = Range.inclusive(1, 10),
      periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] = randomPeriodSliceBetweenMinMax,
       condition: Option[List[ReducedConsumerRecord] => Boolean] = None
   ): Gen[KafkaDataWithTimePeriodAndPickedRecord] = for {
@@ -216,7 +223,8 @@
       amount: Int,
       toBytesFunc: List[ReducedConsumerRecord] => Array[Byte]
   ): Gen[KafkaDataInChunksWithTimePeriod] = {
-    val single = kafkaDateGen(1000, 10000, 10, Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
+    val single =
+      kafkaDateGen(1000, 10000, Range.inclusive(1, 10), Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
     for {
       recordsSplitBySize <- Gen.sequence(List.fill(amount)(single)).map(_.asScala.toList)
       duration           <- timePeriodAlwaysGreaterThanAllMessages(recordsSplitBySize.flatten)
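For reviewers, a minimal sketch of how the reworked generators are driven after this change. It assumes ScalaCheck on the test classpath and a wildcard import of the `Generators` object above; the `fixedPadding` and `hourPadding` names are illustrative only. The key difference is that each gap between consecutive timestamps is now drawn from the supplied `Range` rather than from `1` up to a fixed `Int`:

```scala
import java.time.temporal.ChronoUnit

import org.scalacheck.Gen

import io.aiven.guardian.kafka.Generators._

// Fixed padding: a single-element Range pins every consecutive pair of
// records exactly 1000 ms apart, mirroring MockedKafkaClientBackupClientSpec.
val fixedPadding: Gen[KafkaDataWithTimePeriod] =
  kafkaDataWithTimePeriodsGen(20,
                              20,
                              padTimestampsMillis = Range.inclusive(1000, 1000),
                              trailingSentinelValue = true
  )

// Variable padding: each gap is drawn from 1 ms up to one hour,
// mirroring the two RestoreClientInterfaceSpec properties.
val hourPadding: Gen[KafkaDataWithTimePeriodAndPickedRecord] =
  kafkaDataWithTimePeriodsAndPickedRecordGen(padTimestampsMillis =
                                               Range.inclusive(1, ChronoUnit.HOURS.getDuration.toMillis.toInt),
                                             min = 1000,
                                             max = 1000
  )
```

`Range.inclusive` keeps the old behaviour reachable (the default is `Range.inclusive(1, 10)`), while a degenerate single-element range gives the deterministic spacing the S3 spec relies on.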