diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegmentManager.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegmentManager.java
index f215b050b8..84a1bd3a4b 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegmentManager.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegmentManager.java
@@ -16,11 +16,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,6 +142,10 @@ public ElasticLogMeta logMeta() {
     }
 
     class EventListener implements ElasticLogSegmentEventListener {
+        public static final long NO_OP_OFFSET = -1L;
+        private final Queue<Long> pendingDeleteSegmentBaseOffset = new ConcurrentLinkedQueue<>();
+        private volatile CompletableFuture<ElasticLogMeta> pendingPersistentMetaCf = null;
+
         @Override
         public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
             switch (event) {
@@ -150,7 +158,7 @@ public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
                             LOGGER.debug("{} meta stream is closed, skip persisting log meta", logIdent);
                         }
                     } else {
-                        asyncPersistLogMeta();
+                        submitOrDrainPendingPersistentMetaQueue(segmentBaseOffset);
                     }
                 }
                 break;
@@ -164,6 +172,55 @@ public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
                 }
             }
         }
+
+        @VisibleForTesting
+        Queue<Long> getPendingDeleteSegmentQueue() {
+            return pendingDeleteSegmentBaseOffset;
+        }
+
+        @VisibleForTesting
+        synchronized CompletableFuture<ElasticLogMeta> getPendingPersistentMetaCf() {
+            return pendingPersistentMetaCf;
+        }
+
+        private void submitOrDrainPendingPersistentMetaQueue(long segmentBaseOffset) {
+            if (segmentBaseOffset != NO_OP_OFFSET) {
+                pendingDeleteSegmentBaseOffset.add(segmentBaseOffset);
+            }
+
+            synchronized (this) {
+                // Only one persist operation is in flight at a time; later deletions stay queued.
+                if (pendingPersistentMetaCf != null && !pendingPersistentMetaCf.isDone()) {
+                    return;
+                }
+
+                long maxOffset = NO_OP_OFFSET;
+
+                // Drain the queue and keep the highest pending base offset.
+                while (!pendingDeleteSegmentBaseOffset.isEmpty()) {
+                    long baseOffset = pendingDeleteSegmentBaseOffset.poll();
+                    maxOffset = Math.max(maxOffset, baseOffset);
+                }
+
+                if (maxOffset != NO_OP_OFFSET) {
+                    if (metaStream.isFenced()) {
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug("{} meta stream is closed, skip persisting log meta", logIdent);
+                        }
+
+                        return;
+                    }
+
+                    long finalMaxOffset = maxOffset;
+                    pendingPersistentMetaCf = asyncPersistLogMeta();
+                    pendingPersistentMetaCf.whenCompleteAsync((res, e) -> {
+                        if (e != null) {
+                            LOGGER.error("error when persistLogMeta maxOffset {}", finalMaxOffset, e);
+                        }
+
+                        // Drain any offsets queued while the persist was in flight.
+                        submitOrDrainPendingPersistentMetaQueue(NO_OP_OFFSET);
+                    });
+                }
+            }
+        }
     }
 }
diff --git a/core/src/test/java/kafka/log/streamaspect/ElasticLogSegmentManagerTest.java b/core/src/test/java/kafka/log/streamaspect/ElasticLogSegmentManagerTest.java
new file mode 100644
index 0000000000..461f53f6f8
--- /dev/null
+++ b/core/src/test/java/kafka/log/streamaspect/ElasticLogSegmentManagerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.streamaspect;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Tag("S3Unit")
+public class ElasticLogSegmentManagerTest {
+    @Test
+    public void testSegmentDelete() {
+        ElasticLogMeta logMeta = mock(ElasticLogMeta.class);
+        ElasticLogSegment logSegment = mock(ElasticLogSegment.class);
+        MetaStream metaStream = mock(MetaStream.class);
+
+        when(metaStream.append(any(MetaKeyValue.class))).thenReturn(CompletableFuture.completedFuture(null));
+
+        ElasticLogStreamManager elasticLogStreamManager = mock(ElasticLogStreamManager.class);
+
+        ElasticLogSegmentManager manager = spy(new ElasticLogSegmentManager(metaStream, elasticLogStreamManager, "testSegmentDelete"));
+
+        when(manager.remove(anyLong())).thenReturn(logSegment);
+
+        when(manager.asyncPersistLogMeta()).thenReturn(CompletableFuture.completedFuture(logMeta));
+
+        ElasticLogSegmentManager.EventListener listener = manager.new EventListener();
+        listener.onEvent(1, ElasticLogSegmentEvent.SEGMENT_DELETE);
+
+        verify(manager, atLeastOnce()).asyncPersistLogMeta();
+        verify(manager, atMost(2)).asyncPersistLogMeta();
+    }
+
+    @Test
+    public void testLargeScaleSegmentDelete() throws InterruptedException {
+        ElasticLogMeta logMeta = mock(ElasticLogMeta.class);
+        ElasticLogSegment logSegment = mock(ElasticLogSegment.class);
+        MetaStream metaStream = mock(MetaStream.class);
+
+        when(metaStream.append(any(MetaKeyValue.class))).thenReturn(CompletableFuture.completedFuture(null));
+
+        ElasticLogStreamManager elasticLogStreamManager = mock(ElasticLogStreamManager.class);
+
+        ElasticLogSegmentManager manager = spy(new ElasticLogSegmentManager(metaStream, elasticLogStreamManager, "testLargeScaleSegmentDelete"));
+
+        Set<Long> removedSegmentId = new HashSet<>();
+
+        when(manager.remove(anyLong())).thenAnswer(invocation -> {
+            long id = invocation.getArgument(0);
+            removedSegmentId.add(id);
+            return logSegment;
+        });
+
+        CountDownLatch latch = new CountDownLatch(2);
+
+        when(manager.asyncPersistLogMeta())
+            .thenAnswer(invocation -> {
+                CompletableFuture<ElasticLogMeta> cf = new CompletableFuture<ElasticLogMeta>()
+                    .completeOnTimeout(logMeta, 100, TimeUnit.MILLISECONDS);
+
+                cf.whenComplete((res, e) -> {
+                    latch.countDown();
+                });
+
+                return cf;
+            });
+
+        ElasticLogSegmentManager.EventListener listener = spy(manager.new EventListener());
+
+        for (long i = 0L; i < 10L; i++) {
+            listener.onEvent(i, ElasticLogSegmentEvent.SEGMENT_DELETE);
+        }
+
+        // wait until both persist operations have completed.
+        latch.await();
+
+        // expect only the first and the last deletions to trigger the persist call.
+        verify(manager, times(2)).asyncPersistLogMeta();
+
+        // check that all segment ids were removed.
+        for (long i = 0L; i < 10L; i++) {
+            assertTrue(removedSegmentId.contains(i));
+        }
+
+        // the last pending persist request has already finished.
+        CompletableFuture<ElasticLogMeta> pendingPersistentMetaCf = listener.getPendingPersistentMetaCf();
+        pendingPersistentMetaCf.join();
+
+        // the pending delete queue has been fully drained.
+        assertTrue(listener.getPendingDeleteSegmentQueue().isEmpty());
+    }
+}