Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wal): reduce concurrent conflicts between block write operations and poll operations #1554

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public interface Block {
*/
long startOffset();

/**
* The start time of this block*
* @return
*/
long startTime();
/**
* Append a record to this block.
* Cannot be called after {@link #data()} is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public long startOffset() {
return startOffset;
}

@Override
public long startTime() {
return this.startTime;
}

/**
* Note: this method is NOT thread safe.
*/
Expand Down Expand Up @@ -120,4 +125,9 @@ public long size() {
public void polled() {
StorageOperationStats.getInstance().appendWALBlockPolledStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS));
}

@Override
public boolean isEmpty() {
return records.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class SlidingWindowService {
* The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}.
*/
private final Lock blockLock = new ReentrantLock();

private final Lock pollBlocKLock = new ReentrantLock();
Chillax-0v0 marked this conversation as resolved.
Show resolved Hide resolved
Chillax-0v0 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Blocks that are being written.
*/
Expand All @@ -80,11 +84,11 @@ public class SlidingWindowService {
* Blocks that are waiting to be written.
* All blocks in this queue are ordered by the start offset.
*/
private Queue<Block> pendingBlocks = new LinkedList<>();
private BlockingQueue<Block> pendingBlocks = new LinkedBlockingQueue<>();
/**
* The current block, records are added to this block.
*/
private Block currentBlock;
private volatile Block currentBlock;

/**
* The thread pool for write operations.
Expand All @@ -98,7 +102,7 @@ public class SlidingWindowService {
/**
* The last time when a batch of blocks is written to the disk.
*/
private long lastWriteTimeNanos = 0;
private volatile long lastWriteTimeNanos = 0;

public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit,
long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) {
Expand Down Expand Up @@ -276,38 +280,43 @@ private Block nextBlock(Block previousBlock) {
* Get all blocks to be written. If there is no non-empty block, return null.
*/
private BlockBatch pollBlocks() {
blockLock.lock();
try {
return pollBlocksLocked();
} finally {
blockLock.unlock();
if (this.pollBlocKLock.tryLock()) {
try {
return pollBlocksLocked();
} finally {
this.pollBlocKLock.unlock();
}
}
return null;
}

/**
* Get all blocks to be written. If there is no non-empty block, return null.
* Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked.
Chillax-0v0 marked this conversation as resolved.
Show resolved Hide resolved
*/
private BlockBatch pollBlocksLocked() {
Block currentBlock = getCurrentBlockLocked();

boolean isPendingBlockEmpty = pendingBlocks.isEmpty();
boolean isCurrentBlockEmpty = currentBlock == null || currentBlock.isEmpty();
if (isPendingBlockEmpty && isCurrentBlockEmpty) {
// No record to be written
return null;
Block currentBlock = this.currentBlock;
boolean isCurrentBlockNotEmpty = currentBlock != null && !currentBlock.isEmpty();
if (isCurrentBlockNotEmpty) {
long time = System.nanoTime();
if (time - currentBlock.startTime() > minWriteIntervalNanos) {
if (this.blockLock.tryLock()) {
try {
currentBlock = this.getCurrentBlockLocked();
if (!currentBlock.isEmpty() && time - currentBlock.startTime() > minWriteIntervalNanos) {
pendingBlocks.add(currentBlock);
setCurrentBlockLocked(nextBlock(currentBlock));
}
} finally {
this.blockLock.unlock();
}
}
}
}

Collection<Block> blocks;
if (!isPendingBlockEmpty) {
blocks = pendingBlocks;
pendingBlocks = new LinkedList<>();
} else {
blocks = new LinkedList<>();
}
if (!isCurrentBlockEmpty) {
blocks.add(currentBlock);
setCurrentBlockLocked(nextBlock(currentBlock));
Collection<Block> blocks = new LinkedList<>();
while (!pendingBlocks.isEmpty()) {
blocks.add(pendingBlocks.poll());
}

BlockBatch blockBatch = new BlockBatch(blocks);
Expand Down
Loading