fix(s3stream): fix potential dangling cf when read not aligned with data block (#795)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored and ShadowySpirits committed Mar 14, 2024
1 parent e369dd9 commit 6da659f
Showing 3 changed files with 134 additions and 21 deletions.
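
In short: the "dangling cf" is a CompletableFuture registered in the inflight read-ahead task map. When a read starts at an offset inside the first data block (the new test below uses startOffset 70 against a block starting at 64), a waiter is keyed by the unaligned startOffset, while completion only ever touched the block-aligned key, so that waiter never completed. The fix settles both keys when the first block's read finishes. A minimal, self-contained sketch of the scenario (Java 16+; ReadAheadTaskKey and completeInflightTask mirror the diff below, everything else is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    public class DanglingCfSketch {
        // Mirrors DefaultS3BlockCache.ReadAheadTaskKey from the diff below.
        record ReadAheadTaskKey(long streamId, long startOffset) {}

        static final Map<ReadAheadTaskKey, CompletableFuture<Void>> INFLIGHT = new HashMap<>();

        // Same contract as the completeInflightTask helper introduced in this commit.
        static void completeInflightTask(ReadAheadTaskKey key, Throwable ex) {
            CompletableFuture<Void> cf = INFLIGHT.remove(key);
            if (cf != null) {
                if (ex != null) {
                    cf.completeExceptionally(ex);
                } else {
                    cf.complete(null);
                }
            }
        }

        public static void main(String[] args) {
            long streamId = 233L;
            long startOffset = 70;  // the read begins inside the block...
            long blockStart = 64;   // ...which actually starts here

            // A concurrent reader parks on a future keyed by the unaligned startOffset.
            INFLIGHT.put(new ReadAheadTaskKey(streamId, startOffset), new CompletableFuture<>());

            // Pre-fix behavior settled only the block-aligned key, so the entry at
            // startOffset dangled forever. The fix settles both keys when the first
            // data block's read completes:
            completeInflightTask(new ReadAheadTaskKey(streamId, blockStart), null);
            completeInflightTask(new ReadAheadTaskKey(streamId, startOffset), null);

            System.out.println("dangling futures left: " + INFLIGHT.size()); // 0
        }
    }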
47 changes: 28 additions & 19 deletions s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -159,6 +159,7 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(long streamId, lo
             sortedDataBlockKeyList.add(dataBlockKey);
             DataBlockReadAccumulator.ReserveResult reserveResult = reserveResults.get(i);
             DefaultS3BlockCache.ReadAheadTaskKey taskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, streamDataBlock.getStartOffset());
+            int readIndex = i;
             cfList.add(reserveResult.cf().thenApplyAsync(dataBlock -> {
                 if (dataBlock.records().isEmpty()) {
                     return new ArrayList<StreamRecordBatch>();
@@ -174,15 +175,14 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(long streamId, lo

                 return dataBlock.records();
             }, backgroundExecutor).whenComplete((ret, ex) -> {
-                CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTaskMap.remove(taskKey);
-                if (inflightReadAheadTask != null) {
-                    if (ex != null) {
-                        LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}",
-                            streamId, startOffset, endOffset, streamDataBlock, ex);
-                        inflightReadAheadTask.completeExceptionally(ex);
-                    } else {
-                        inflightReadAheadTask.complete(null);
-                    }
-                }
+                if (ex != null) {
+                    LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}",
+                        streamId, startOffset, endOffset, streamDataBlock, ex);
+                }
+                completeInflightTask(taskKey, ex);
+                if (readIndex == 0) {
+                    // in case of first data block and startOffset is not aligned with start of data block
+                    completeInflightTask(new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
+                }
             }));
             if (reserveResult.reserveSize() > 0) {
@@ -296,18 +296,16 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
             }
             dataBlock.release();
         }, backgroundExecutor).whenComplete((ret, ex) -> {
+            if (ex != null) {
+                LOGGER.error("[S3BlockCache] async ra fail to read data block, stream={}, {}-{}, data block: {}",
+                    streamId, startOffset, endOffset, streamDataBlock, ex);
+            }
             inflightReadThrottle.release(uuid);
-            CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTaskMap.remove(taskKey);
-            if (inflightReadAheadTask != null) {
-                if (ex != null) {
-                    LOGGER.error("[S3BlockCache] async ra fail to read data block, stream={}, {}-{}, data block: {}",
-                        streamId, startOffset, endOffset, streamDataBlock, ex);
-                    inflightReadAheadTask.completeExceptionally(ex);
-                } else {
-                    inflightReadAheadTask.complete(null);
-                }
-            }
+            completeInflightTask(taskKey, ex);
+            if (readIndex == 0) {
+                // in case of first data block and startOffset is not aligned with start of data block
+                completeInflightTask(new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
+            }
         }));

         if (LOGGER.isDebugEnabled()) {
@@ -393,6 +391,17 @@ private CompletableFuture<Void> getDataBlockIndices(long streamId, long endOffse
         }, streamReaderExecutor);
     }

+    private void completeInflightTask(DefaultS3BlockCache.ReadAheadTaskKey key, Throwable ex) {
+        CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTaskMap.remove(key);
+        if (inflightReadAheadTask != null) {
+            if (ex != null) {
+                inflightReadAheadTask.completeExceptionally(ex);
+            } else {
+                inflightReadAheadTask.complete(null);
+            }
+        }
+    }
+
     private List<Pair<ObjectReader, StreamDataBlock>> collectStreamDataBlocksToRead(long streamId, ReadContext context) {
         List<Pair<ObjectReader, StreamDataBlock>> result = new ArrayList<>();
         for (Pair<Long, List<StreamDataBlock>> entry : context.streamDataBlocksPair) {
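
The extracted completeInflightTask helper settles the removed future on both the success and failure paths. A small stand-alone illustration of why every inflight future must be settled one way or the other — any stage chained on it otherwise waits forever:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class SettleBothPathsSketch {
        public static void main(String[] args) throws Exception {
            CompletableFuture<Void> inflight = new CompletableFuture<>();
            CompletableFuture<String> waiter = inflight.thenApply(v -> "data ready");

            inflight.complete(null);           // success path unblocks the waiter
            System.out.println(waiter.get());  // -> data ready

            CompletableFuture<Void> failed = new CompletableFuture<>();
            CompletableFuture<String> waiter2 = failed.thenApply(v -> "unreachable");
            failed.completeExceptionally(new RuntimeException("read failed"));
            try {
                waiter2.get();
            } catch (ExecutionException e) {
                System.out.println(e.getCause().getMessage()); // -> read failed
            }
        }
    }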
@@ -187,10 +187,23 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {

     private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
         if (throttleBucket == null) {
-            return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2);
+            return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
+                // convert heap buffer to direct buffer
+                ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes());
+                directBuf.writeBytes(buf);
+                buf.release();
+                return directBuf;
+            });
         } else {
             return throttleBucket.asScheduler().consume(end - start + 1, bucketCallbackExecutor)
-                .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2));
+                .thenCompose(v ->
+                    s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
+                        // convert heap buffer to direct buffer
+                        ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes());
+                        directBuf.writeBytes(buf);
+                        buf.release();
+                        return directBuf;
+                    }));
         }
     }
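
The rangeRead change above copies each fetched (typically heap) buffer into a direct buffer and releases the source, so cached block data lives off-heap. A stand-alone sketch of the same copy-and-release pattern; DirectByteBufAlloc is this repository's allocator and is not shown here, so Netty's PooledByteBufAllocator stands in for it as an assumption:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    public class HeapToDirectSketch {
        static ByteBuf toDirect(ByteBuf buf) {
            // Allocate a direct buffer sized to the readable payload.
            ByteBuf directBuf = PooledByteBufAllocator.DEFAULT.directBuffer(buf.readableBytes());
            directBuf.writeBytes(buf); // copy the payload off-heap
            buf.release();             // drop the heap copy so it cannot leak
            return directBuf;
        }

        public static void main(String[] args) {
            ByteBuf heap = Unpooled.copiedBuffer("range-read payload", StandardCharsets.UTF_8);
            ByteBuf direct = toDirect(heap);
            System.out.println(direct.isDirect() + " " + direct.readableBytes()); // true 18
            direct.release();
        }
    }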
@@ -20,6 +20,7 @@
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.TestUtils;
+import com.automq.stream.s3.cache.DefaultS3BlockCache.ReadAheadTaskKey;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
@@ -41,6 +42,35 @@

 public class StreamReaderTest {

+    @Test
+    public void testSyncReadAheadInflight() {
+        DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator();
+        ObjectReaderLRUCache cache = Mockito.mock(ObjectReaderLRUCache.class);
+        S3Operator s3Operator = Mockito.mock(S3Operator.class);
+        ObjectManager objectManager = Mockito.mock(ObjectManager.class);
+        BlockCache blockCache = Mockito.mock(BlockCache.class);
+        Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new HashMap<>();
+        StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, inflightReadAheadTasks, new InflightReadThrottle());
+
+        long startOffset = 70;
+        StreamReader.ReadContext context = new StreamReader.ReadContext(startOffset, 256);
+        ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+        context.streamDataBlocksPair = List.of(
+            new ImmutablePair<>(1L, List.of(
+                new StreamDataBlock(233L, 64, 128, 1, index1))));
+
+        ObjectReader reader = Mockito.mock(ObjectReader.class);
+        Mockito.when(reader.read(index1)).thenReturn(new CompletableFuture<>());
+        context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
+        inflightReadAheadTasks.put(new ReadAheadTaskKey(233L, startOffset), new CompletableFuture<>());
+        streamReader.handleSyncReadAhead(233L, startOffset,
+            999, 64, Mockito.mock(ReadAheadAgent.class), UUID.randomUUID(), new TimerUtil(), context);
+        Threads.sleep(1000);
+        Assertions.assertEquals(2, inflightReadAheadTasks.size());
+        Assertions.assertTrue(inflightReadAheadTasks.containsKey(new ReadAheadTaskKey(233L, startOffset)));
+        Assertions.assertTrue(inflightReadAheadTasks.containsKey(new ReadAheadTaskKey(233L, 64)));
+    }
+
     @Test
     public void testSyncReadAhead() {
         DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator();
@@ -98,6 +128,67 @@ public StreamRecordBatch next() {
         }).join();
     }

+    @Test
+    public void testSyncReadAheadNotAlign() {
+        DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator();
+        ObjectReaderLRUCache cache = Mockito.mock(ObjectReaderLRUCache.class);
+        S3Operator s3Operator = Mockito.mock(S3Operator.class);
+        ObjectManager objectManager = Mockito.mock(ObjectManager.class);
+        BlockCache blockCache = Mockito.mock(BlockCache.class);
+        Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new HashMap<>();
+        StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, inflightReadAheadTasks, new InflightReadThrottle());
+
+        long startOffset = 32;
+        StreamReader.ReadContext context = new StreamReader.ReadContext(startOffset, 256);
+        ObjectReader.DataBlockIndex index1 = new ObjectReader.DataBlockIndex(0, 0, 256, 128);
+        context.streamDataBlocksPair = List.of(
+            new ImmutablePair<>(1L, List.of(
+                new StreamDataBlock(233L, 0, 128, 1, index1))));
+
+        ObjectReader reader = Mockito.mock(ObjectReader.class);
+        ObjectReader.DataBlock dataBlock1 = Mockito.mock(ObjectReader.DataBlock.class);
+        StreamRecordBatch record1 = new StreamRecordBatch(233L, 0, 0, 64, TestUtils.random(128));
+        record1.release();
+        StreamRecordBatch record2 = new StreamRecordBatch(233L, 0, 64, 64, TestUtils.random(128));
+        record2.release();
+        List<StreamRecordBatch> records = List.of(record1, record2);
+        AtomicInteger remaining = new AtomicInteger(0);
+        Assertions.assertEquals(1, record1.getPayload().refCnt());
+        Assertions.assertEquals(1, record2.getPayload().refCnt());
+        Mockito.when(dataBlock1.iterator()).thenReturn(new CloseableIterator<>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean hasNext() {
+                return remaining.get() < records.size();
+            }
+
+            @Override
+            public StreamRecordBatch next() {
+                if (!hasNext()) {
+                    throw new IllegalStateException("no more elements");
+                }
+                return records.get(remaining.getAndIncrement());
+            }
+        });
+        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
+        context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
+        inflightReadAheadTasks.put(new ReadAheadTaskKey(233L, startOffset), new CompletableFuture<>());
+        CompletableFuture<List<StreamRecordBatch>> cf = streamReader.handleSyncReadAhead(233L, startOffset,
+            999, 64, Mockito.mock(ReadAheadAgent.class), UUID.randomUUID(), new TimerUtil(), context);
+
+        cf.whenComplete((rst, ex) -> {
+            Assertions.assertNull(ex);
+            Assertions.assertTrue(inflightReadAheadTasks.isEmpty());
+            Assertions.assertEquals(1, rst.size());
+            Assertions.assertTrue(record1.equals(rst.get(0)));
+            Assertions.assertEquals(2, record1.getPayload().refCnt());
+            Assertions.assertEquals(1, record2.getPayload().refCnt());
+        }).join();
+    }
+
     @Test
     public void testSyncReadAheadException() {
         DataBlockReadAccumulator accumulator = new DataBlockReadAccumulator();
