Skip to content

Commit

Permalink
perf(s3stream/wal): reuse the ByteBuf for record headers (#1877)
Browse files Browse the repository at this point in the history
* refactor: manage the record headers' lifecycle in `Block`

Signed-off-by: Ning Yu <[email protected]>

* perf(s3stream/wal): reuse the `ByteBuf` for record headers

Signed-off-by: Ning Yu <[email protected]>

* perf: remove the max size limit

Signed-off-by: Ning Yu <[email protected]>

* test: test `FixedSizeByteBufPool`

Signed-off-by: Ning Yu <[email protected]>

* revert: "perf: remove the max size limit"

This reverts commit ed63112.

* feat: use a separate `poolSize` to limit the size of the pool

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Aug 20, 2024
1 parent 70108bc commit 962fad3
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 42 deletions.
79 changes: 79 additions & 0 deletions s3stream/src/main/java/com/automq/stream/FixedSizeByteBufPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream;

import com.automq.stream.s3.ByteBufAlloc;
import io.netty.buffer.ByteBuf;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A pool of fixed-size {@link ByteBuf}.
*/
public class FixedSizeByteBufPool {

/**
* The size of the {@link ByteBuf} in this pool.
*/
private final int bufferSize;
/**
* The max size of the pool.
* It is possible that the pool size exceeds this limit in some rare cases.
*/
private final int maxPoolSize;
private final Queue<ByteBuf> pool = new ConcurrentLinkedQueue<>();
/**
* The current size of the pool.
* We use an {@link AtomicInteger} rather than {@link Queue#size()} to avoid the cost of traversing the queue.
*/
private final AtomicInteger poolSize = new AtomicInteger(0);

public FixedSizeByteBufPool(int bufferSize, int maxPoolSize) {
this.bufferSize = bufferSize;
this.maxPoolSize = maxPoolSize;
}

/**
* Get a {@link ByteBuf} from the pool.
* If the pool is empty, a new {@link ByteBuf} will be allocated.
*/
public ByteBuf get() {
ByteBuf buffer = pool.poll();
if (buffer == null) {
return allocate();
}
poolSize.decrementAndGet();
return buffer;
}

private ByteBuf allocate() {
return ByteBufAlloc.byteBuffer(bufferSize);
}

/**
* Release a {@link ByteBuf} to the pool.
* Note: the buffer MUST be gotten from this pool.
*/
public void release(ByteBuf buffer) {
assert buffer.capacity() == bufferSize;

if (poolSize.get() >= maxPoolSize) {
buffer.release();
return;
}

buffer.clear();
pool.offer(buffer);
poolSize.incrementAndGet();
}
}
33 changes: 33 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/wal/common/Record.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.common;

import io.netty.buffer.ByteBuf;

public class Record {

private final ByteBuf header;
private final ByteBuf body;

public Record(ByteBuf header, ByteBuf body) {
this.header = header;
this.body = body;
}

public ByteBuf header() {
return header;
}

public ByteBuf body() {
return body;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.automq.stream.s3.wal.common;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -81,25 +80,25 @@ public int getRecordHeaderCRC() {
@Override
public String toString() {
return "RecordHeaderCoreData{" +
"magicCode=" + magicCode0 +
", recordBodyLength=" + recordBodyLength1 +
", recordBodyOffset=" + recordBodyOffset2 +
", recordBodyCRC=" + recordBodyCRC3 +
", recordHeaderCRC=" + recordHeaderCRC4 +
'}';
"magicCode=" + magicCode0 +
", recordBodyLength=" + recordBodyLength1 +
", recordBodyOffset=" + recordBodyOffset2 +
", recordBodyCRC=" + recordBodyCRC3 +
", recordHeaderCRC=" + recordHeaderCRC4 +
'}';
}

private ByteBuf marshalHeaderExceptCRC() {
ByteBuf buf = ByteBufAlloc.byteBuffer(RECORD_HEADER_SIZE);
private ByteBuf marshalHeaderExceptCRC(ByteBuf buf) {
buf.writeInt(magicCode0);
buf.writeInt(recordBodyLength1);
buf.writeLong(recordBodyOffset2);
buf.writeInt(recordBodyCRC3);
return buf;
}

public ByteBuf marshal() {
ByteBuf buf = marshalHeaderExceptCRC();
public ByteBuf marshal(ByteBuf emptyBuf) {
assert emptyBuf.writableBytes() == RECORD_HEADER_SIZE;
ByteBuf buf = marshalHeaderExceptCRC(emptyBuf);
buf.writeInt(WALUtil.crc32(buf, RECORD_HEADER_WITHOUT_CRC_SIZE));
return buf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
package com.automq.stream.s3.wal.impl.block;

import com.automq.stream.s3.wal.AppendResult;
import com.automq.stream.s3.wal.common.Record;
import com.automq.stream.s3.wal.common.RecordHeader;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* A Block contains multiple records, and will be written to the WAL in one batch.
Expand All @@ -33,11 +34,11 @@ public interface Block {
* Cannot be called after {@link #data()} is called.
*
* @param recordSize The size of this record.
* @param recordSupplier The supplier of this record which receives the start offset of this record as the parameter.
* @param recordSupplier The supplier of this record.
* @param future The future of this record, which will be completed when the record is written to the WAL.
* @return The start offset of this record. If the size of this block exceeds the limit, return -1.
*/
long addRecord(long recordSize, Function<Long, ByteBuf> recordSupplier,
long addRecord(long recordSize, RecordSupplier recordSupplier,
CompletableFuture<AppendResult.CallbackResult> future);

/**
Expand All @@ -52,7 +53,6 @@ default boolean isEmpty() {
/**
* The content of this block, which contains multiple records.
* The first call of this method will marshal all records in this block to a ByteBuf. It will be cached for later calls.
* It returns null if this block is empty.
*/
ByteBuf data();

Expand All @@ -61,16 +61,23 @@ default boolean isEmpty() {
*/
long size();

default void release() {
ByteBuf data = data();
if (null != data) {
data.release();
}
}
void release();

/**
* Called when this block is polled and sent to the writer.
* Used for metrics.
*/
void polled();

@FunctionalInterface
interface RecordSupplier {
/**
* Generate a record.
*
* @param recordStartOffset The start offset of this record.
* @param emptyHeader An empty {@link ByteBuf} with the size of {@link RecordHeader#RECORD_HEADER_SIZE}. It will be used to marshal the header.
* @return The record.
*/
Record get(long recordStartOffset, ByteBuf emptyHeader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,31 @@

package com.automq.stream.s3.wal.impl.block;

import com.automq.stream.FixedSizeByteBufPool;
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.wal.AppendResult;
import com.automq.stream.s3.wal.common.Record;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.automq.stream.s3.wal.common.RecordHeader.RECORD_HEADER_SIZE;

public class BlockImpl implements Block {

/**
* The pool for record headers.
*/
private static final FixedSizeByteBufPool HEADER_POOL = new FixedSizeByteBufPool(RECORD_HEADER_SIZE, 1024);

private final long startOffset;
/**
* The max size of this block.
Expand All @@ -40,14 +49,14 @@ public class BlockImpl implements Block {
*/
private final long softLimit;
private final List<CompletableFuture<AppendResult.CallbackResult>> futures = new LinkedList<>();
private final List<Supplier<ByteBuf>> records = new LinkedList<>();
private final List<Supplier<Record>> recordSuppliers = new LinkedList<>();
private final long startTime;
/**
* The next offset to write in this block.
* Align to {@link WALUtil#BLOCK_SIZE}
*/
private long nextOffset = 0;
private CompositeByteBuf data = null;
private List<Record> records = null;

/**
* Create a block.
Expand All @@ -69,9 +78,9 @@ public long startOffset() {
* Note: this method is NOT thread safe.
*/
@Override
public long addRecord(long recordSize, Function<Long, ByteBuf> recordSupplier,
public long addRecord(long recordSize, RecordSupplier recordSupplier,
CompletableFuture<AppendResult.CallbackResult> future) {
assert data == null;
assert records == null;
long requiredCapacity = nextOffset + recordSize;
if (requiredCapacity > maxSize) {
return -1;
Expand All @@ -82,7 +91,10 @@ public long addRecord(long recordSize, Function<Long, ByteBuf> recordSupplier,
}

long recordOffset = startOffset + nextOffset;
records.add(() -> recordSupplier.apply(recordOffset));
recordSuppliers.add(() -> {
ByteBuf header = HEADER_POOL.get();
return recordSupplier.get(recordOffset, header);
});
nextOffset += recordSize;
futures.add(future);

Expand All @@ -96,26 +108,40 @@ public List<CompletableFuture<AppendResult.CallbackResult>> futures() {

@Override
public ByteBuf data() {
if (null != data) {
return data;
}
if (records.isEmpty()) {
return null;
}
maybeGenerateRecords();

data = ByteBufAlloc.compositeByteBuffer();
for (Supplier<ByteBuf> supplier : records) {
ByteBuf record = supplier.get();
data.addComponent(true, record);
CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer();
for (Record record : records) {
data.addComponents(true, record.header(), record.body());
}
return data;
}

private void maybeGenerateRecords() {
if (null != records) {
return;
}
records = recordSuppliers.stream()
.map(Supplier::get)
.collect(Collectors.toUnmodifiableList());
}

@Override
public long size() {
return nextOffset;
}

@Override
public void release() {
if (null == records) {
return;
}
records.forEach(record -> {
HEADER_POOL.release(record.header());
record.body().release();
});
}

@Override
public void polled() {
StorageOperationStats.getInstance().appendWALBlockPolledStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,12 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
lock.lock();
try {
Block block = slidingWindowService.getCurrentBlockLocked();
expectedWriteOffset = block.addRecord(recordSize, offset -> WALUtil.generateRecord(body, crc, offset), appendResultFuture);
Block.RecordSupplier recordSupplier = (offset, header) -> WALUtil.generateRecord(body, header, crc, offset);
expectedWriteOffset = block.addRecord(recordSize, recordSupplier, appendResultFuture);
if (expectedWriteOffset < 0) {
// this block is full, create a new one
block = slidingWindowService.sealAndNewBlockLocked(block, recordSize, walHeader.getFlushedTrimOffset(), walHeader.getCapacity() - WAL_HEADER_TOTAL_CAPACITY);
expectedWriteOffset = block.addRecord(recordSize, offset -> WALUtil.generateRecord(body, crc, offset), appendResultFuture);
expectedWriteOffset = block.addRecord(recordSize, recordSupplier, appendResultFuture);
}
} finally {
lock.unlock();
Expand Down
Loading

0 comments on commit 962fad3

Please sign in to comment.