From 858c5610b614de3f05fbfd45923641e5a73bb97a Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 10 Jan 2025 10:21:37 +0800 Subject: [PATCH] fix(s3stream): fix compaction block on upload exception (#2263) Signed-off-by: Shichao Nie --- .../stream/s3/compact/CompactionManager.java | 46 +++++++------ .../s3/compact/operator/DataBlockWriter.java | 8 ++- .../s3/compact/CompactionManagerTest.java | 69 +++++++++++++++++++ 3 files changed, 99 insertions(+), 24 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index b7e8610025..014385c27d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -32,6 +32,7 @@ import com.automq.stream.s3.objects.StreamObject; import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.s3.streams.StreamManager; +import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.LogContext; import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; @@ -750,28 +751,29 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List { - if (forceUploadException != null) { - logger.error("Error while force uploading stream set object", uploadException); - } - if (uploadException != null || forceUploadException != null) { - uploader.release().whenComplete((vvv, releaseException) -> { - if (releaseException != null) { - logger.error("Unexpected exception while release uploader"); - } - for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { - compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); - } - if (uploadException != null) { - compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException)); - } else { - compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException)); - } - }); - } else { - compactionCf.complete(null); - } - }); + FutureUtil.exec(uploader::forceUploadStreamSetObject, logger, "force upload sso") + .whenComplete((vv, forceUploadException) -> { + if (forceUploadException != null) { + logger.error("Error while force uploading stream set object", uploadException); + } + if (uploadException != null || forceUploadException != null) { + FutureUtil.exec(uploader::release, logger, "release uploader").whenComplete((vvv, releaseException) -> { + if (releaseException != null) { + logger.error("Unexpected exception while release uploader"); + } + for (CompactedObject compactedObject : compactionPlan.compactedObjects()) { + compactedObject.streamDataBlocks().forEach(StreamDataBlock::release); + } + if (uploadException != null) { + compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException)); + } else { + compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException)); + } + }); + } else { + compactionCf.complete(null); + } + }); }); } try { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index f3321dfbeb..e92f8f7815 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -85,10 +85,14 @@ public CompletableFuture forceUpload() { private void uploadWaitingList() { CompositeByteBuf buf = groupWaitingBlocks(); List blocks = new LinkedList<>(waitingUploadBlocks); - writer.write(buf).thenAccept(v -> { + writer.write(buf).whenComplete((v, ex) -> { for (StreamDataBlock block : blocks) { waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> { - cf.complete(null); + if (ex != null) { + cf.completeExceptionally(ex); + } else { + cf.complete(null); + } return null; }); } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index c6f586379d..c542cf298e 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -60,8 +60,13 @@ import io.netty.buffer.ByteBuf; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -618,6 +623,70 @@ public void testCompactNoneExistObjects2() { } } + @Test + public void testCompactWithUploadException() { + when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100 * 1024 * 1024L); + when(config.streamSetObjectCompactionCacheSize()).thenReturn(1024 * 1024 * 1024L); + when(config.objectPartSize()).thenReturn(100 * 1024 * 1024); + Map> streamDataBlockMap = getStreamDataBlockMapLarge(); + S3ObjectMetadata objectMetadata0 = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET); + S3ObjectMetadata objectMetadata1 = new S3ObjectMetadata(OBJECT_1, 0, S3ObjectType.STREAM_SET); + S3ObjectMetadata objectMetadata2 = new S3ObjectMetadata(OBJECT_2, 0, S3ObjectType.STREAM_SET); + List s3ObjectMetadata = List.of(objectMetadata0, objectMetadata1, objectMetadata2); + this.compactionAnalyzer = new CompactionAnalyzer(config.streamSetObjectCompactionCacheSize(), config.streamSetObjectCompactionStreamSplitSize(), + config.maxStreamNumPerStreamSetObject(), config.maxStreamObjectNumPerCommit()); + List compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, new HashSet<>()); + CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); + + S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class); + doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)); + doAnswer(invocation -> CompletableFuture.completedFuture(CreateMultipartUploadResponse.builder().uploadId("123").build())).when(s3AsyncClient).createMultipartUpload(any(CreateMultipartUploadRequest.class)); + doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)); + + AwsObjectStorage objectStorage = Mockito.spy(new AwsObjectStorage(s3AsyncClient, "")); + doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(65 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata0.key()), anyLong(), anyLong()); + doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(80 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata1.key()), anyLong(), anyLong()); + doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(50 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata2.key()), anyLong(), anyLong()); + + CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage); + Assertions.assertThrowsExactly(CompletionException.class, + () -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata)); + for (CompactionPlan plan : compactionPlans) { + plan.streamDataBlocksMap().forEach((streamId, blocks) -> blocks.forEach(block -> { + if (block.getObjectId() != OBJECT_1) { + block.getDataCf().thenAccept(data -> { + Assertions.assertEquals(0, data.refCnt()); + }).join(); + } + })); + } + } + + private static Map> getStreamDataBlockMapLarge() { + StreamDataBlock block1 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(0, 0, 15, 15, 0, 15 * 1024 * 1024)); + StreamDataBlock block2 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(1, 0, 20, 20, 15, 50 * 1024 * 1024)); + + StreamDataBlock block3 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(0, 15, 12, 12, 0, 20 * 1024 * 1024)); + StreamDataBlock block4 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(1, 20, 25, 25, 20, 60 * 1024 * 1024)); + + StreamDataBlock block5 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(0, 27, 13, 20, 0, 20 * 1024 * 1024)); + StreamDataBlock block6 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(3, 0, 30, 30, 20, 30 * 1024 * 1024)); + return Map.of( + OBJECT_0, List.of( + block1, + block2 + ), + OBJECT_1, List.of( + block3, + block4 + ), + OBJECT_2, List.of( + block5, + block6 + ) + ); + } + @Test public void testCompactWithLimit() { when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(70L);