Skip to content

Commit

Permalink
feat(s3stream): add strictBatchLimit option for s3 wal (#1769)
Browse files Browse the repository at this point in the history
* feat(s3stream): add strictBatchLimit option for s3 wal

Signed-off-by: SSpirits <[email protected]>

* feat(s3stream): fix test

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Aug 12, 2024
1 parent 2cf64bf commit 65e9c58
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ public class ObjectWALConfig {
private final long epoch;
private final boolean failover;
private final short bucketId;
private final boolean strictBatchLimit;

public static Builder builder() {
return new Builder();
}

public ObjectWALConfig(long batchInterval, long maxBytesInBatch, long maxUnflushedBytes, int maxInflightUploadCount,
int readAheadObjectCount, String clusterId, int nodeId, long epoch, boolean failover, short bucketId) {
int readAheadObjectCount, String clusterId, int nodeId, long epoch, boolean failover, short bucketId,
boolean strictBatchLimit) {
this.batchInterval = batchInterval;
this.maxBytesInBatch = maxBytesInBatch;
this.maxUnflushedBytes = maxUnflushedBytes;
Expand All @@ -42,6 +44,7 @@ public ObjectWALConfig(long batchInterval, long maxBytesInBatch, long maxUnflush
this.epoch = epoch;
this.failover = failover;
this.bucketId = bucketId;
this.strictBatchLimit = strictBatchLimit;
}

public long batchInterval() {
Expand Down Expand Up @@ -84,6 +87,10 @@ public short bucketId() {
return bucketId;
}

public boolean strictBatchLimit() {
return strictBatchLimit;
}

public static final class Builder {
private long batchInterval = 100; // 100ms
private long maxBytesInBatch = 4 * 1024 * 1024L; // 4MB
Expand All @@ -95,6 +102,7 @@ public static final class Builder {
private long epoch;
private boolean failover;
private short bucketId;
private boolean strictBatchLimit = false;

private Builder() {
}
Expand Down Expand Up @@ -122,6 +130,10 @@ public Builder withURI(IdURI uri) {
if (StringUtils.isNumeric(readAheadObjectCount)) {
withReadAheadObjectCount(Integer.parseInt(readAheadObjectCount));
}
String strictBatchLimit = uri.extensionString("strictBatchLimit");
if (StringUtils.isNumeric(strictBatchLimit)) {
withStrictBatchLimit(Boolean.parseBoolean(strictBatchLimit));
}
return this;
}

Expand Down Expand Up @@ -183,8 +195,13 @@ public Builder withBucketId(short bucketId) {
return this;
}

public Builder withStrictBatchLimit(boolean strictBatchLimit) {
this.strictBatchLimit = strictBatchLimit;
return this;
}

public ObjectWALConfig build() {
return new ObjectWALConfig(batchInterval, maxBytesInBatch, maxUnflushedBytes, maxInflightUploadCount, readAheadObjectCount, clusterId, nodeId, epoch, failover, bucketId);
return new ObjectWALConfig(batchInterval, maxBytesInBatch, maxUnflushedBytes, maxInflightUploadCount, readAheadObjectCount, clusterId, nodeId, epoch, failover, bucketId, strictBatchLimit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,21 +441,21 @@ private void unsafeUpload(PriorityQueue<Record> recordQueue) {
while (!recordQueue.isEmpty()) {
// The retained bytes in the batch must larger than record header size.
long retainedBytesInBatch = config.maxBytesInBatch() - dataBuffer.readableBytes() - WALObjectHeader.WAL_HEADER_SIZE;
if (retainedBytesInBatch <= RecordHeader.RECORD_HEADER_SIZE) {
if (config.strictBatchLimit() && retainedBytesInBatch <= RecordHeader.RECORD_HEADER_SIZE) {
break;
}

Record record = recordQueue.poll();

// Records larger than the batch size will be uploaded immediately.
assert record != null;
if (record.record.readableBytes() >= config.maxBytesInBatch() - WALObjectHeader.WAL_HEADER_SIZE) {
if (config.strictBatchLimit() && record.record.readableBytes() >= config.maxBytesInBatch() - WALObjectHeader.WAL_HEADER_SIZE) {
dataBuffer.addComponent(true, record.record);
recordList.add(record);
break;
}

if (record.record.readableBytes() > retainedBytesInBatch) {
if (config.strictBatchLimit() && record.record.readableBytes() > retainedBytesInBatch) {
// The records will be split into multiple objects.
ByteBuf slice = record.record.retainedSlice(0, (int) retainedBytesInBatch).asReadOnly();
dataBuffer.addComponent(true, slice);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ public class ObjectWALServiceTest {
@BeforeEach
public void setUp() throws IOException {
objectStorage = new MemoryObjectStorage();
wal = new ObjectWALService(Time.SYSTEM, objectStorage, ObjectWALConfig.builder().withMaxBytesInBatch(110).withBatchInterval(Long.MAX_VALUE).build());
ObjectWALConfig config = ObjectWALConfig.builder()
.withMaxBytesInBatch(110)
.withBatchInterval(Long.MAX_VALUE)
.withStrictBatchLimit(true)
.build();
wal = new ObjectWALService(Time.SYSTEM, objectStorage, config);
wal.start();
random = new Random();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -53,6 +55,7 @@ public void setUp() {
.withNodeId(100)
.withEpoch(1000)
.withBatchInterval(Long.MAX_VALUE)
.withStrictBatchLimit(true)
.build();
recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config);
recordAccumulatorExt.start();
Expand Down Expand Up @@ -172,7 +175,60 @@ public void testOffset() throws OverCapacityException {
}

@Test
public void testInMultiThread() throws InterruptedException {
public void testStrictBatchLimit() throws OverCapacityException {
CompletableFuture<AppendResult.CallbackResult> future = new CompletableFuture<>();
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>());
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>());
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), future);
assertEquals(150, recordAccumulatorExt.nextOffset());

recordAccumulatorExt.unsafeUpload(true);
future.join();

assertEquals(2, recordAccumulatorExt.objectList().size());

// Reset the RecordAccumulator with strict batch limit disabled.
recordAccumulatorExt.close();
ObjectWALConfig config = ObjectWALConfig.builder()
.withMaxBytesInBatch(115)
.withNodeId(100)
.withEpoch(1000)
.withBatchInterval(Long.MAX_VALUE)
.withStrictBatchLimit(false)
.build();
recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config);
recordAccumulatorExt.start();

assertEquals(2, recordAccumulatorExt.objectList().size());

future = new CompletableFuture<>();
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>());
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), new CompletableFuture<>());
recordAccumulatorExt.append(50, offset -> generateByteBuf(50), future);
assertEquals(300, recordAccumulatorExt.nextOffset());


recordAccumulatorExt.unsafeUpload(true);
future.join();

assertEquals(3, recordAccumulatorExt.objectList().size());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInMultiThread(boolean strictBathLimit) throws InterruptedException {
recordAccumulatorExt.close();

ObjectWALConfig config = ObjectWALConfig.builder()
.withMaxBytesInBatch(115)
.withNodeId(100)
.withEpoch(1000)
.withBatchInterval(Long.MAX_VALUE)
.withStrictBatchLimit(strictBathLimit)
.build();
recordAccumulatorExt = new RecordAccumulator(Time.SYSTEM, objectStorage, config);
recordAccumulatorExt.start();

int threadCount = 10;
CountDownLatch startBarrier = new CountDownLatch(threadCount);
CountDownLatch stopCountDownLatch = new CountDownLatch(threadCount);
Expand Down

0 comments on commit 65e9c58

Please sign in to comment.