fix(issue440): get lastOffst from records (#441)
* fix(issue440): get lastOffst from records

Signed-off-by: Robin Han <[email protected]>

* fix: unit test

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
superhx authored Nov 13, 2023
1 parent 0fb6df7 commit d021080
Showing 6 changed files with 33 additions and 20 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/kafka/log/LocalLog.scala
@@ -17,6 +17,8 @@

package kafka.log

+import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
+
import java.io.{File, IOException}
import java.nio.file.Files
import java.text.NumberFormat
@@ -31,7 +33,6 @@ import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.log.streamaspect.ElasticLogManager
-import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor

import java.util.concurrent.{CompletableFuture, ExecutionException}
import scala.jdk.CollectionConverters._
@@ -435,9 +436,11 @@ class LocalLog(@volatile protected var _dir: File,
    if (fetchDataInfo != null) {
      if (includeAbortedTxns) {
        // AutoMQ for Kafka inject start
-       val upperBoundOpt = fetchDataInfo.records match {
-         case adaptor: BatchIteratorRecordsAdaptor =>
-           Some(adaptor.lastOffset())
+       val upperBoundOpt = fetchDataInfo.records match {
+         case records: PooledMemoryRecords =>
+           Some(records.lastOffset())
+         case adapter: BatchIteratorRecordsAdaptor =>
+           Some(adapter.lastOffset())
          case _ =>
            None
        }
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -20,7 +20,7 @@ package kafka.log.streamaspect
import com.automq.stream.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions}
import io.netty.buffer.Unpooled
import kafka.log._
-import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor
+import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.EpochEntry
@@ -345,8 +345,10 @@ class ElasticLog(val metaStream: MetaStream,
    } else {
      if (includeAbortedTxns) {
        val upperBoundOpt = fetchDataInfo.records match {
-         case adaptor: BatchIteratorRecordsAdaptor =>
-           Some(adaptor.lastOffset())
+         case records: PooledMemoryRecords =>
+           Some(records.lastOffset())
+         case adapter: BatchIteratorRecordsAdaptor =>
+           Some(adapter.lastOffset())
          case _ =>
            None
        }
core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -99,7 +99,7 @@ public long appendedOffset() {
    }

    public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
-       if (ReadManualReleaseHint.isMarked()) {
+       if (ReadAllHint.isMarked()) {
            return readAll0(startOffset, maxOffset, maxSize);
        } else {
            return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize));
@@ -276,16 +276,20 @@ protected RecordBatchIterator<RecordBatch> batchIterator(long startOffset, long
    public static class PooledMemoryRecords extends AbstractRecords implements PooledResource {
        private final List<FetchResult> fetchResults;
        private final MemoryRecords memoryRecords;
+       private final long lastOffset;

        private PooledMemoryRecords(List<FetchResult> fetchResults) {
            this.fetchResults = fetchResults;
            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+           long lastOffset = 0;
            for (FetchResult fetchResult : fetchResults) {
                for (RecordBatchWithContext recordBatchWithContext : fetchResult.recordBatchList()) {
                    compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(recordBatchWithContext.rawPayload()));
+                   lastOffset = recordBatchWithContext.lastOffset();
                }
            }
            this.memoryRecords = MemoryRecords.readableRecords(compositeByteBuf.nioBuffer());
+           this.lastOffset = lastOffset;
        }

        public static PooledMemoryRecords of(List<FetchResult> fetchResults) {
Expand Down Expand Up @@ -322,6 +326,10 @@ public void release() {
fetchResults.forEach(FetchResult::free);
fetchResults.clear();
}

public long lastOffset() {
return lastOffset;
}
}

static class StreamSegmentInputStream implements LogInputStream<RecordBatch> {
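
For readers tracing the fix: before this commit, the eager read path (readAll0, taken when the hint is set) handed back a PooledMemoryRecords, which fell through to the "case _ => None" branch in LocalLog/ElasticLog, so no aborted-transaction upper bound was computed; that appears to be the root of issue 440. The Java sketch below restates the decision the Scala pattern match now makes. UpperBoundResolver is a hypothetical helper named here for illustration only, not part of this commit.

import java.util.OptionalLong;
import org.apache.kafka.common.record.Records;
import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor;
import kafka.log.streamaspect.ElasticLogFileRecords.PooledMemoryRecords;

final class UpperBoundResolver {
    // Expose the last fetched offset when the concrete Records type tracks it,
    // otherwise report "unknown" so the caller applies no upper bound.
    static OptionalLong abortedTxnUpperBound(Records records) {
        if (records instanceof PooledMemoryRecords) {
            return OptionalLong.of(((PooledMemoryRecords) records).lastOffset());
        }
        if (records instanceof BatchIteratorRecordsAdaptor) {
            return OptionalLong.of(((BatchIteratorRecordsAdaptor) records).lastOffset());
        }
        return OptionalLong.empty();  // equivalent of the Scala "case _ => None"
    }
}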
core/src/main/scala/kafka/log/streamaspect/ReadAllHint.java (renamed from ReadManualReleaseHint.java)
@@ -20,25 +20,25 @@

import java.util.concurrent.atomic.AtomicBoolean;

-public class ReadManualReleaseHint {
+public class ReadAllHint {

-    public static final FastThreadLocal<AtomicBoolean> MANUAL_RELEASE = new FastThreadLocal<AtomicBoolean>() {
+    public static final FastThreadLocal<AtomicBoolean> HINT = new FastThreadLocal<AtomicBoolean>() {
        @Override
        protected AtomicBoolean initialValue() {
            return new AtomicBoolean(false);
        }
    };

    public static boolean isMarked() {
-       return MANUAL_RELEASE.get().get();
+       return HINT.get().get();
    }

    public static void mark() {
-       MANUAL_RELEASE.get().set(true);
+       HINT.get().set(true);
    }

    public static void reset() {
-       MANUAL_RELEASE.get().set(false);
+       HINT.get().set(false);
    }

}
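
A minimal usage sketch of the renamed hint, assuming the call pattern shown in DelayedFetch.scala and KafkaApis.scala below: ReadAllHint is a per-thread flag read by ElasticLogFileRecords.read(); when set, the read materializes all batches eagerly (readAll0) instead of returning a lazy BatchIteratorRecordsAdaptor. The try/finally here is illustrative; the actual call sites in this commit invoke reset() directly after the fetch.

import kafka.log.streamaspect.ReadAllHint;

final class ReadAllHintExample {
    // Wrap a blocking fetch so the log layer reads everything up front on this thread.
    static void fetchWithEagerRead(Runnable doFetchingRecords) {
        ReadAllHint.mark();           // set the thread-local flag
        try {
            doFetchingRecords.run();  // fetch path that ends up in ElasticLogFileRecords.read()
        } finally {
            ReadAllHint.reset();      // clear it so later reads on this thread stay lazy
        }
    }
}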
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,7 +17,7 @@

package kafka.server

-import kafka.log.streamaspect.ReadManualReleaseHint
+import kafka.log.streamaspect.ReadAllHint
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicIdPartition
@@ -163,14 +163,14 @@ class DelayedFetch(
      }

      // AutoMQ for Kafka inject start
-     ReadManualReleaseHint.mark()
-     val logReadResults = replicaManager.readFromLocalLog(
+     ReadAllHint.mark()
+     val logReadResults = replicaManager.readAsyncFromLocalLog(
        params,
        fetchInfos,
        quota,
        readFromPurgatory = true
      )
-     ReadManualReleaseHint.reset()
+     ReadAllHint.reset()
      // AutoMQ for Kafka inject end

      val fetchPartitionData = logReadResults.map { case (tp, result) =>
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import kafka.controller.ReplicaAssignment
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
-import kafka.log.streamaspect.{ElasticLogManager, ReadManualReleaseHint}
+import kafka.log.streamaspect.{ElasticLogManager, ReadAllHint}
import kafka.message.ZStdCompressionCodec
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
import kafka.network.RequestChannel
@@ -1093,9 +1093,9 @@ class KafkaApis(val requestChannel: RequestChannel,
        // The fetching is done is a separate thread pool to avoid blocking io thread.
        fetchingExecutors.submit(new Runnable {
          override def run(): Unit = {
-           ReadManualReleaseHint.mark()
+           ReadAllHint.mark()
            doFetchingRecords()
-           ReadManualReleaseHint.reset()
+           ReadAllHint.reset()
          }
        })
      } else {
