From 2032be7a48f0f1190712ed8f6a779a4ba8f79445 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 17 Dec 2024 11:25:31 +0800 Subject: [PATCH] fix(core): fix potential infinite recursion on reading empty segment Signed-off-by: Shichao Nie --- .../streamaspect/ElasticLogFileRecords.java | 3 ++ .../log/streamaspect/ElasticLogSegment.java | 4 ++ .../streamaspect/ElasticLogCleanerTest.scala | 47 ++++++++++++++++++- 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index 0d3d93e3a2..ce41dfe018 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -181,6 +181,9 @@ private CompletableFuture fetch0(FetchContext context, long startOffset, l } readSize += recordBatchWithContext.rawPayload().remaining(); } + if (readSize == 0) { + return CompletableFuture.completedFuture(null); + } return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results); }); } diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java index 308e8981b5..724bd89930 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java @@ -262,6 +262,10 @@ public CompletableFuture readAsync(long startOffset, int maxSize, return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log)); // Note that relativePositionInSegment here is a fake value. There are no 'position' in elastic streams. + // if the start offset is less than base offset, use base offset. This usually happens when the prev segment is generated + // by compaction and its last offset is less than the base offset of the current segment. + startOffset = Utils.max(startOffset, baseOffset); + // if the start offset is already off the end of the log, return null if (startOffset >= log.nextOffset()) { return CompletableFuture.completedFuture(null); diff --git a/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala b/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala index a865be96a4..744eb7971d 100644 --- a/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala +++ b/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala @@ -8,7 +8,7 @@ import org.apache.kafka.common.Uuid import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, LogOffsetsListener} -import org.junit.jupiter.api.{Assertions, BeforeEach, Tag, Test} +import org.junit.jupiter.api.{Assertions, BeforeEach, Tag, Test, Timeout} import java.io.File import java.util.Properties @@ -80,6 +80,51 @@ class ElasticLogCleanerTest extends LogCleanerTest { } } + @Test + @Timeout(value = 30) + def testCleanSegmentCauseHollowWithEmptySegment(): Unit = { + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + log.appendAsLeader(record(1, 0), leaderEpoch = 0) + log.appendAsLeader(record(2, 0), leaderEpoch = 0) + while (log.numberOfSegments < 2) { + log.appendAsLeader(record(1, log.logEndOffset.toInt), leaderEpoch = 0) + } + while (log.numberOfSegments < 3) { + log.appendAsLeader(record(3, 22), leaderEpoch = 0) + } + log.appendAsLeader(record(1, log.logEndOffset.toInt), leaderEpoch = 0) + log.appendAsLeader(record(3, log.logEndOffset.toInt), leaderEpoch = 0) + + val map = new FakeOffsetMap(Int.MaxValue) + map.put(key(2L), 1L) + map.put(key(1L), log.logEndOffset - 2) + map.put(key(3L), log.logEndOffset - 1) + + // create an empty segment in between first and last segment + cleaner.cleanSegments(log, log.logSegments.asScala.take(1).toSeq, map, 0L, new CleanerStats, new CleanedTransactionMetadata, -1) + cleaner.cleanSegments(log, log.logSegments.asScala.slice(1, 2).toSeq, map, 0L, new CleanerStats, new CleanedTransactionMetadata, -1) + + log.logSegments.asScala.slice(1, 2).foreach(s => { + Assertions.assertEquals(0, s.size()) + }) + + var offset = 0L + var total = 0 + while (offset < log.logEndOffset) { + val rst = log.read(offset, 1, FetchIsolation.LOG_END, minOneMessage = true) + Assertions.assertNotNull(rst) + rst.records.batches.forEach(b => { + total += 1 + offset = b.nextOffset() + }) + } + Assertions.assertEquals(4, total) + } + override protected def makeLog(dir: File, config: LogConfig, recoveryPoint: Long): ElasticUnifiedLog = {