Skip to content

Commit

Permalink
perf(core): batch persisted-metadata updates when deleting large-scale segments (#1670)
Browse files Browse the repository at this point in the history

* perf(core): reduce persisted-metadata writes when deleting large-scale segments

* perf(core): batch persisted-metadata updates when deleting large-scale segments

* perf(core): batch persisted-metadata updates when deleting large-scale segments
  • Loading branch information
lifepuzzlefun authored Jul 31, 2024
1 parent 4596ba6 commit 393b0d3
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +142,10 @@ public ElasticLogMeta logMeta() {
}

class EventListener implements ElasticLogSegmentEventListener {
public static final long NO_OP_OFFSET = -1L;
private final Queue<Long> pendingDeleteSegmentBaseOffset = new ConcurrentLinkedQueue<>();
private volatile CompletableFuture<ElasticLogMeta> pendingPersistentMetaCf = null;

@Override
public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
switch (event) {
Expand All @@ -150,7 +158,7 @@ public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
LOGGER.debug("{} meta stream is closed, skip persisting log meta", logIdent);
}
} else {
asyncPersistLogMeta();
submitOrDrainPendingPersistentMetaQueue(segmentBaseOffset);
}
}
break;
Expand All @@ -164,6 +172,55 @@ public void onEvent(long segmentBaseOffset, ElasticLogSegmentEvent event) {
}
}
}

/**
 * Exposes the queue of segment base offsets still awaiting a batched
 * metadata-persistence round. Intended for test assertions only.
 *
 * @return the live (mutable) pending-delete offset queue
 */
@VisibleForTesting
Queue<Long> getPendingDeleteSegmentQueue() {
    return this.pendingDeleteSegmentBaseOffset;
}

/**
 * Returns the in-flight metadata-persistence future, or {@code null} when no
 * persistence round has been started yet. Intended for test assertions only.
 *
 * @return the current persistence future, possibly already completed
 */
@VisibleForTesting
synchronized CompletableFuture<ElasticLogMeta> getPendingPersistentMetaCf() {
    return this.pendingPersistentMetaCf;
}

/**
 * Enqueues a deleted segment's base offset (unless it is {@link #NO_OP_OFFSET})
 * and, if no metadata persistence is currently in flight, drains the whole queue
 * and starts a single batched {@code asyncPersistLogMeta()} call covering every
 * pending deletion. When that persistence completes, this method is re-invoked
 * with {@link #NO_OP_OFFSET} (drain-only mode) to pick up any offsets that
 * accumulated while the previous round was running.
 *
 * @param segmentBaseOffset base offset of the deleted segment, or
 *                          {@link #NO_OP_OFFSET} for a drain-only invocation
 */
private void submitOrDrainPendingPersistentMetaQueue(long segmentBaseOffset) {
    if (segmentBaseOffset != NO_OP_OFFSET) {
        pendingDeleteSegmentBaseOffset.add(segmentBaseOffset);
    }

    synchronized (this) {
        // A persistence round is already running; its completion callback will
        // re-enter this method and drain whatever is queued by then.
        if (pendingPersistentMetaCf != null && !pendingPersistentMetaCf.isDone()) {
            return;
        }

        // Drain everything queued so far; one persist call covers all of them.
        // Null-checked poll() is the safe idiom on a concurrent queue (avoids
        // the isEmpty()/poll() two-step).
        long maxOffset = NO_OP_OFFSET;
        Long baseOffset;
        while ((baseOffset = pendingDeleteSegmentBaseOffset.poll()) != null) {
            maxOffset = Math.max(maxOffset, baseOffset);
        }

        // Nothing was pending; this was a no-op or drain-only call.
        if (maxOffset == NO_OP_OFFSET) {
            return;
        }

        if (metaStream.isFenced()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("{} meta stream is closed, skip persisting log meta", logIdent);
            }
            return;
        }

        long finalMaxOffset = maxOffset;
        pendingPersistentMetaCf = asyncPersistLogMeta();
        pendingPersistentMetaCf.whenCompleteAsync((res, e) -> {
            if (e != null) {
                LOGGER.error("error when persistLogMeta maxOffset {}", finalMaxOffset, e);
            }

            // Drain-only re-invocation: use the named constant, not a magic -1.
            submitOrDrainPendingPersistentMetaQueue(NO_OP_OFFSET);
        });
    }
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@code ElasticLogSegmentManager}'s segment-delete event
 * handling, in particular the batching of metadata persistence when many
 * segments are deleted in quick succession.
 */
@Tag("S3Unit")
public class ElasticLogSegmentManagerTest {
    @Test
    public void testSegmentDelete() {
        ElasticLogMeta logMeta = mock(ElasticLogMeta.class);
        ElasticLogSegment logSegment = mock(ElasticLogSegment.class);
        MetaStream metaStream = mock(MetaStream.class);

        when(metaStream.append(any(MetaKeyValue.class))).thenReturn(CompletableFuture.completedFuture(null));

        ElasticLogStreamManager elasticLogStreamManager = mock(ElasticLogStreamManager.class);

        ElasticLogSegmentManager manager = spy(new ElasticLogSegmentManager(metaStream, elasticLogStreamManager, "testLargeScaleSegmentDelete"));

        when(manager.remove(anyLong())).thenReturn(logSegment);

        when(manager.asyncPersistLogMeta()).thenReturn(CompletableFuture.completedFuture(logMeta));

        ElasticLogSegmentManager.EventListener listener = manager.new EventListener();
        listener.onEvent(1, ElasticLogSegmentEvent.SEGMENT_DELETE);

        // A single delete triggers persistence at least once; the drain-only
        // completion callback may add one more call, but never a third.
        verify(manager, atLeastOnce()).asyncPersistLogMeta();
        verify(manager, atMost(2)).asyncPersistLogMeta();
    }

    @Test
    public void testLargeScaleSegmentDelete() throws InterruptedException {
        ElasticLogMeta logMeta = mock(ElasticLogMeta.class);
        ElasticLogSegment logSegment = mock(ElasticLogSegment.class);
        MetaStream metaStream = mock(MetaStream.class);

        when(metaStream.append(any(MetaKeyValue.class))).thenReturn(CompletableFuture.completedFuture(null));

        ElasticLogStreamManager elasticLogStreamManager = mock(ElasticLogStreamManager.class);

        ElasticLogSegmentManager manager = spy(new ElasticLogSegmentManager(metaStream, elasticLogStreamManager, "testLargeScaleSegmentDelete"));

        Set<Long> removedSegmentId = new HashSet<>();

        when(manager.remove(anyLong())).thenAnswer(invocation -> {
            long id = invocation.getArgument(0);
            removedSegmentId.add(id);
            return logSegment;
        });

        // Two persistence rounds are expected: one for the first delete, one
        // batched round for the rest that queued up while it was in flight.
        CountDownLatch latch = new CountDownLatch(2);

        when(manager.asyncPersistLogMeta())
            .thenAnswer(invocation -> {
                // Delay completion so subsequent deletes pile up in the queue.
                CompletableFuture<Object> cf = new CompletableFuture<>()
                    .completeOnTimeout(logMeta, 100, TimeUnit.MILLISECONDS);

                cf.whenComplete((res, e) -> {
                    latch.countDown();
                });

                return cf;
            });

        ElasticLogSegmentManager.EventListener listener = spy(manager.new EventListener());

        for (long i = 0L; i < 10L; i++) {
            listener.onEvent(i, ElasticLogSegmentEvent.SEGMENT_DELETE);
        }

        // expect the first and the tail should call the persist method.
        verify(manager, times(2)).asyncPersistLogMeta();

        // check all segmentId removed.
        for (long i = 0; i < 10L; i++) {
            assertTrue(removedSegmentId.contains(i));
        }

        // Bounded wait: an unbounded latch.await() would hang the whole suite
        // if the completion callback regresses.
        assertTrue(latch.await(10, TimeUnit.SECONDS), "persist completions did not arrive in time");

        // the request can be finished.
        CompletableFuture<ElasticLogMeta> pendingPersistentMetaCf = listener.getPendingPersistentMetaCf();
        pendingPersistentMetaCf.join();

        // all the queue can be removed.
        assertTrue(listener.getPendingDeleteSegmentQueue().isEmpty());
    }
}

0 comments on commit 393b0d3

Please sign in to comment.