diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 9d113ea07d..c951c3cda1 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
+
 import java.io.{File, IOException}
 import java.nio.file.Files
 import java.text.NumberFormat
@@ -31,7 +33,6 @@
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.log.streamaspect.ElasticLogManager
-import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor
 import java.util.concurrent.{CompletableFuture, ExecutionException}
 import scala.jdk.CollectionConverters._
@@ -435,9 +436,11 @@ class LocalLog(@volatile protected var _dir: File,
       if (fetchDataInfo != null) {
         if (includeAbortedTxns) {
           // AutoMQ for Kafka inject start
-          val upperBoundOpt = fetchDataInfo.records match {
-            case adaptor: BatchIteratorRecordsAdaptor =>
-              Some(adaptor.lastOffset())
+          val upperBoundOpt = fetchDataInfo.records match {
+            case records: PooledMemoryRecords =>
+              Some(records.lastOffset())
+            case adapter: BatchIteratorRecordsAdaptor =>
+              Some(adapter.lastOffset())
             case _ =>
               None
           }
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
index 495a650dbd..a3aeb9811e 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -20,7 +20,7 @@ package kafka.log.streamaspect
 import com.automq.stream.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions}
 import io.netty.buffer.Unpooled
 import kafka.log._
-import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor
+import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.EpochEntry
@@ -345,8 +346,10 @@ class ElasticLog(val metaStream: MetaStream,
     } else {
       if (includeAbortedTxns) {
         val upperBoundOpt = fetchDataInfo.records match {
-          case adaptor: BatchIteratorRecordsAdaptor =>
-            Some(adaptor.lastOffset())
+          case records: PooledMemoryRecords =>
+            Some(records.lastOffset())
+          case adapter: BatchIteratorRecordsAdaptor =>
+            Some(adapter.lastOffset())
           case _ =>
             None
         }
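Note (not part of the patch): both call sites above resolve the aborted-transaction upper bound by matching on the concrete Records implementation returned from the fetch. Below is a minimal, self-contained Scala sketch of that dispatch; FetchedRecords, Pooled, Adaptor and Other are hypothetical stand-ins for Records, PooledMemoryRecords, BatchIteratorRecordsAdaptor and any other Records type.

```scala
// Hypothetical stand-ins for the Records hierarchy touched by the patch.
sealed trait FetchedRecords
final case class Pooled(lastOffset: Long) extends FetchedRecords   // ~ PooledMemoryRecords
final case class Adaptor(lastOffset: Long) extends FetchedRecords  // ~ BatchIteratorRecordsAdaptor
case object Other extends FetchedRecords                           // any other Records implementation

// Upper bound for the aborted-transaction lookup: record types that can report
// their last offset yield Some(offset); everything else falls back to None.
def upperBound(records: FetchedRecords): Option[Long] = records match {
  case Pooled(last)  => Some(last)
  case Adaptor(last) => Some(last)
  case _             => None
}
```

Before this change a PooledMemoryRecords result fell through to the `case _ => None` branch, so reads that materialize all records lost their upper bound.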
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
index 375b164858..64e9f71b45 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -99,7 +99,7 @@ public long appendedOffset() {
     }
 
     public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
-        if (ReadManualReleaseHint.isMarked()) {
+        if (ReadAllHint.isMarked()) {
             return readAll0(startOffset, maxOffset, maxSize);
         } else {
             return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize));
@@ -276,16 +276,20 @@ protected RecordBatchIterator batchIterator(long startOffset, long
     public static class PooledMemoryRecords extends AbstractRecords implements PooledResource {
         private final List<FetchResult> fetchResults;
         private final MemoryRecords memoryRecords;
+        private final long lastOffset;
 
         private PooledMemoryRecords(List<FetchResult> fetchResults) {
             this.fetchResults = fetchResults;
             CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+            long lastOffset = 0;
             for (FetchResult fetchResult : fetchResults) {
                 for (RecordBatchWithContext recordBatchWithContext : fetchResult.recordBatchList()) {
                     compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(recordBatchWithContext.rawPayload()));
+                    lastOffset = recordBatchWithContext.lastOffset();
                 }
             }
             this.memoryRecords = MemoryRecords.readableRecords(compositeByteBuf.nioBuffer());
+            this.lastOffset = lastOffset;
         }
 
         public static PooledMemoryRecords of(List<FetchResult> fetchResults) {
@@ -322,6 +326,10 @@ public void release() {
             fetchResults.forEach(FetchResult::free);
             fetchResults.clear();
         }
+
+        public long lastOffset() {
+            return lastOffset;
+        }
     }
 
     static class StreamSegmentInputStream implements LogInputStream {
diff --git a/core/src/main/scala/kafka/log/streamaspect/ReadManualReleaseHint.java b/core/src/main/scala/kafka/log/streamaspect/ReadAllHint.java
similarity index 81%
rename from core/src/main/scala/kafka/log/streamaspect/ReadManualReleaseHint.java
rename to core/src/main/scala/kafka/log/streamaspect/ReadAllHint.java
index 634c71dce4..f4ead47549 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ReadManualReleaseHint.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ReadAllHint.java
@@ -20,9 +20,9 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class ReadManualReleaseHint {
+public class ReadAllHint {
 
-    public static final FastThreadLocal<AtomicBoolean> MANUAL_RELEASE = new FastThreadLocal<AtomicBoolean>() {
+    public static final FastThreadLocal<AtomicBoolean> HINT = new FastThreadLocal<AtomicBoolean>() {
         @Override
         protected AtomicBoolean initialValue() {
             return new AtomicBoolean(false);
@@ -30,15 +30,15 @@ protected AtomicBoolean initialValue() {
     };
 
     public static boolean isMarked() {
-        return MANUAL_RELEASE.get().get();
+        return HINT.get().get();
     }
 
     public static void mark() {
-        MANUAL_RELEASE.get().set(true);
+        HINT.get().set(true);
     }
 
     public static void reset() {
-        MANUAL_RELEASE.get().set(false);
+        HINT.get().set(false);
     }
 }
 
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 7ef79da3e7..8ba5862d18 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.log.streamaspect.ReadManualReleaseHint
+import kafka.log.streamaspect.ReadAllHint
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicIdPartition
@@ -163,14 +163,14 @@ class DelayedFetch(
     }
 
     // AutoMQ for Kafka inject start
-    ReadManualReleaseHint.mark()
-    val logReadResults = replicaManager.readFromLocalLog(
+    ReadAllHint.mark()
+    val logReadResults = replicaManager.readAsyncFromLocalLog(
      params,
      fetchInfos,
      quota,
      readFromPurgatory = true
    )
-    ReadManualReleaseHint.reset()
+    ReadAllHint.reset()
     // AutoMQ for Kafka inject end
 
     val fetchPartitionData = logReadResults.map { case (tp, result) =>
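Note (not part of the patch): ReadAllHint is a per-thread flag that the read path consults to choose between readAll0 (fully materialized records) and the lazy BatchIteratorRecordsAdaptor. A sketch of the same pattern using only the JDK follows; Hint and readWithHint are hypothetical names, and plain ThreadLocal stands in for Netty's FastThreadLocal.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Per-thread flag, same shape as ReadAllHint but built on java.lang.ThreadLocal.
object Hint {
  private val flag: ThreadLocal[AtomicBoolean] =
    ThreadLocal.withInitial(() => new AtomicBoolean(false))

  def isMarked: Boolean = flag.get().get()
  def mark(): Unit = flag.get().set(true)
  def reset(): Unit = flag.get().set(false)
}

// Hypothetical caller mirroring DelayedFetch/KafkaApis: mark before the read so the
// fetch path materializes everything, then always clear the thread-local flag.
def readWithHint[A](read: () => A): A = {
  Hint.mark()
  try read()
  finally Hint.reset()
}
```

DelayedFetch and KafkaApis call mark()/reset() directly rather than through try/finally; the finally in the sketch is only there to make the reset explicit.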
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f9c45a494e..fe344829e8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import kafka.controller.ReplicaAssignment
 import kafka.coordinator.group._
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.AppendOrigin
-import kafka.log.streamaspect.{ElasticLogManager, ReadManualReleaseHint}
+import kafka.log.streamaspect.{ElasticLogManager, ReadAllHint}
 import kafka.message.ZStdCompressionCodec
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
 import kafka.network.RequestChannel
@@ -1093,9 +1093,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           // The fetching is done is a separate thread pool to avoid blocking io thread.
           fetchingExecutors.submit(new Runnable {
             override def run(): Unit = {
-              ReadManualReleaseHint.mark()
+              ReadAllHint.mark()
               doFetchingRecords()
-              ReadManualReleaseHint.reset()
+              ReadAllHint.reset()
             }
           })
         } else {
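Note (not part of the patch): PooledMemoryRecords computes its new lastOffset in the same pass that stitches the fetched batch payloads together, relying on the batches arriving in offset order. Below is a self-contained Scala sketch of that single pass; Batch and assemble are hypothetical stand-ins for RecordBatchWithContext and the constructor logic.

```scala
// Hypothetical stand-in for RecordBatchWithContext: raw payload plus its last offset.
final case class Batch(payload: Array[Byte], lastOffset: Long)

// One pass over the batches: concatenate payloads and remember the last offset seen,
// which is the overall last offset because batches come back in offset order.
def assemble(batches: Seq[Batch]): (Array[Byte], Long) = {
  val buf = Array.newBuilder[Byte]
  var last = 0L
  for (b <- batches) {
    buf ++= b.payload
    last = b.lastOffset
  }
  (buf.result(), last)
}
```

As in the patched constructor, an empty fetch leaves the last offset at its initial value of 0.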