Skip to content

Commit

Permalink
fix(metadata): fix stream endoffset set back (#2042)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Sep 26, 2024
1 parent 777cac9 commit f22830a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ public void replay(S3StreamEndOffsetsRecord record) {
if (streamMetadata == null) {
// should not happen
log.error("streamId={} not exist when replay S3StreamEndOffsetsRecord", streamEndOffset.streamId());
return;
continue;
}
streamMetadata.endOffset(streamEndOffset.endOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,22 @@ S3StreamsMetadataImage apply() {
stream2partition = image.stream2partition();
}
registry.inLock(() -> {
newStreamEndOffsets.putAll(changedStreamEndOffsets);
deletedStreams.forEach(newStreamEndOffsets::remove);

// apply the delta changes of old streams since the last image
changedStreams.forEach((streamId, delta) -> newStreamMetadataMap.put(streamId, delta.apply()));
deletedStreams.forEach(newStreamMetadataMap::remove);

changedStreamEndOffsets.forEach((streamId, newEndOffset) -> newStreamEndOffsets.compute(streamId, (key, oldEndOffset) -> {
if (!newStreamMetadataMap.containsKey(streamId)) {
return null;
}
if (oldEndOffset == null) {
return newEndOffset;
}
// S3StreamSetObjectRecord maybe the SSO compaction record, we need ignore the offset.
return Math.max(oldEndOffset, newEndOffset);
}));
deletedStreams.forEach(newStreamEndOffsets::remove);

// apply the delta changes of old nodes since the last image
this.changedNodes.forEach((nodeId, delta) -> newNodeMetadataMap.put(nodeId, delta.apply()));
// remove the deleted nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kafka.image;

import com.automq.stream.s3.index.LocalStreamRangeIndexCache;
import com.automq.stream.s3.index.RangeIndex;
import com.automq.stream.s3.index.SparseRangeIndex;
import com.automq.stream.s3.index.LocalStreamRangeIndexCache;
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3ObjectType;
Expand Down Expand Up @@ -47,17 +47,22 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
import org.apache.kafka.common.metadata.S3StreamEndOffsetsRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.image.S3StreamsMetadataImage.RangeGetter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamEndOffsetsCodec;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import org.apache.kafka.metadata.stream.StreamEndOffset;
import org.apache.kafka.metadata.stream.StreamTags;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.automq.AutoMQVersion;
import org.apache.kafka.timeline.TimelineHashMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -141,6 +146,31 @@ public void testAssignedChange() {
assertEquals(image2, delta1.apply());
}

@Test
public void testImage_compatible() {
S3StreamsMetadataImage image = new S3StreamsMetadataImage(0, RegistryRef.NOOP, new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0),
new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0),
new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));
S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(image);
delta.replay(new S3StreamRecord().setStreamId(233L));
delta.replay((S3StreamSetObjectRecord) new S3StreamSetObject(0, 1, List.of(new StreamOffsetRange(233L, 100, 200L)), 0).toRecord(AutoMQVersion.V0).message());
delta.replay(new S3StreamEndOffsetsRecord().setEndOffsets(S3StreamEndOffsetsCodec.encode(List.of(new StreamEndOffset(233L, 300L)))));

image = delta.apply();
RecordListWriter writer = new RecordListWriter();
ImageWriterOptions options = new ImageWriterOptions.Builder().build();
image.write(writer, options);

delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
image = delta.apply();
delta = new S3StreamsMetadataDelta(image);
delta.replay((S3StreamSetObjectRecord) new S3StreamSetObject(0, 1, List.of(new StreamOffsetRange(233L, 100, 200L)), 0).toRecord(AutoMQVersion.V0).message());
image = delta.apply();

Assertions.assertEquals(300L, image.streamEndOffsets().get(233L));
}

private void testToImageAndBack(S3StreamsMetadataImage image) {
RecordListWriter writer = new RecordListWriter();
ImageWriterOptions options = new ImageWriterOptions.Builder().build();
Expand Down

0 comments on commit f22830a

Please sign in to comment.