From f22830a55407ce77c4f52af78bc063efa8de0896 Mon Sep 17 00:00:00 2001 From: "Xu Han@AutoMQ" Date: Thu, 26 Sep 2024 20:24:10 +0800 Subject: [PATCH] fix(metadata): fix stream endoffset set back (#2042) Signed-off-by: Robin Han --- .../stream/StreamControlManager.java | 2 +- .../kafka/image/S3StreamsMetadataDelta.java | 15 +++++++-- .../image/S3StreamsMetadataImageTest.java | 32 ++++++++++++++++++- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 1740625b74..e502b680af 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -1441,7 +1441,7 @@ public void replay(S3StreamEndOffsetsRecord record) { if (streamMetadata == null) { // should not happen log.error("streamId={} not exist when replay S3StreamEndOffsetsRecord", streamEndOffset.streamId()); - return; + continue; } streamMetadata.endOffset(streamEndOffset.endOffset()); } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index 877f7b4752..088554ec31 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -185,13 +185,22 @@ S3StreamsMetadataImage apply() { stream2partition = image.stream2partition(); } registry.inLock(() -> { - newStreamEndOffsets.putAll(changedStreamEndOffsets); - deletedStreams.forEach(newStreamEndOffsets::remove); - // apply the delta changes of old streams since the last image changedStreams.forEach((streamId, delta) -> newStreamMetadataMap.put(streamId, delta.apply())); deletedStreams.forEach(newStreamMetadataMap::remove); + changedStreamEndOffsets.forEach((streamId, newEndOffset) -> newStreamEndOffsets.compute(streamId, (key, oldEndOffset) -> { + if (!newStreamMetadataMap.containsKey(streamId)) { + return null; + } + if (oldEndOffset == null) { + return newEndOffset; + } + // S3StreamSetObjectRecord maybe the SSO compaction record, we need ignore the offset. + return Math.max(oldEndOffset, newEndOffset); + })); + deletedStreams.forEach(newStreamEndOffsets::remove); + // apply the delta changes of old nodes since the last image this.changedNodes.forEach((nodeId, delta) -> newNodeMetadataMap.put(nodeId, delta.apply())); // remove the deleted nodes diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 57e8918131..26b9c2eae3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -17,9 +17,9 @@ package org.apache.kafka.image; +import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.index.RangeIndex; import com.automq.stream.s3.index.SparseRangeIndex; -import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -47,17 +47,22 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; +import org.apache.kafka.common.metadata.S3StreamEndOffsetsRecord; import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; import org.apache.kafka.image.S3StreamsMetadataImage.RangeGetter; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3StreamEndOffsetsCodec; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3StreamSetObject; +import org.apache.kafka.metadata.stream.StreamEndOffset; import org.apache.kafka.metadata.stream.StreamTags; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.automq.AutoMQVersion; import org.apache.kafka.timeline.TimelineHashMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -141,6 +146,31 @@ public void testAssignedChange() { assertEquals(image2, delta1.apply()); } + @Test + public void testImage_compatible() { + S3StreamsMetadataImage image = new S3StreamsMetadataImage(0, RegistryRef.NOOP, new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), + new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), + new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0)); + S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(image); + delta.replay(new S3StreamRecord().setStreamId(233L)); + delta.replay((S3StreamSetObjectRecord) new S3StreamSetObject(0, 1, List.of(new StreamOffsetRange(233L, 100, 200L)), 0).toRecord(AutoMQVersion.V0).message()); + delta.replay(new S3StreamEndOffsetsRecord().setEndOffsets(S3StreamEndOffsetsCodec.encode(List.of(new StreamEndOffset(233L, 300L))))); + + image = delta.apply(); + RecordListWriter writer = new RecordListWriter(); + ImageWriterOptions options = new ImageWriterOptions.Builder().build(); + image.write(writer, options); + + delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + image = delta.apply(); + delta = new S3StreamsMetadataDelta(image); + delta.replay((S3StreamSetObjectRecord) new S3StreamSetObject(0, 1, List.of(new StreamOffsetRange(233L, 100, 200L)), 0).toRecord(AutoMQVersion.V0).message()); + image = delta.apply(); + + Assertions.assertEquals(300L, image.streamEndOffsets().get(233L)); + } + private void testToImageAndBack(S3StreamsMetadataImage image) { RecordListWriter writer = new RecordListWriter(); ImageWriterOptions options = new ImageWriterOptions.Builder().build();