diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
index a880c5f97c..b3c3e5d826 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
@@ -22,6 +22,7 @@
 import java.util.Comparator;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StreamDataBlock {
     public static final Comparator<StreamDataBlock> STREAM_OFFSET_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getStartOffset);
@@ -35,6 +36,7 @@ public class StreamDataBlock {
     private final ObjectReader.DataBlockIndex dataBlockIndex;
 
     private final CompletableFuture<ByteBuf> dataCf = new CompletableFuture<>();
+    private final AtomicInteger refCount = new AtomicInteger(1);
 
     public StreamDataBlock(long streamId, long startOffset, long endOffset, long objectId, ObjectReader.DataBlockIndex dataBlockIndex) {
         this.streamId = streamId;
@@ -101,23 +103,28 @@ public CompletableFuture<ByteBuf> getDataCf() {
         return this.dataCf;
     }
 
-    public void free() {
-        this.dataCf.thenAccept(buf -> {
-            if (buf != null) {
-                buf.release();
-            }
-        });
+    public void releaseRef() {
+        refCount.decrementAndGet();
+    }
+
+    public void release() {
+        if (refCount.decrementAndGet() == 0) {
+            dataCf.thenAccept(buf -> {
+                if (buf != null) {
+                    buf.release();
+                }
+            });
+        }
     }
 
     @Override
     public String toString() {
         return "StreamDataBlock{" +
-            "streamId=" + streamId +
+            "objectId=" + objectId +
+            ", streamId=" + streamId +
             ", startOffset=" + startOffset +
            ", endOffset=" + endOffset +
-            ", objectId=" + objectId +
-            ", blockPosition=" + getBlockEndPosition() +
-            ", blockSize=" + getBlockSize() +
+            ", dataBlockIndex=" + dataBlockIndex +
             '}';
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
index 453b50a543..82e3bed260 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
@@ -50,6 +50,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -129,8 +130,7 @@ private void scheduleNextCompaction(long delayMillis) {
                     .exceptionally(ex -> {
                         logger.error("Compaction failed, cost {} ms, ", timerUtil.elapsedAs(TimeUnit.MILLISECONDS), ex);
                         return null;
-                    })
-                    .join();
+                    }).join();
             } catch (Exception ex) {
                 logger.error("Error while compacting objects ", ex);
             }
@@ -154,7 +154,7 @@ private CompletableFuture<Void> compact() {
         }, compactThreadPool);
     }
 
-    private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
+    private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) throws CompletionException {
         logger.info("Get {} stream set objects from metadata", objectMetadataList.size());
         if (objectMetadataList.isEmpty()) {
             return;
         }
@@ -187,7 +187,7 @@ private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
-    private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
+    void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
         logger.info("Force split {} stream set objects", objectsToForceSplit.size());
         TimerUtil timerUtil = new TimerUtil();
         for (int i = 0; i < objectsToForceSplit.size(); i++) {
@@ -195,7 +195,13 @@ private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
             logger.info("Force split progress {}/{}, splitting object {}, object size {}", i, objectsToForceSplit.size(),
                 objectToForceSplit.objectId(), objectToForceSplit.objectSize());
-            CommitStreamSetObjectRequest request = buildSplitRequest(streamMetadataList, objectToForceSplit);
+            CommitStreamSetObjectRequest request;
+            try {
+                request = buildSplitRequest(streamMetadataList, objectToForceSplit);
+            } catch (Exception ex) {
+                logger.error("Build force split request for object {} failed, ex: ", objectToForceSplit.objectId(), ex);
+                continue;
+            }
             if (request == null) {
                 continue;
             }
@@ -214,7 +220,7 @@ private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
-    private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
+    private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact)
+        throws CompletionException {
         if (objectsToCompact.isEmpty()) {
             return;
         }
@@ -311,7 +318,7 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
         Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
-            Collections.singletonList(objectMetadata), s3Operator);
+            Collections.singletonList(objectMetadata), s3Operator, logger);
         if (streamDataBlocksMap.isEmpty()) {
             // object not exist, metadata is out of date
             logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId());
@@ -324,6 +331,10 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
             return new ArrayList<>();
         }
+        return groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks);
+    }
+
+    Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata, List<StreamDataBlock> streamDataBlocks) {
         List<Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>>> groupedDataBlocks = new ArrayList<>();
         List<List<StreamDataBlock>> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(streamDataBlocks);
         for (List<StreamDataBlock> group : groupedStreamDataBlocks) {
@@ -363,11 +374,15 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
             for (Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>> pair : batchGroup) {
                 List<StreamDataBlock> blocks = pair.getLeft();
                 DataBlockWriter writer = new DataBlockWriter(objectId, s3Operator, config.objectPartSize());
-                for (StreamDataBlock block : blocks) {
-                    writer.write(block);
-                }
+                CompletableFuture<Void> cf = CompactionUtils.chainWriteDataBlock(writer, blocks, forceSplitThreadPool);
                 long finalObjectId = objectId;
-                cfs.add(writer.close().thenAccept(v -> {
+                cfs.add(cf.thenAccept(nil -> writer.close()).whenComplete((ret, ex) -> {
+                    if (ex != null) {
+                        logger.error("write to stream object {} failed", finalObjectId, ex);
+                        writer.release();
+                        blocks.forEach(StreamDataBlock::release);
+                        return;
+                    }
                     StreamObject streamObject = new StreamObject();
                     streamObject.setObjectId(finalObjectId);
                     streamObject.setStreamId(blocks.get(0).getStreamId());
@@ -381,19 +396,19 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
                 .exceptionally(ex -> {
-                    //TODO: clean up buffer
-                    logger.error("Force split object failed", ex);
+                    logger.error("Force split object {} failed", objectMetadata.objectId(), ex);
                     for (Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>> pair : groupedDataBlocks) {
                         pair.getValue().completeExceptionally(ex);
                     }
-                    return null;
+                    throw new IllegalStateException(String.format("Force split object %d failed", objectMetadata.objectId()), ex);
                 }).join();
         }
         return groupedDataBlocks.stream().map(Pair::getValue).collect(Collectors.toList());
     }
 
-    CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList, S3ObjectMetadata objectToSplit) {
+    CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList, S3ObjectMetadata objectToSplit)
+        throws CompletionException {
         Collection<CompletableFuture<StreamObject>> cfs = splitStreamSetObject(streamMetadataList, objectToSplit);
         if (cfs.isEmpty()) {
             logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId());
@@ -421,13 +436,15 @@ CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList, S3ObjectMetadata objectToSplit)
         return request;
     }
 
-    CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
+    CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact)
+        throws CompletionException {
         CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
         Set<Long> compactedObjectIds = new HashSet<>();
         logger.info("{} stream set objects as compact candidates, total compaction size: {}", objectsToCompact.size(), objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum());
-        Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectsToCompact, s3Operator);
+        Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
+            objectsToCompact, s3Operator, logger);
         long now = System.currentTimeMillis();
         Set<Long> excludedObjectIds = new HashSet<>();
         List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, excludedObjectIds);
@@ -525,7 +542,7 @@ Map<Long, List<StreamDataBlock>> convertS3Objects(List
     }
 
     void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans, List<S3ObjectMetadata> s3ObjectMetadata)
-        throws IllegalArgumentException {
+        throws CompletionException {
         if (compactionPlans.isEmpty()) {
             return;
         }
@@ -545,30 +562,33 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans, List<S3ObjectMetadata> s3ObjectMetadata)
-            List<CompletableFuture<StreamObject>> streamObjectCFList = new ArrayList<>();
-            CompletableFuture<Void> streamSetObjectCF = null;
+            List<CompletableFuture<StreamObject>> streamObjectCfList = new ArrayList<>();
+            CompletableFuture<Void> streamSetObjectChainWriteCf = CompletableFuture.completedFuture(null);
             for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
                 if (compactedObject.type() == CompactionType.COMPACT) {
                     sortedStreamDataBlocks.addAll(compactedObject.streamDataBlocks());
-                    streamSetObjectCF = uploader.chainWriteStreamSetObject(streamSetObjectCF, compactedObject);
+                    streamSetObjectChainWriteCf = uploader.chainWriteStreamSetObject(streamSetObjectChainWriteCf, compactedObject);
                 } else {
-                    streamObjectCFList.add(uploader.writeStreamObject(compactedObject));
+                    streamObjectCfList.add(uploader.writeStreamObject(compactedObject));
                 }
             }
+            List<CompletableFuture<?>> cfList = new ArrayList<>();
+            cfList.add(streamSetObjectChainWriteCf);
+            cfList.addAll(streamObjectCfList);
             // wait for all stream objects and stream set object part to be uploaded
-            try {
-                if (streamSetObjectCF != null) {
-                    // wait for all writes done
-                    streamSetObjectCF.thenAccept(v -> uploader.forceUploadStreamSetObject()).join();
-                }
-                streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
-            } catch (Exception ex) {
-                //TODO: clean up buffer
-                logger.error("Error while uploading compaction objects", ex);
-                uploader.reset();
-                throw new IllegalArgumentException("Error while uploading compaction objects", ex);
-            }
+            CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
+                .thenAccept(v -> uploader.forceUploadStreamSetObject())
+                .exceptionally(ex -> {
+                    logger.error("Error while uploading compaction objects", ex);
+                    uploader.release().thenAccept(v -> {
+                        for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
+                            compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                        }
+                    }).join();
+                    throw new IllegalStateException("Error while uploading compaction objects", ex);
+                }).join();
+            streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
         }
         List<ObjectStreamRange> objectStreamRanges = CompactionUtils.buildObjectStreamRange(sortedStreamDataBlocks);
         objectStreamRanges.forEach(request::addStreamRange);
@@ -576,6 +596,5 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans, List<S3ObjectMetadata> s3ObjectMetadata)
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java
     private CompletableFuture<Long> streamSetObjectIdCf = null;
     private DataBlockWriter streamSetObjectWriter = null;
+    private volatile boolean isAborted = false;
 
     public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Config config) {
         this.objectManager = objectManager;
@@ -63,70 +64,59 @@ public CompletableFuture<Void> chainWriteStreamSetObject(CompletableFuture<Void> prev, CompactedObject compactedObject) {
         if (compactedObject.type() != CompactionType.COMPACT) {
             return CompletableFuture.failedFuture(new IllegalArgumentException("wrong compacted object type, expected COMPACT"));
         }
+        if (compactedObject.streamDataBlocks().isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
         if (prev == null) {
-            return CompletableFuture.allOf(compactedObject.streamDataBlocks()
-                .stream()
-                .map(StreamDataBlock::getDataCf)
-                .toArray(CompletableFuture[]::new))
-                .thenComposeAsync(v -> prepareObjectAndWrite(compactedObject), streamSetObjectUploadPool);
+            return prepareObjectAndWrite(compactedObject);
         }
-        return prev.thenComposeAsync(v ->
-            CompletableFuture.allOf(compactedObject.streamDataBlocks()
-                .stream()
-                .map(StreamDataBlock::getDataCf)
-                .toArray(CompletableFuture[]::new))
-                .thenComposeAsync(vv -> prepareObjectAndWrite(compactedObject), streamSetObjectUploadPool));
+        return prev.thenCompose(v -> prepareObjectAndWrite(compactedObject));
     }
 
     private CompletableFuture<Void> prepareObjectAndWrite(CompactedObject compactedObject) {
         if (streamSetObjectIdCf == null) {
             streamSetObjectIdCf = this.objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES));
         }
-        return streamSetObjectIdCf.thenAcceptAsync(objectId -> {
+        return streamSetObjectIdCf.thenComposeAsync(objectId -> {
             if (streamSetObjectWriter == null) {
                 streamSetObjectWriter = new DataBlockWriter(objectId, s3Operator, config.objectPartSize());
             }
-            for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) {
-                streamSetObjectWriter.write(streamDataBlock);
-            }
-        }, streamObjectUploadPool).exceptionally(ex -> {
-            LOGGER.error("prepare and write stream set object failed", ex);
-            return null;
-        });
+            return CompactionUtils.chainWriteDataBlock(streamSetObjectWriter, compactedObject.streamDataBlocks(), streamSetObjectUploadPool);
+        }, streamSetObjectUploadPool);
     }
 
     public CompletableFuture<StreamObject> writeStreamObject(CompactedObject compactedObject) {
         if (compactedObject.type() != CompactionType.SPLIT) {
             return CompletableFuture.failedFuture(new IllegalArgumentException("wrong compacted object type, expected SPLIT"));
         }
-        return CompletableFuture.allOf(compactedObject.streamDataBlocks()
-            .stream()
-            .map(StreamDataBlock::getDataCf)
-            .toArray(CompletableFuture[]::new))
-            .thenComposeAsync(v -> objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES))
-                .thenComposeAsync(objectId -> {
-                    DataBlockWriter dataBlockWriter = new DataBlockWriter(objectId, s3Operator, config.objectPartSize());
-                    for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) {
-                        dataBlockWriter.write(streamDataBlock);
-                    }
-                    long streamId = compactedObject.streamDataBlocks().get(0).getStreamId();
-                    long startOffset = compactedObject.streamDataBlocks().get(0).getStartOffset();
-                    long endOffset = compactedObject.streamDataBlocks().get(compactedObject.streamDataBlocks().size() - 1).getEndOffset();
-                    StreamObject streamObject = new StreamObject();
-                    streamObject.setObjectId(objectId);
-                    streamObject.setStreamId(streamId);
-                    streamObject.setStartOffset(startOffset);
-                    streamObject.setEndOffset(endOffset);
-                    return dataBlockWriter.close().thenApply(nil -> {
-                        streamObject.setObjectSize(dataBlockWriter.size());
-                        return streamObject;
-                    });
-                }, streamObjectUploadPool),
-            streamObjectUploadPool)
-            .exceptionally(ex -> {
-                LOGGER.error("stream object write failed", ex);
-                return null;
-            });
+        if (compactedObject.streamDataBlocks().isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES))
+            .thenComposeAsync(objectId -> {
+                if (isAborted) {
+                    // release data that has not been uploaded
+                    compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                    return CompletableFuture.completedFuture(null);
+                }
+                DataBlockWriter dataBlockWriter = new DataBlockWriter(objectId, s3Operator, config.objectPartSize());
+                CompletableFuture<Void> cf = CompactionUtils.chainWriteDataBlock(dataBlockWriter, compactedObject.streamDataBlocks(), streamObjectUploadPool);
+                return cf.thenCompose(nil -> dataBlockWriter.close()).thenApply(nil -> {
+                    StreamObject streamObject = new StreamObject();
+                    streamObject.setObjectId(objectId);
+                    streamObject.setStreamId(compactedObject.streamDataBlocks().get(0).getStreamId());
+                    streamObject.setStartOffset(compactedObject.streamDataBlocks().get(0).getStartOffset());
+                    streamObject.setEndOffset(compactedObject.streamDataBlocks().get(compactedObject.streamDataBlocks().size() - 1).getEndOffset());
+                    streamObject.setObjectSize(dataBlockWriter.size());
+                    return streamObject;
+                }).whenComplete((ret, ex) -> {
+                    if (ex != null) {
+                        LOGGER.error("write to stream object {} failed", objectId, ex);
+                        dataBlockWriter.release();
+                        compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                    }
+                });
+            }, streamObjectUploadPool);
     }
 
     public CompletableFuture<Void> forceUploadStreamSetObject() {
@@ -141,12 +131,24 @@ public long complete() {
             return 0L;
         }
         streamSetObjectWriter.close().join();
-        return streamSetObjectWriter.size();
+        long writeSize = streamSetObjectWriter.size();
+        reset();
+        return writeSize;
+    }
+
+    public CompletableFuture<Void> release() {
+        isAborted = true;
+        CompletableFuture<Void> cf = CompletableFuture.completedFuture(null);
+        if (streamSetObjectWriter != null) {
+            cf = streamSetObjectWriter.release();
+        }
+        return cf.thenAccept(nil -> reset());
     }
 
-    public void reset() {
+    private void reset() {
         streamSetObjectIdCf = null;
         streamSetObjectWriter = null;
+        isAborted = false;
     }
 
     public long getStreamSetObjectId() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
index 19228d2fdb..d76e7a581a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUtils.java
@@ -20,10 +20,12 @@
 import com.automq.stream.s3.compact.objects.CompactedObjectBuilder;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.compact.operator.DataBlockReader;
+import com.automq.stream.s3.compact.operator.DataBlockWriter;
 import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.s3.objects.ObjectStreamRange;
 import com.automq.stream.s3.operator.S3Operator;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import org.slf4j.Logger;
 
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -32,6 +34,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 public class CompactionUtils {
@@ -60,7 +63,15 @@ public static List<ObjectStreamRange> buildObjectStreamRange(List<StreamDataBlock> streamDataBlocks) {
 
     public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<StreamMetadata> streamMetadataList,
-        List<S3ObjectMetadata> objectMetadataList, S3Operator s3Operator) {
+        List<S3ObjectMetadata> objectMetadataList,
+        S3Operator s3Operator) {
+        return blockWaitObjectIndices(streamMetadataList, objectMetadataList, s3Operator, null);
+    }
+
+    public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<StreamMetadata> streamMetadataList,
+        List<S3ObjectMetadata> objectMetadataList,
+        S3Operator s3Operator,
+        Logger logger) {
         Map<Long, StreamMetadata> streamMetadataMap = streamMetadataList.stream()
             .collect(Collectors.toMap(StreamMetadata::getStreamId, s -> s));
         Map<Long, CompletableFuture<List<StreamDataBlock>>> objectStreamRangePositionFutures = new HashMap<>();
@@ -89,7 +100,9 @@ public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<StreamMetadata> streamMetadataList,
                     return new AbstractMap.SimpleEntry<>(f.getKey(), validStreamDataBlocks);
                 } catch (Exception ex) {
                     // continue compaction without invalid object
-                    // TODO: log warn
+                    if (logger != null) {
+                        logger.warn("failed to get data block index for object {}", f.getKey(), ex);
+                    }
                     return null;
                 }
             })
@@ -97,6 +110,9 @@ public static Map<Long, List<StreamDataBlock>> blockWaitObjectIndices(List<StreamMetadata> streamMetadataList,
     public static List<List<StreamDataBlock>> groupStreamDataBlocks(List<StreamDataBlock> streamDataBlocks) {
         List<List<StreamDataBlock>> groupedStreamDataBlocks = new ArrayList<>();
         List<StreamDataBlock> currGroup = new ArrayList<>();
@@ -129,4 +145,16 @@ public static int getTotalObjectStats(CompactedObjectBuilder o, Map
+
+    public static CompletableFuture<Void> chainWriteDataBlock(DataBlockWriter dataBlockWriter, List<StreamDataBlock> streamDataBlocks, ExecutorService executorService) {
+        CompletableFuture<Void> cf = null;
+        for (StreamDataBlock streamDataBlock : streamDataBlocks) {
+            if (cf == null) {
+                cf = streamDataBlock.getDataCf().thenAcceptAsync(data -> dataBlockWriter.write(streamDataBlock), executorService);
+            } else {
+                cf = cf.thenCompose(nil -> streamDataBlock.getDataCf().thenAcceptAsync(data -> dataBlockWriter.write(streamDataBlock), executorService));
+            }
+        }
+        return cf;
+    }
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index fcdcfb8fe4..8f8f805669 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static com.automq.stream.s3.operator.Writer.MIN_PART_SIZE;
+
 //TODO: refactor to reduce duplicate code with ObjectWriter
 public class DataBlockWriter {
     private final int partSizeThreshold;
@@ -47,7 +49,7 @@ public class DataBlockWriter {
     public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThreshold) {
         this.objectId = objectId;
         String objectKey = ObjectUtils.genKey(0, objectId);
-        this.partSizeThreshold = partSizeThreshold;
+        this.partSizeThreshold = Math.max(MIN_PART_SIZE, partSizeThreshold);
         waitingUploadBlocks = new LinkedList<>();
         waitingUploadBlockCfs = new ConcurrentHashMap<>();
         completedBlocks = new LinkedList<>();
@@ -67,25 +69,6 @@ public void write(StreamDataBlock dataBlock) {
         }
     }
 
-    /**
-     * Copy write a list of adjacent data blocks from same object.
-     *
-     * @param dataBlock list of adjacent data blocks from same object
-     */
-    public void copyWrite(List<StreamDataBlock> dataBlock) {
-        if (dataBlock.isEmpty()) {
-            return;
-        }
-        StreamDataBlock first = dataBlock.get(0);
-        StreamDataBlock end = dataBlock.get(dataBlock.size() - 1);
-        // size of data block is always smaller than MAX_PART_SIZE, no need to split into multiple parts
-        String originObjectKey = ObjectUtils.genKey(0, first.getObjectId());
-        writer.copyWrite(originObjectKey,
-            first.getBlockStartPosition(), end.getBlockStartPosition() + end.getBlockSize());
-        completedBlocks.addAll(dataBlock);
-        nextDataBlockPosition += dataBlock.stream().mapToLong(StreamDataBlock::getBlockSize).sum();
-    }
-
     public CompletableFuture<Void> forceUpload() {
         uploadWaitingList();
         writer.copyOnWrite();
@@ -93,35 +76,31 @@ public CompletableFuture<Void> forceUpload() {
     }
 
     private void uploadWaitingList() {
-        CompositeByteBuf partBuf = DirectByteBufAlloc.compositeByteBuffer();
-        for (StreamDataBlock block : waitingUploadBlocks) {
-            partBuf.addComponent(true, block.getDataCf().join());
-            completedBlocks.add(block);
-            nextDataBlockPosition += block.getBlockSize();
-        }
+        CompositeByteBuf buf = groupWaitingBlocks();
         List<StreamDataBlock> blocks = new LinkedList<>(waitingUploadBlocks);
-        writer.write(partBuf).thenAccept(v -> {
+        writer.write(buf).thenAccept(v -> {
             for (StreamDataBlock block : blocks) {
-                waitingUploadBlockCfs.get(block).complete(null);
-                waitingUploadBlockCfs.remove(block);
+                waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> {
+                    cf.complete(null);
+                    return null;
+                });
             }
         });
         if (writer.hasBatchingPart()) {
             // prevent blocking on part that's waiting for batch when force upload waiting list
             for (StreamDataBlock block : blocks) {
-                waitingUploadBlockCfs.remove(block);
+                waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> {
+                    cf.complete(null);
+                    return null;
+                });
             }
         }
         waitingUploadBlocks.clear();
     }
 
     public CompletableFuture<Void> close() {
-        CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
-        for (StreamDataBlock block : waitingUploadBlocks) {
-            buf.addComponent(true, block.getDataCf().join());
-            completedBlocks.add(block);
-            nextDataBlockPosition += block.getBlockSize();
-        }
+        CompositeByteBuf buf = groupWaitingBlocks();
+        List<StreamDataBlock> blocks = new LinkedList<>(waitingUploadBlocks);
         waitingUploadBlocks.clear();
         indexBlock = new IndexBlock();
         buf.addComponent(true, indexBlock.buffer());
@@ -129,7 +108,30 @@ public CompletableFuture<Void> close() {
         buf.addComponent(true, footer.buffer());
         writer.write(buf.duplicate());
         size = indexBlock.position() + indexBlock.size() + footer.size();
-        return writer.close();
+        return writer.close().thenAccept(nil -> {
+            for (StreamDataBlock block : blocks) {
+                waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> {
+                    cf.complete(null);
+                    return null;
+                });
+            }
+        });
+    }
+
+    private CompositeByteBuf groupWaitingBlocks() {
+        CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
+        for (StreamDataBlock block : waitingUploadBlocks) {
+            buf.addComponent(true, block.getDataCf().join());
+            block.releaseRef();
+            completedBlocks.add(block);
+            nextDataBlockPosition += block.getBlockSize();
+        }
+        return buf;
+    }
+
+    public CompletableFuture<Void> release() {
+        // release buffer that is batching for upload
+        return writer.release();
     }
 
     public long objectId() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 1f4e80e1f0..c30d1d4ff2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -132,7 +132,7 @@ public static Builder builder() {
     }
 
     // used for test only.
-    DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
+    public DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
         this(s3Client, bucket, false);
     }
@@ -588,7 +588,7 @@ private void checkAvailable() {
         }
     }
 
-    private static S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, String secretKey) {
+    public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey, String secretKey) {
         S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
         if (StringUtils.isNotBlank(endpoint)) {
             builder.endpointOverride(URI.create(endpoint));
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java
index 0a42d1ef85..b65b802de4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java
@@ -89,6 +89,11 @@ public void copyWrite(String sourcePath, long start, long end) {
             public CompletableFuture<Void> close() {
                 return CompletableFuture.completedFuture(null);
             }
+
+            @Override
+            public CompletableFuture<Void> release() {
+                return CompletableFuture.completedFuture(null);
+            }
         };
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 2183bdb5e6..45e0163870 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -152,6 +152,16 @@ public CompletableFuture<Void> close() {
         return closeCf;
     }
 
+    @Override
+    public CompletableFuture<Void> release() {
+        // wait for all ongoing uploading parts to finish and release pending part
+        return CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> {
+            if (objectPart != null) {
+                objectPart.release();
+            }
+        });
+    }
+
     private List<CompletedPart> genCompleteParts() {
         return this.parts.stream().map(cf -> {
             try {
@@ -224,6 +234,10 @@ public long size() {
         public CompletableFuture<Void> getFuture() {
             return partCf.thenApply(nil -> null);
         }
+
+        public void release() {
+            partBuf.release();
+        }
     }
 
     class CopyObjectPart {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
index 89ca78d3d3..5c12fcaac1 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
@@ -97,6 +97,15 @@ public CompletableFuture<Void> close() {
         }
     }
 
+    @Override
+    public CompletableFuture<Void> release() {
+        if (multiPartWriter != null) {
+            return multiPartWriter.release();
+        } else {
+            return objectWriter.release();
+        }
+    }
+
     private void newMultiPartWriter() {
         this.multiPartWriter = new MultiPartWriter(operator, path, minPartSize, throttleStrategy);
         if (objectWriter.data.readableBytes() > 0) {
@@ -147,6 +156,12 @@ public CompletableFuture<Void> close() {
             return cf;
         }
 
+        @Override
+        public CompletableFuture<Void> release() {
+            data.release();
+            return CompletableFuture.completedFuture(null);
+        }
+
        public boolean isFull() {
            return data.readableBytes() > MAX_UPLOAD_SIZE;
        }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
index 70b09de2dc..e0615b75c8 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java
@@ -75,4 +75,8 @@ public interface Writer {
      */
     CompletableFuture<Void> close();
 
+    /**
+     * Release all resources held by this writer.
+     */
+    CompletableFuture<Void> release();
 }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/TestUtils.java b/s3stream/src/test/java/com/automq/stream/s3/TestUtils.java
index a39dfceef8..dd795f92cb 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/TestUtils.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/TestUtils.java
@@ -18,6 +18,7 @@
 package com.automq.stream.s3;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.util.Random;
@@ -34,6 +35,14 @@ public static ByteBuf random(int size) {
         return Unpooled.wrappedBuffer(bytes).retain();
     }
 
+    public static ByteBuf randomPooled(int size) {
+        byte[] bytes = new byte[size];
+        new Random().nextBytes(bytes);
+        ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(size);
+        buf.writeBytes(bytes);
+        return buf;
+    }
+
     public static String tempFilePath() {
         return System.getProperty("java.io.tmpdir") + "/kos-" + UUID.randomUUID();
     }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
index dfa5785f03..def58a44a9 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionAnalyzerTest.java
@@ -61,7 +61,7 @@ public void tearDown() {
     @Test
     public void testReadObjectIndices() {
         List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
-        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator);
+        Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, s3Operator, null);
         Map<Long, List<StreamDataBlock>> expectedBlocksMap = Map.of(
             OBJECT_0, List.of(
                 new StreamDataBlock(STREAM_0, 0, 15, 0, OBJECT_0, -1, -1, 1),
diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
index 7343dea264..6cf9187061 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
@@ -18,7 +18,9 @@
 package com.automq.stream.s3.compact;
 
 import com.automq.stream.s3.Config;
+import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.StreamDataBlock;
+import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.compact.operator.DataBlockReader;
 import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.s3.metadata.StreamState;
@@ -29,6 +31,7 @@
 import com.automq.stream.s3.metadata.S3ObjectType;
 import com.automq.stream.s3.metadata.S3StreamConstant;
 import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.automq.stream.s3.operator.DefaultS3Operator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,6 +39,9 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.mockito.Mockito;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,10 +52,16 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 @Timeout(60)
@@ -112,6 +124,20 @@ public void testForceSplit() {
         Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request));
     }
 
+    @Test
+    public void testForceSplitWithException() {
+        S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
+        doAnswer(invocation -> CompletableFuture.completedFuture(null)).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+        Map<Long, List<StreamDataBlock>> streamDataBlockMap = getStreamDataBlockMap();
+        S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET);
+        DefaultS3Operator s3Operator = Mockito.spy(new DefaultS3Operator(s3AsyncClient, ""));
+        doReturn(CompletableFuture.failedFuture(new IllegalArgumentException("exception"))).when(s3Operator).rangeRead(eq(objectMetadata.key()), anyLong(), anyLong(), any());
+
+        CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
+        Assertions.assertThrowsExactly(CompletionException.class, () -> compactionManager.groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlockMap.get(OBJECT_0)));
+    }
+
     @Test
     public void testForceSplitWithLimit() {
         when(config.streamSetObjectCompactionCacheSize()).thenReturn(5L);
@@ -220,15 +246,99 @@ public void testCompactWithNonExistStream() {
     @Test
     public void testCompactNoneExistObjects() {
-        List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
-        compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
-        List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
-        Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, s3ObjectMetadata, s3Operator);
+        when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100L);
+        when(config.streamSetObjectCompactionCacheSize()).thenReturn(9999L);
+        Map<Long, List<StreamDataBlock>> streamDataBlockMap = getStreamDataBlockMap();
+        S3ObjectMetadata objectMetadata0 = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata1 = new S3ObjectMetadata(OBJECT_1, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata2 = new S3ObjectMetadata(OBJECT_2, 0, S3ObjectType.STREAM_SET);
+        List<S3ObjectMetadata> s3ObjectMetadata = List.of(objectMetadata0, objectMetadata1, objectMetadata2);
+        this.compactionAnalyzer = new CompactionAnalyzer(config.streamSetObjectCompactionCacheSize(), config.streamSetObjectCompactionStreamSplitSize(),
+            config.maxStreamNumPerStreamSetObject(), config.maxStreamObjectNumPerCommit());
+        List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, new HashSet<>());
+        CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
+
+        S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
+        doAnswer(invocation -> CompletableFuture.completedFuture(null)).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+        DefaultS3Operator s3Operator = Mockito.spy(new DefaultS3Operator(s3AsyncClient, ""));
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(65))).when(s3Operator).rangeRead(eq(objectMetadata0.key()), anyLong(), anyLong(), any());
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(80))).when(s3Operator).rangeRead(eq(objectMetadata1.key()), anyLong(), anyLong(), any());
+        doAnswer(invocation -> CompletableFuture.failedFuture(new IllegalArgumentException("exception"))).when(s3Operator).rangeRead(eq(objectMetadata2.key()), anyLong(), anyLong(), any());
+
+        CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
+        Assertions.assertThrowsExactly(CompletionException.class,
+            () -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata));
+        for (CompactionPlan plan : compactionPlans) {
+            plan.streamDataBlocksMap().forEach((streamId, blocks) -> blocks.forEach(block -> {
+                if (block.getObjectId() != OBJECT_2) {
+                    block.getDataCf().thenAccept(data -> {
+                        Assertions.assertEquals(0, data.refCnt());
+                    }).join();
+                }
+            }));
+        }
+    }
+
+    @Test
+    public void testCompactNoneExistObjects2() {
+        when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100L);
+        when(config.streamSetObjectCompactionCacheSize()).thenReturn(9999L);
+        Map<Long, List<StreamDataBlock>> streamDataBlockMap = getStreamDataBlockMap();
+        S3ObjectMetadata objectMetadata0 = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata1 = new S3ObjectMetadata(OBJECT_1, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata2 = new S3ObjectMetadata(OBJECT_2, 0, S3ObjectType.STREAM_SET);
+        List<S3ObjectMetadata> s3ObjectMetadata = List.of(objectMetadata0, objectMetadata1, objectMetadata2);
+        this.compactionAnalyzer = new CompactionAnalyzer(config.streamSetObjectCompactionCacheSize(), config.streamSetObjectCompactionStreamSplitSize(),
+            config.maxStreamNumPerStreamSetObject(), config.maxStreamObjectNumPerCommit());
         List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, new HashSet<>());
-        s3Operator.delete(s3ObjectMetadata.get(0).key()).join();
         CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
-        Assertions.assertThrowsExactly(IllegalArgumentException.class,
+
+        S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
+        doAnswer(invocation -> CompletableFuture.completedFuture(null)).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+        DefaultS3Operator s3Operator = Mockito.spy(new DefaultS3Operator(s3AsyncClient, ""));
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(65))).when(s3Operator).rangeRead(eq(objectMetadata0.key()), anyLong(), anyLong(), any());
+        doAnswer(invocation -> CompletableFuture.failedFuture(new IllegalArgumentException("exception"))).when(s3Operator).rangeRead(eq(objectMetadata1.key()), anyLong(), anyLong(), any());
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(50))).when(s3Operator).rangeRead(eq(objectMetadata2.key()), anyLong(), anyLong(), any());
+
+        CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
+        Assertions.assertThrowsExactly(CompletionException.class,
             () -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata));
+        for (CompactionPlan plan : compactionPlans) {
+            plan.streamDataBlocksMap().forEach((streamId, blocks) -> blocks.forEach(block -> {
+                if (block.getObjectId() != OBJECT_1) {
+                    block.getDataCf().thenAccept(data -> {
+                        Assertions.assertEquals(0, data.refCnt());
+                    }).join();
+                }
+            }));
+        }
+    }
+
+    private static Map<Long, List<StreamDataBlock>> getStreamDataBlockMap() {
+        StreamDataBlock block1 = new StreamDataBlock(STREAM_0, 0, 15, OBJECT_0, new ObjectReader.DataBlockIndex(0, 0, 15, 15));
+        StreamDataBlock block2 = new StreamDataBlock(STREAM_1, 0, 20, OBJECT_0, new ObjectReader.DataBlockIndex(1, 15, 50, 20));
+
+        StreamDataBlock block3 = new StreamDataBlock(STREAM_0, 15, 27, OBJECT_1, new ObjectReader.DataBlockIndex(0, 0, 20, 12));
+        StreamDataBlock block4 = new StreamDataBlock(STREAM_1, 20, 45, OBJECT_1, new ObjectReader.DataBlockIndex(1, 20, 60, 25));
+
+        StreamDataBlock block5 = new StreamDataBlock(STREAM_0, 27, 40, OBJECT_2, new ObjectReader.DataBlockIndex(0, 0, 20, 20));
+        StreamDataBlock block6 = new StreamDataBlock(STREAM_3, 0, 30, OBJECT_2, new ObjectReader.DataBlockIndex(1, 20, 30, 30));
+        return Map.of(
+            OBJECT_0, List.of(
+                block1,
+                block2
+            ),
+            OBJECT_1, List.of(
+                block3,
+                block4
+            ),
+            OBJECT_2, List.of(
+                block5,
+                block6
+            )
+        );
     }
 
     @Test