From 65e9c585e6b3f8893fa552bcb8d917c2f0bd50cb Mon Sep 17 00:00:00 2001 From: SSpirits Date: Mon, 12 Aug 2024 16:40:27 +0800 Subject: [PATCH] feat(s3stream): add strictBathLimit option for s3 wal (#1769) * feat(s3stream): add strictBathLimit option for s3 wal Signed-off-by: SSpirits * feat(s3stream): fix test Signed-off-by: SSpirits --------- Signed-off-by: SSpirits --- .../s3/wal/impl/object/ObjectWALConfig.java | 21 ++++++- .../s3/wal/impl/object/RecordAccumulator.java | 6 +- .../wal/impl/object/ObjectWALServiceTest.java | 7 ++- .../impl/object/RecordAccumulatorTest.java | 58 ++++++++++++++++++- 4 files changed, 85 insertions(+), 7 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALConfig.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALConfig.java index 6c480539f9..6e20719785 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALConfig.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALConfig.java @@ -25,13 +25,15 @@ public class ObjectWALConfig { private final long epoch; private final boolean failover; private final short bucketId; + private final boolean strictBatchLimit; public static Builder builder() { return new Builder(); } public ObjectWALConfig(long batchInterval, long maxBytesInBatch, long maxUnflushedBytes, int maxInflightUploadCount, - int readAheadObjectCount, String clusterId, int nodeId, long epoch, boolean failover, short bucketId) { + int readAheadObjectCount, String clusterId, int nodeId, long epoch, boolean failover, short bucketId, + boolean strictBatchLimit) { this.batchInterval = batchInterval; this.maxBytesInBatch = maxBytesInBatch; this.maxUnflushedBytes = maxUnflushedBytes; @@ -42,6 +44,7 @@ public ObjectWALConfig(long batchInterval, long maxBytesInBatch, long maxUnflush this.epoch = epoch; this.failover = failover; this.bucketId = bucketId; + this.strictBatchLimit = strictBatchLimit; } public long batchInterval() { @@ -84,6 +87,10 @@ public short bucketId() { return bucketId; } + public boolean strictBatchLimit() { + return strictBatchLimit; + } + public static final class Builder { private long batchInterval = 100; // 100ms private long maxBytesInBatch = 4 * 1024 * 1024L; // 4MB @@ -95,6 +102,7 @@ public static final class Builder { private long epoch; private boolean failover; private short bucketId; + private boolean strictBatchLimit = false; private Builder() { } @@ -122,6 +130,10 @@ public Builder withURI(IdURI uri) { if (StringUtils.isNumeric(readAheadObjectCount)) { withReadAheadObjectCount(Integer.parseInt(readAheadObjectCount)); } + String strictBatchLimit = uri.extensionString("strictBatchLimit"); + if (StringUtils.isNumeric(strictBatchLimit)) { + withStrictBatchLimit(Boolean.parseBoolean(strictBatchLimit)); + } return this; } @@ -183,8 +195,13 @@ public Builder withBucketId(short bucketId) { return this; } + public Builder withStrictBatchLimit(boolean strictBatchLimit) { + this.strictBatchLimit = strictBatchLimit; + return this; + } + public ObjectWALConfig build() { - return new ObjectWALConfig(batchInterval, maxBytesInBatch, maxUnflushedBytes, maxInflightUploadCount, readAheadObjectCount, clusterId, nodeId, epoch, failover, bucketId); + return new ObjectWALConfig(batchInterval, maxBytesInBatch, maxUnflushedBytes, maxInflightUploadCount, readAheadObjectCount, clusterId, nodeId, epoch, failover, bucketId, strictBatchLimit); } } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java index f1b824ca37..f522774aff 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/RecordAccumulator.java @@ -441,7 +441,7 @@ private void unsafeUpload(PriorityQueue recordQueue) { while (!recordQueue.isEmpty()) { // The retained bytes in the batch must larger than record header size. long retainedBytesInBatch = config.maxBytesInBatch() - dataBuffer.readableBytes() - WALObjectHeader.WAL_HEADER_SIZE; - if (retainedBytesInBatch <= RecordHeader.RECORD_HEADER_SIZE) { + if (config.strictBatchLimit() && retainedBytesInBatch <= RecordHeader.RECORD_HEADER_SIZE) { break; } @@ -449,13 +449,13 @@ private void unsafeUpload(PriorityQueue recordQueue) { // Records larger than the batch size will be uploaded immediately. assert record != null; - if (record.record.readableBytes() >= config.maxBytesInBatch() - WALObjectHeader.WAL_HEADER_SIZE) { + if (config.strictBatchLimit() && record.record.readableBytes() >= config.maxBytesInBatch() - WALObjectHeader.WAL_HEADER_SIZE) { dataBuffer.addComponent(true, record.record); recordList.add(record); break; } - if (record.record.readableBytes() > retainedBytesInBatch) { + if (config.strictBatchLimit() && record.record.readableBytes() > retainedBytesInBatch) { // The records will be split into multiple objects. ByteBuf slice = record.record.retainedSlice(0, (int) retainedBytesInBatch).asReadOnly(); dataBuffer.addComponent(true, slice); diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java index b3e5e2529f..07f6fe5675 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/ObjectWALServiceTest.java @@ -32,7 +32,12 @@ public class ObjectWALServiceTest { @BeforeEach public void setUp() throws IOException { objectStorage = new MemoryObjectStorage(); - wal = new ObjectWALService(Time.SYSTEM, objectStorage, ObjectWALConfig.builder().withMaxBytesInBatch(110).withBatchInterval(Long.MAX_VALUE).build()); + ObjectWALConfig config = ObjectWALConfig.builder() + .withMaxBytesInBatch(110) + .withBatchInterval(Long.MAX_VALUE) + .withStrictBatchLimit(true) + .build(); + wal = new ObjectWALService(Time.SYSTEM, objectStorage, config); wal.start(); random = new Random(); } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/RecordAccumulatorTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/RecordAccumulatorTest.java index 5e6e5bb243..56df1a7f62 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/RecordAccumulatorTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/object/RecordAccumulatorTest.java @@ -32,6 +32,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,6 +55,7 @@ public void setUp() { .withNodeId(100) .withEpoch(1000) .withBatchInterval(Long.MAX_VALUE) + .withStrictBatchLimit(true) .build(); recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config); recordAccumulatorExt.start(); @@ -172,7 +175,60 @@ public void testOffset() throws OverCapacityException { } @Test - public void testInMultiThread() throws InterruptedException { + public void testStrictBatchLimit() throws OverCapacityException { + CompletableFuture future = new CompletableFuture<>(); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>()); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>()); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), future); + assertEquals(150, recordAccumulatorExt.nextOffset()); + + recordAccumulatorExt.unsafeUpload(true); + future.join(); + + assertEquals(2, recordAccumulatorExt.objectList().size()); + + // Reset the RecordAccumulator with strict batch limit disabled. + recordAccumulatorExt.close(); + ObjectWALConfig config = ObjectWALConfig.builder() + .withMaxBytesInBatch(115) + .withNodeId(100) + .withEpoch(1000) + .withBatchInterval(Long.MAX_VALUE) + .withStrictBatchLimit(false) + .build(); + recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config); + recordAccumulatorExt.start(); + + assertEquals(2, recordAccumulatorExt.objectList().size()); + + future = new CompletableFuture<>(); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>()); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>()); + recordAccumulatorExt.append(50, offset -> generateByteBuf(50), future); + assertEquals(300, recordAccumulatorExt.nextOffset()); + + + recordAccumulatorExt.unsafeUpload(true); + future.join(); + + assertEquals(3, recordAccumulatorExt.objectList().size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testInMultiThread(boolean strictBathLimit) throws InterruptedException { + recordAccumulatorExt.close(); + + ObjectWALConfig config = ObjectWALConfig.builder() + .withMaxBytesInBatch(115) + .withNodeId(100) + .withEpoch(1000) + .withBatchInterval(Long.MAX_VALUE) + .withStrictBatchLimit(strictBathLimit) + .build(); + recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config); + recordAccumulatorExt.start(); + int threadCount = 10; CountDownLatch startBarrier = new CountDownLatch(threadCount); CountDownLatch stopCountDownLatch = new CountDownLatch(threadCount);