From 3bc366f1a494acde774fdf99546d619926bf064e Mon Sep 17 00:00:00 2001 From: Peter Lee Date: Thu, 19 Dec 2024 05:47:38 +0800 Subject: [PATCH] Add IT for share consumer with duration base offet auto reset (#18251) Reviewers: Andrew Schofield --- .../kafka/test/api/ShareConsumerTest.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 303f390363c53..2107cdd7718dc 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.WakeupException; @@ -1675,6 +1676,79 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers } } + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetByDuration(String persister) throws Exception { + // Set auto offset reset to 1 hour before current time + alterShareAutoOffsetReset("group1", "by_duration:PT1H"); + + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { + + long currentTime = System.currentTimeMillis(); + long twoHoursAgo = currentTime - TimeUnit.HOURS.toMillis(2); + long thirtyMinsAgo = currentTime - TimeUnit.MINUTES.toMillis(30); + + // Produce messages with different timestamps + ProducerRecord oldRecord = new ProducerRecord<>(tp.topic(), tp.partition(), + twoHoursAgo, "old_key".getBytes(), "old_value".getBytes()); + ProducerRecord recentRecord = new ProducerRecord<>(tp.topic(), tp.partition(), + thirtyMinsAgo, "recent_key".getBytes(), "recent_value".getBytes()); + ProducerRecord currentRecord = new ProducerRecord<>(tp.topic(), tp.partition(), + currentTime, "current_key".getBytes(), "current_value".getBytes()); + + producer.send(oldRecord).get(); + producer.send(recentRecord).get(); + producer.send(currentRecord).get(); + producer.flush(); + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + // Should only receive messages from last hour (recent and current) + List> records = consumeRecords(shareConsumer, 2); + assertEquals(2, records.size()); + + // Verify timestamps and order + assertEquals(thirtyMinsAgo, records.get(0).timestamp()); + assertEquals("recent_key", new String(records.get(0).key())); + assertEquals(currentTime, records.get(1).timestamp()); + assertEquals("current_key", new String(records.get(1).key())); + } + + // Set the auto offset reset to 3 hours before current time + // so the consumer should consume all messages (3 records) + alterShareAutoOffsetReset("group2", "by_duration:PT3H"); + try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2"); + KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + List> records = consumeRecords(shareConsumer, 3); + assertEquals(3, records.size()); + } + } + + @ParameterizedTest(name = "{displayName}.persister={0}") + @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) + public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) throws Exception { + // Test invalid duration format + ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, "group1"); + Map> alterEntries = new HashMap<>(); + + // Test invalid duration format + alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), AlterConfigOp.OpType.SET))); + ExecutionException e1 = assertThrows(ExecutionException.class, () -> + adminClient.incrementalAlterConfigs(alterEntries).all().get()); + assertTrue(e1.getCause() instanceof InvalidConfigurationException); + + // Test negative duration + alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), AlterConfigOp.OpType.SET))); + ExecutionException e2 = assertThrows(ExecutionException.class, () -> + adminClient.incrementalAlterConfigs(alterEntries).all().get()); + assertTrue(e2.getCause() instanceof InvalidConfigurationException); + } + private int produceMessages(int messageCount) { try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());