revert(s3stream): "perf(s3stream): only read the first record from log cache before reading block cache" (#1038)

Revert "perf(s3stream): only read the first record from log cache before reading block cache"

This reverts commit 46ccc8a.
Chillax-0v0 authored Mar 28, 2024
1 parent 3c8fe2f commit 6b39075
Showing 2 changed files with 31 additions and 59 deletions.
70 changes: 31 additions & 39 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -446,56 +446,48 @@ private CompletableFuture<ReadDataBlock> read0(FetchContext context,
         @SpanAttribute long startOffset,
         @SpanAttribute long endOffset,
         @SpanAttribute int maxBytes) {
-        if (maxBytes <= 0 || startOffset >= endOffset) {
-            return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT));
+        List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(context, streamId, startOffset, endOffset, maxBytes);
+        if (!logCacheRecords.isEmpty() && logCacheRecords.get(0).getBaseOffset() <= startOffset) {
+            return CompletableFuture.completedFuture(new ReadDataBlock(logCacheRecords, CacheAccessType.DELTA_WAL_CACHE_HIT));
         }
-
-        Long firstRecordBaseOffset = deltaWALCache.getFirstRecordBaseOffset(streamId, startOffset, endOffset);
-        if (null != firstRecordBaseOffset && firstRecordBaseOffset <= startOffset) {
-            // The first record is in the cache, so we can read all from the cache.
-            List<StreamRecordBatch> records = deltaWALCache.get(context, streamId, startOffset, endOffset, maxBytes);
-            // Re-check the first record, to avoid that it was removed from the cache.
-            if (records.isEmpty() || records.get(0).getBaseOffset() != firstRecordBaseOffset) {
-                // The first record changed, which means it was removed from the cache. This rarely happens, so we just retry.
-                records.forEach(StreamRecordBatch::release);
-                records.clear();
-                return read0(context, streamId, startOffset, endOffset, maxBytes);
-            }
-            return CompletableFuture.completedFuture(new ReadDataBlock(records, CacheAccessType.DELTA_WAL_CACHE_HIT));
-        }
-
         if (context.readOptions().fastRead()) {
-            // Fast read fail fast when need read from block cache.
+            // fast read fail fast when need read from block cache.
+            logCacheRecords.forEach(StreamRecordBatch::release);
+            logCacheRecords.clear();
             return CompletableFuture.failedFuture(FAST_READ_FAIL_FAST_EXCEPTION);
         }
-
-        final long blockCacheEndOffset;
-        if (null != firstRecordBaseOffset) {
-            // Update the end offset (which will be used to read from block cache) to the first record.
-            blockCacheEndOffset = firstRecordBaseOffset;
-        } else {
-            blockCacheEndOffset = endOffset;
+        if (!logCacheRecords.isEmpty()) {
+            endOffset = logCacheRecords.get(0).getBaseOffset();
         }
-        Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}-{}, maxBytes: {}", streamId, startOffset, blockCacheEndOffset, maxBytes), 1, TimeUnit.MINUTES);
-        return blockCache.read(context, streamId, startOffset, blockCacheEndOffset, maxBytes).thenCompose(blockCacheRst -> {
+        Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
+        long finalEndOffset = endOffset;
+        return blockCache.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
             List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
-            assert !rst.isEmpty();
             int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
-            return read0(context, streamId, rst.get(rst.size() - 1).getLastOffset(), endOffset, remainingBytesSize).thenApply(tailRst -> {
-                rst.addAll(tailRst.getRecords());
-                try {
-                    continuousCheck(rst);
-                } catch (IllegalArgumentException e) {
-                    rst.forEach(StreamRecordBatch::release);
-                    throw e;
-                }
-                return new ReadDataBlock(rst, blockCacheRst.getCacheAccessType());
-            });
+            int readIndex = -1;
+            for (int i = 0; i < logCacheRecords.size() && remainingBytesSize > 0; i++) {
+                readIndex = i;
+                StreamRecordBatch record = logCacheRecords.get(i);
+                rst.add(record);
+                remainingBytesSize -= record.size();
+            }
+            try {
+                continuousCheck(rst);
+            } catch (IllegalArgumentException e) {
+                blockCacheRst.getRecords().forEach(StreamRecordBatch::release);
+                throw e;
+            }
+            if (readIndex < logCacheRecords.size()) {
+                // release unnecessary record
+                logCacheRecords.subList(readIndex + 1, logCacheRecords.size()).forEach(StreamRecordBatch::release);
+            }
+            return new ReadDataBlock(rst, blockCacheRst.getCacheAccessType());
         }).whenComplete((rst, ex) -> {
             timeout.cancel();
             if (ex != null) {
                 LOGGER.error("read from block cache failed, stream={}, {}-{}, maxBytes: {}",
-                    streamId, startOffset, blockCacheEndOffset, maxBytes, ex);
+                    streamId, startOffset, finalEndOffset, maxBytes, ex);
+                logCacheRecords.forEach(StreamRecordBatch::release);
             }
         });
     }
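Note on the restored behaviour: read0 once again fetches the log-cache records for the whole range up front, serves the request directly when the cache covers startOffset, and otherwise reads [startOffset, firstCachedOffset) from the block cache and appends cached batches until maxBytes is spent. Below is a minimal, self-contained sketch of that merge step, with a hypothetical Batch record standing in for StreamRecordBatch; continuousCheck, tracing, and error handling are omitted.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamRecordBatch: a ref-counted batch with a byte size.
record Batch(long baseOffset, int size) {
    void release() { /* the real StreamRecordBatch decrements a refcount here */ }
}

public class MergeSketch {
    // Append log-cache batches after the block-cache result until the byte
    // budget is spent, then release the cached batches that were not used.
    static List<Batch> merge(List<Batch> fromBlockCache, List<Batch> fromLogCache, int maxBytes) {
        List<Batch> rst = new ArrayList<>(fromBlockCache);
        int remaining = maxBytes - rst.stream().mapToInt(Batch::size).sum();
        int readIndex = -1;
        for (int i = 0; i < fromLogCache.size() && remaining > 0; i++) {
            readIndex = i;
            Batch b = fromLogCache.get(i);
            rst.add(b);
            remaining -= b.size();
        }
        // Release the tail of log-cache batches that did not fit into maxBytes.
        fromLogCache.subList(readIndex + 1, fromLogCache.size()).forEach(Batch::release);
        return rst;
    }

    public static void main(String[] args) {
        List<Batch> block = List.of(new Batch(0, 60));
        List<Batch> log = List.of(new Batch(10, 30), new Batch(20, 30));
        // maxBytes = 90: the first log-cache batch exhausts the budget, so the
        // second one is released instead of being returned.
        System.out.println(merge(block, log, 90));
    }
}

As in the real loop, the budget is checked before a batch is added, so a batch may overshoot the remaining bytes; only the batches never added are released.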
20 changes: 0 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -149,26 +149,6 @@ public List<StreamRecordBatch> get(TraceContext context,
         return records;
     }
 
-    /**
-     * Get base offset of the first record in the range [startOffset, endOffset) of the stream.
-     * It is similar to {@link #get(TraceContext, long, long, long, int)}, but some differences:
-     * - It only returns the first record batch in the range if it exists.
-     * - It does not record metrics or trace.
-     * - It does not retain any records.
-     */
-    public Long getFirstRecordBaseOffset(long streamId, long startOffset, long endOffset) {
-        readLock.lock();
-        try {
-            // set maxBytes to 1 to get the first record
-            return get0(streamId, startOffset, endOffset, 1).stream()
-                .findFirst()
-                .map(StreamRecordBatch::getBaseOffset)
-                .orElse(null);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
     public List<StreamRecordBatch> get0(long streamId, long startOffset, long endOffset, int maxBytes) {
         List<StreamRecordBatch> rst = new LinkedList<>();
         long nextStartOffset = startOffset;
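Note: getFirstRecordBaseOffset existed only to support the reverted fast path in S3Storage.read0 above, which peeked at the first cached batch before deciding how to split the read. A rough sketch of that (now removed) decision, using a hypothetical CacheView interface in place of LogCache:

import java.util.Optional;

// Hypothetical stand-in for the LogCache lookup the reverted code relied on.
interface CacheView {
    // Base offset of the first cached batch in [startOffset, endOffset), if any.
    Optional<Long> firstRecordBaseOffset(long streamId, long startOffset, long endOffset);
}

class PeekSketch {
    enum Plan { ALL_FROM_LOG_CACHE, BLOCK_CACHE_THEN_LOG_CACHE, ALL_FROM_BLOCK_CACHE }

    // Peek at the first cached batch to pick a read plan, as the reverted
    // optimization did; the revert replaces this with an eager cache read.
    static Plan plan(CacheView cache, long streamId, long startOffset, long endOffset) {
        Optional<Long> first = cache.firstRecordBaseOffset(streamId, startOffset, endOffset);
        if (first.isEmpty()) {
            return Plan.ALL_FROM_BLOCK_CACHE;        // nothing cached in the range
        } else if (first.get() <= startOffset) {
            return Plan.ALL_FROM_LOG_CACHE;          // cache covers the start: pure cache hit
        } else {
            return Plan.BLOCK_CACHE_THEN_LOG_CACHE;  // block cache up to first.get(), then cache
        }
    }
}

The peek avoided retaining batches that might immediately be released (the helper "does not retain any records", per its javadoc); the trade-off was the re-check-and-retry in read0 when the peeked batch was evicted between the two lookups, both of which this commit removes along with the helper.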
