Add IT for share consumer with duration based offset auto reset (apache#18251)

Reviewers: Andrew Schofield <[email protected]>
peterxcli authored Dec 18, 2024
1 parent 0055ef0 commit 3bc366f
Showing 1 changed file with 74 additions and 0 deletions.
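The new test exercises the share.auto.offset.reset group configuration, which accepts a by_duration:<ISO-8601 duration> value. Below is a minimal standalone sketch of how such a value could be applied to a share group with the Admin client, mirroring the incrementalAlterConfigs call made in the test; the class name, bootstrap address, and group id are placeholders, not taken from the commit.

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ShareGroupOffsetResetSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (Admin admin = Admin.create(props)) {
            // GROUP-level config resource, as in the test below
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group"); // placeholder group id

            // "share.auto.offset.reset" is the key the test references via GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG;
            // by_duration:PT1H means "start from records no older than one hour"
            ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "by_duration:PT1H");

            Map<ConfigResource, Collection<AlterConfigOp>> configs =
                Map.of(group, List.of(new AlterConfigOp(entry, AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}

With PT1H set this way, a share consumer in that group starts from records whose timestamps fall within the last hour, which is exactly what the first test below asserts.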
core/src/test/java/kafka/test/api/ShareConsumerTest.java (+74, −0)
@@ -39,6 +39,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
@@ -1675,6 +1676,79 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister)
        }
    }

    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    public void testShareAutoOffsetResetByDuration(String persister) throws Exception {
        // Set auto offset reset to 1 hour before current time
        alterShareAutoOffsetReset("group1", "by_duration:PT1H");

        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {

            long currentTime = System.currentTimeMillis();
            long twoHoursAgo = currentTime - TimeUnit.HOURS.toMillis(2);
            long thirtyMinsAgo = currentTime - TimeUnit.MINUTES.toMillis(30);

            // Produce messages with different timestamps
            ProducerRecord<byte[], byte[]> oldRecord = new ProducerRecord<>(tp.topic(), tp.partition(),
                twoHoursAgo, "old_key".getBytes(), "old_value".getBytes());
            ProducerRecord<byte[], byte[]> recentRecord = new ProducerRecord<>(tp.topic(), tp.partition(),
                thirtyMinsAgo, "recent_key".getBytes(), "recent_value".getBytes());
            ProducerRecord<byte[], byte[]> currentRecord = new ProducerRecord<>(tp.topic(), tp.partition(),
                currentTime, "current_key".getBytes(), "current_value".getBytes());

            producer.send(oldRecord).get();
            producer.send(recentRecord).get();
            producer.send(currentRecord).get();
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(tp.topic()));

            // Should only receive messages from last hour (recent and current)
            List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 2);
            assertEquals(2, records.size());

            // Verify timestamps and order
            assertEquals(thirtyMinsAgo, records.get(0).timestamp());
            assertEquals("recent_key", new String(records.get(0).key()));
            assertEquals(currentTime, records.get(1).timestamp());
            assertEquals("current_key", new String(records.get(1).key()));
        }

        // Set the auto offset reset to 3 hours before current time
        // so the consumer should consume all messages (3 records)
        alterShareAutoOffsetReset("group2", "by_duration:PT3H");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
             KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {

            shareConsumer.subscribe(Collections.singleton(tp.topic()));
            List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, 3);
            assertEquals(3, records.size());
        }
    }

    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
    public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) throws Exception {
        // Prepare the group config resource used for the invalid value checks
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, "group1");
        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();

        // Test invalid duration format
        alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
            GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), AlterConfigOp.OpType.SET)));
        ExecutionException e1 = assertThrows(ExecutionException.class, () ->
            adminClient.incrementalAlterConfigs(alterEntries).all().get());
        assertTrue(e1.getCause() instanceof InvalidConfigurationException);

        // Test negative duration
        alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
            GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), AlterConfigOp.OpType.SET)));
        ExecutionException e2 = assertThrows(ExecutionException.class, () ->
            adminClient.incrementalAlterConfigs(alterEntries).all().get());
        assertTrue(e2.getCause() instanceof InvalidConfigurationException);
    }

    private int produceMessages(int messageCount) {
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
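For reference, the by_duration values exercised above follow ISO-8601 duration syntax. The short sketch below (an illustration using java.time, not the broker's actual validation code; the class name is a placeholder) shows why "by_duration:1h" is malformed while "-PT1H" parses but is negative, which the test expects the broker to reject with InvalidConfigurationException.

import java.time.Duration;
import java.time.format.DateTimeParseException;

public class DurationFormatSketch {
    public static void main(String[] args) {
        // Valid ISO-8601 durations, as used in testShareAutoOffsetResetByDuration
        System.out.println(Duration.parse("PT1H"));  // PT1H (one hour)
        System.out.println(Duration.parse("PT3H"));  // PT3H (three hours)

        // "-PT1H" parses, but it is negative; the test expects the broker to reject it
        // with InvalidConfigurationException
        System.out.println(Duration.parse("-PT1H").isNegative());  // true

        try {
            Duration.parse("1h");  // not ISO-8601 (no leading "PT"), so parsing fails
        } catch (DateTimeParseException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}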
