diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/Block.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/Block.java index 88bded4325..6103420a8b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/Block.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/Block.java @@ -28,6 +28,11 @@ public interface Block { */ long startOffset(); + /** + * The start time of this block* + * @return + */ + long startTime(); /** * Append a record to this block. * Cannot be called after {@link #data()} is called. diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockImpl.java index e1aa6c06c5..a5c5208175 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockImpl.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockImpl.java @@ -65,6 +65,11 @@ public long startOffset() { return startOffset; } + @Override + public long startTime() { + return this.startTime; + } + /** * Note: this method is NOT thread safe. */ @@ -120,4 +125,9 @@ public long size() { public void polled() { StorageOperationStats.getInstance().appendWALBlockPolledStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS)); } + + @Override + public boolean isEmpty() { + return records.isEmpty(); + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index a444f6ad3f..f138ae55d2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -24,7 +24,9 @@ import java.util.LinkedList; import java.util.PriorityQueue; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +64,8 @@ public class SlidingWindowService { * The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}. */ private final Lock blockLock = new ReentrantLock(); + + private final Lock pollBlocKLock = new ReentrantLock(); /** * Blocks that are being written. */ @@ -80,11 +84,11 @@ public class SlidingWindowService { * Blocks that are waiting to be written. * All blocks in this queue are ordered by the start offset. */ - private Queue pendingBlocks = new LinkedList<>(); + private BlockingQueue pendingBlocks = new LinkedBlockingQueue<>(); /** * The current block, records are added to this block. */ - private Block currentBlock; + private volatile Block currentBlock; /** * The thread pool for write operations. @@ -98,7 +102,7 @@ public class SlidingWindowService { /** * The last time when a batch of blocks is written to the disk. */ - private long lastWriteTimeNanos = 0; + private volatile long lastWriteTimeNanos = 0; public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit, long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) { @@ -276,12 +280,14 @@ private Block nextBlock(Block previousBlock) { * Get all blocks to be written. If there is no non-empty block, return null. */ private BlockBatch pollBlocks() { - blockLock.lock(); - try { - return pollBlocksLocked(); - } finally { - blockLock.unlock(); + if (this.pollBlocKLock.tryLock()) { + try { + return pollBlocksLocked(); + } finally { + this.pollBlocKLock.unlock(); + } } + return null; } /** @@ -289,25 +295,28 @@ private BlockBatch pollBlocks() { * Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked. */ private BlockBatch pollBlocksLocked() { - Block currentBlock = getCurrentBlockLocked(); - - boolean isPendingBlockEmpty = pendingBlocks.isEmpty(); - boolean isCurrentBlockEmpty = currentBlock == null || currentBlock.isEmpty(); - if (isPendingBlockEmpty && isCurrentBlockEmpty) { - // No record to be written - return null; + Block currentBlock = this.currentBlock; + boolean isCurrentBlockNotEmpty = currentBlock != null && !currentBlock.isEmpty(); + if (isCurrentBlockNotEmpty) { + long time = System.nanoTime(); + if (time - currentBlock.startTime() > minWriteIntervalNanos) { + if (this.blockLock.tryLock()) { + try { + currentBlock = this.getCurrentBlockLocked(); + if (!currentBlock.isEmpty() && time - currentBlock.startTime() > minWriteIntervalNanos) { + pendingBlocks.add(currentBlock); + setCurrentBlockLocked(nextBlock(currentBlock)); + } + } finally { + this.blockLock.unlock(); + } + } + } } - Collection blocks; - if (!isPendingBlockEmpty) { - blocks = pendingBlocks; - pendingBlocks = new LinkedList<>(); - } else { - blocks = new LinkedList<>(); - } - if (!isCurrentBlockEmpty) { - blocks.add(currentBlock); - setCurrentBlockLocked(nextBlock(currentBlock)); + Collection blocks = new LinkedList<>(); + while (!pendingBlocks.isEmpty()) { + blocks.add(pendingBlocks.poll()); } BlockBatch blockBatch = new BlockBatch(blocks);