fix(issues1087): fix object not exist handle (#1119)
Signed-off-by: Robin Han <[email protected]>
superhx authored Apr 15, 2024
1 parent 1d5cd9e commit 1ea143e
Showing 4 changed files with 68 additions and 13 deletions.
@@ -77,6 +77,9 @@ public DataBlockIndex dataBlockIndex() {
     }
 
     public void markUnread() {
+        if (dataBlockGroup == null) {
+            throw new IllegalStateException("DataBlock is not loaded yet.");
+        }
         if (unreadCnt.get() == UNREAD_INIT) {
             unreadCnt.set(1);
         } else {
@@ -89,6 +92,9 @@ public void markUnread() {
     }
 
     public void markRead() {
+        if (dataBlockGroup == null) {
+            throw new IllegalStateException("DataBlock is not loaded yet.");
+        }
         int unreadCnt = this.unreadCnt.decrementAndGet();
         if (unreadCnt <= 0) {
             listener.markRead(this);
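The guard added above turns a latent NullPointerException into a fail-fast error: marking a DataBlock read or unread before its data has been loaded now throws immediately. A minimal JUnit-style illustration of the new invariant (newUnloadedDataBlock is a hypothetical helper for this sketch, not part of the commit):

// Hypothetical sketch: both ref-count operations now require a loaded block.
DataBlock block = newUnloadedDataBlock(); // assumed helper; dataBlockGroup is still null
assertThrows(IllegalStateException.class, block::markUnread);
assertThrows(IllegalStateException.class, block::markRead);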
@@ -171,6 +171,7 @@ void evict() {
     }
 
     private void evict0() {
+        // TODO: avoid awake more tasks than necessary
         while (sizeLimiter.requiredRelease()) {
             Map.Entry<DataBlockGroupKey, DataBlock> entry = null;
             if (!inactive.isEmpty()) {
@@ -39,6 +39,8 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 
+import static com.automq.stream.utils.FutureUtil.exec;
+
 @EventLoopSafe
 public class StreamReader {
     public static final int GET_OBJECT_STEP = 4;
@@ -86,24 +88,24 @@ CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes
         ReadContext readContext = new ReadContext();
         read0(readContext, startOffset, endOffset, maxBytes);
         CompletableFuture<ReadDataBlock> retCf = new CompletableFuture<>();
-        readContext.cf.whenComplete((rst, ex) -> {
-            ex = FutureUtil.cause(ex);
-            if (ex != null) {
+        readContext.cf.whenComplete((rst, ex) -> exec(() -> {
+            Throwable cause = FutureUtil.cause(ex);
+            if (cause != null) {
                 readContext.records.forEach(StreamRecordBatch::release);
                 if (leftRetries > 0) {
-                    if (ex instanceof ObjectNotExistException || ex instanceof NoSuchKeyException || ex instanceof BlockNotContinuousException) {
+                    if (cause instanceof ObjectNotExistException || cause instanceof NoSuchKeyException || cause instanceof BlockNotContinuousException) {
                         // The cached blocks maybe invalid after object compaction, so we need to reset the blocks and retry read
                         resetBlocks();
                         FutureUtil.propagate(read(startOffset, endOffset, maxBytes, leftRetries - 1), retCf);
                     }
                 } else {
-                    retCf.completeExceptionally(ex);
+                    retCf.completeExceptionally(cause);
                 }
             } else {
-                afterRead(rst);
+                afterRead(rst, readContext);
                 retCf.complete(rst);
             }
-        });
+        }, retCf, LOGGER, "read"));
         return retCf;
     }
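The callback body is now wrapped in exec(..., retCf, LOGGER, "read") from FutureUtil. Previously, a throwable raised inside whenComplete (for example from resetBlocks()) would be swallowed by the CompletableFuture machinery and retCf would never complete. A sketch of what such a guard plausibly looks like, assuming it runs the task and fails the future on error (the real helper is com.automq.stream.utils.FutureUtil#exec and may differ):

// Sketch only: run the task; on any throwable, log it and fail the future
// instead of leaving it hanging forever.
static <T> void exec(Runnable task, CompletableFuture<T> cf, org.slf4j.Logger logger, String name) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.error("{} failed", name, t);
        cf.completeExceptionally(t);
    }
}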

@@ -146,6 +148,7 @@ void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
                 ctx.cf.completeExceptionally(failedBlock.get().exception);
                 return;
             }
+            ctx.blocks.addAll(blocks);
             int remainingSize = maxBytes;
             long nextStartOffset = startOffset;
             long nextEndOffset;
@@ -176,10 +179,15 @@ void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
                 // So we may need to retry read to fulfill the endOffset or maxBytes
                 read0(ctx, nextStartOffset, endOffset, remainingSize);
             }
-        }).whenComplete((nil, ex) -> blocks.forEach(Block::release));
+        }).whenComplete((nil, ex) -> {
+            blocks.forEach(Block::release);
+            if (ex != null) {
+                ctx.cf.completeExceptionally(ex);
+            }
+        });
     }
 
-    void afterRead(ReadDataBlock readDataBlock) {
+    void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
         List<StreamRecordBatch> records = readDataBlock.getRecords();
         if (!records.isEmpty()) {
             nextReadOffset = records.get(records.size() - 1).getLastOffset();
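The whenComplete change above fixes a silent hang: the old cleanup-only callback released the blocks but dropped the exception, so ctx.cf could stay incomplete forever when the chained stage failed. A self-contained demonstration of the propagation pattern (simplified stand-ins, not AutoMQ code):

import java.util.concurrent.CompletableFuture;

public class PropagateDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> inner = new CompletableFuture<>();
        CompletableFuture<Void> outer = new CompletableFuture<>();
        inner.whenComplete((nil, ex) -> {
            // cleanup would go here (releasing blocks in the real code)
            if (ex != null) {
                outer.completeExceptionally(ex); // the fix: propagate the failure
            }
        });
        inner.completeExceptionally(new RuntimeException("load failed"));
        System.out.println(outer.isCompletedExceptionally()); // prints: true
    }
}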
@@ -189,20 +197,28 @@ void afterRead(ReadDataBlock readDataBlock) {
         while (it.hasNext()) {
             Block block = it.next().getValue();
             if (block.index.endOffset() <= nextReadOffset) {
-                // #getDataBlock will invoke DataBlock#markUnread
-                block.data.markRead();
                 it.remove();
             } else {
                 break;
             }
         }
+        // #getDataBlock will invoke DataBlock#markUnread
+        ctx.blocks.forEach(b -> b.data.markRead());
         // try readahead to accelerate the next read
         readahead.tryReadahead();
     }
 
     private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffset, int maxBytes) {
         GetBlocksContext context = new GetBlocksContext();
-        getBlocks0(context, startOffset, endOffset, maxBytes);
+        try {
+            getBlocks0(context, startOffset, endOffset, maxBytes);
+        } catch (Throwable ex) {
+            context.cf.completeExceptionally(ex);
+        }
+        context.cf.exceptionally(ex -> {
+            context.blocks.forEach(b -> b.loadCf.thenAccept(nil -> b.release()));
+            return null;
+        });
         return context.cf;
     }
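getBlocks now covers two failure paths: a synchronous throw from getBlocks0 fails context.cf instead of escaping to the caller, and an exceptional context.cf releases each collected block only after its in-flight load completes (b.loadCf.thenAccept(...)), presumably so a buffer that is still being loaded is never freed underneath the loader. The deferred-release idiom in isolation (names assumed for illustration, not AutoMQ API):

// Sketch: the release is chained on the load future, so it runs once the
// load finishes (or immediately, if the load already completed).
CompletableFuture<Void> loadCf = new CompletableFuture<>();
loadCf.thenAccept(nil -> System.out.println("released only after load completes"));
loadCf.complete(null); // the release callback runs here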

@@ -359,6 +375,7 @@ static class GetBlocksContext {
 
     static class ReadContext {
         List<StreamRecordBatch> records = new LinkedList<>();
+        List<Block> blocks = new ArrayList<>();
         CacheAccessType accessType = CacheAccessType.BLOCK_CACHE_HIT;
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
     }
@@ -14,15 +14,20 @@
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.cache.ReadDataBlock;
+import com.automq.stream.s3.exceptions.ObjectNotExistException;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.threads.EventLoop;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.junit.jupiter.api.BeforeEach;
@@ -31,10 +36,12 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
@@ -102,7 +109,7 @@ void setup() {
     }
 
     @Test
-    public void testRead_withReadahead() throws ExecutionException, InterruptedException {
+    public void testRead_withReadahead() throws ExecutionException, InterruptedException, TimeoutException {
         // user read get objects
         when(objectManager.getObjects(eq(STREAM_ID), eq(0L), eq(-1L), eq(StreamReader.GET_OBJECT_STEP)))
             .thenReturn(CompletableFuture.completedFuture(List.of(objects.get(0L).metadata, objects.get(1L).metadata, objects.get(2L).metadata, objects.get(3L).metadata)));
@@ -182,6 +189,30 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedException
         assertEquals(14L, streamReader.readahead.readaheadMarkOffset);
         assertEquals(1024L * 1024 * 2, streamReader.readahead.nextReadaheadSize);
         assertEquals(14, dataBlockCache.caches[0].inactive.size());
+
+        when(objectManager.isObjectExist(anyLong())).thenReturn(false);
+        eventLoops[0].submit(() -> readCf.set(streamReader.read(14L, 15L, Integer.MAX_VALUE))).get();
+        Throwable ex = null;
+        try {
+            readCf.get().get(1, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            ex = FutureUtil.cause(e);
+        }
+        assertInstanceOf(ObjectNotExistException.class, ex);
+
+        AtomicBoolean failed = new AtomicBoolean(false);
+        doAnswer(args -> {
+            if (failed.get()) {
+                return true;
+            } else {
+                failed.set(true);
+                return false;
+            }
+        }).when(objectManager).isObjectExist(anyLong());
+        eventLoops[0].submit(() -> readCf.set(streamReader.read(14L, 15L, Integer.MAX_VALUE))).get();
+        rst = readCf.get().get(1, TimeUnit.SECONDS);
+
     }
 
 }
