KAFKA-17803: LogSegment#read should return the base offset of the batch that contains startOffset rather than startOffset (apache#17528)

Reviewers: Jose Sancio <[email protected]>, Jun Rao <[email protected]>
kevin-wu24 authored Nov 1, 2024
1 parent e14a81b commit 568b9e8
Showing 5 changed files with 88 additions and 58 deletions.
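The behavioral change is easiest to see with a small standalone sketch (illustration only, not Kafka code; the Batch record and fetchedMessageOffset helper below are invented, and the batch layout mirrors the base offsets 0, 3 and 5 used in the tests further down): for a log whose batches cover offsets 0-2, 3-4 and 5-9, a read at startOffset 4 used to report offset metadata at offset 4, whereas it now reports 3, the base offset of the batch that contains offset 4.

import java.util.List;

// Minimal illustration of the KAFKA-17803 change (not Kafka code): for a read at
// startOffset, the returned offset metadata now carries the base offset of the
// batch containing startOffset instead of startOffset itself.
public class BatchBaseOffsetExample {
    // A record batch spanning offsets [baseOffset, lastOffset].
    record Batch(long baseOffset, long lastOffset) {}

    // Mirrors the forward search: find the first batch whose last offset is >= the
    // target offset and report that batch's base offset.
    static long fetchedMessageOffset(List<Batch> batches, long startOffset) {
        for (Batch batch : batches) {
            if (batch.lastOffset() >= startOffset) {
                return batch.baseOffset();   // new behavior: base offset of the containing batch
            }
        }
        throw new IllegalArgumentException("startOffset is beyond the end of the log");
    }

    public static void main(String[] args) {
        // Hypothetical segment: three batches covering offsets 0-2, 3-4 and 5-9.
        List<Batch> batches = List.of(new Batch(0, 2), new Batch(3, 4), new Batch(5, 9));

        // Before the patch the fetch offset metadata reported 4 (the requested offset);
        // after the patch it reports 3, the base offset of the batch containing offset 4.
        System.out.println(fetchedMessageOffset(batches, 4)); // prints 3
    }
}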
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -306,18 +306,18 @@ public int writeTo(TransferableChannel destChannel, int offset, int length) thro
     }

     /**
-     * Search forward for the file position of the last offset that is greater than or equal to the target offset
-     * and return its physical position and the size of the message (including log overhead) at the returned offset. If
-     * no such offsets are found, return null.
+     * Search forward for the file position of the message batch whose last offset is greater
+     * than or equal to the target offset. If no such batch is found, return null.
      *
      * @param targetOffset The offset to search for.
      * @param startingPosition The starting position in the file to begin searching from.
+     * @return the batch's base offset, its physical position, and its size (including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
             long offset = batch.lastOffset();
             if (offset >= targetOffset)
-                return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
+                return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
         }
         return null;
     }
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1149,8 +1149,8 @@ class PartitionTest extends AbstractPartitionTest {

     // let the follower in ISR move leader's HW to move further but below LEO
     fetchFollower(partition, replicaId = follower2, fetchOffset = 0)
-    fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch)
-    assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark, "Expected leader's HW")
+    fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch + 1)
+    assertEquals(lastOffsetOfFirstBatch + 1, partition.log.get.highWatermark, "Expected leader's HW")

     // current leader becomes follower and then leader again (without any new records appended)
     val followerState = new LeaderAndIsrPartitionState()
34 changes: 22 additions & 12 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -58,6 +58,7 @@ import java.nio.file.Files
 import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
+import scala.collection.immutable.SortedSet
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
@@ -324,7 +325,7 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }

-  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
@@ -342,18 +343,18 @@ class UnifiedLogTest {
     for (record <- readInfo.records.records.asScala)
       assertTrue(record.offset < upperBoundOffset)

-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }

-  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
       minOneMessage = true)
     assertFalse(readInfo.firstEntryIncomplete)
     assertEquals(0, readInfo.records.sizeInBytes)
-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }

@@ -371,9 +372,11 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)

     (log.logStartOffset until log.logEndOffset).foreach { offset =>
-      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END)
+      val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
     }
   }

@@ -391,14 +394,17 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)

     def assertHighWatermarkBoundedFetches(): Unit = {
       (log.logStartOffset until log.highWatermark).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
       }

       (log.highWatermark to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
       }
     }

@@ -488,13 +494,17 @@ class UnifiedLogTest {
     LogTestUtils.appendNonTransactionalAsLeader(log, 2)
     appendProducer1(10)

+    val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
+
     def assertLsoBoundedFetches(): Unit = {
       (log.logStartOffset until log.lastStableOffset).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
       }

       (log.lastStableOffset to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
       }
     }

@@ -3464,13 +3474,13 @@ class UnifiedLogTest {
       new SimpleRecord("c".getBytes)), 5)


-    log.updateHighWatermark(2L)
+    log.updateHighWatermark(3L)
     var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)

     offsets = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
   }

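The tests above compute the expected value with batchBaseOffsets.rangeTo(offset).lastKey, which is a floor lookup: the greatest batch base offset that is less than or equal to the fetched offset. A Java analogue, for illustration only (using the same base offsets 0, 3 and 5 as the tests above):

import java.util.TreeSet;

// Equivalent of the Scala expression batchBaseOffsets.rangeTo(offset).lastKey used in the
// tests: pick the greatest batch base offset that is <= the fetched offset (a floor lookup).
public class ExpectedBaseOffsetExample {
    public static void main(String[] args) {
        TreeSet<Long> batchBaseOffsets = new TreeSet<>();
        batchBaseOffsets.add(0L);
        batchBaseOffsets.add(3L);
        batchBaseOffsets.add(5L);

        for (long offset = 0; offset < 10; offset++) {
            long expectedBaseOffset = batchBaseOffsets.floor(offset); // e.g. offset 4 -> 3, offset 7 -> 5
            System.out.println(offset + " -> " + expectedBaseOffset);
        }
    }
}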
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -372,7 +372,7 @@ public LogOffsetPosition translateOffset(long offset) throws IOException {
     }

     /**
-     * Find the physical file position for the first message with offset >= the requested offset.
+     * Find the physical file position for the message batch that contains the requested offset.
      *
      * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
      * in the file higher than the greatest-lower-bound from the index.
@@ -382,8 +382,8 @@ public LogOffsetPosition translateOffset(long offset) throws IOException {
      * @param offset The offset we want to translate
      * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
      *                             when omitted, the search will begin at the position in the offset index.
-     * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
-     *  message or null if no message meets this criteria.
+     * @return The base offset, position in the log, and size of the message batch that contains the requested offset,
+     *  or null if no such batch is found.
      */
     LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException {
         OffsetPosition mapping = offsetIndex().lookup(offset);
@@ -409,17 +409,17 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition) throw
     }

     /**
-     * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
+     * Read a message set from this segment that contains startOffset. The message set will include
      * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
      *
      * This method is thread-safe.
      *
-     * @param startOffset A lower bound on the first offset to include in the message set we read
+     * @param startOffset The logical log offset we are trying to read
      * @param maxSize The maximum number of bytes to include in the message set we read
      * @param maxPositionOpt The maximum position in the log segment that should be exposed for read
      * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
      *
-     * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
+     * @return The fetched data and the base offset metadata of the message batch that contains startOffset,
      *         or null if the startOffset is larger than the largest offset in this log
      */
     public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPositionOpt, boolean minOneMessage) throws IOException {
@@ -433,7 +433,7 @@ public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPosit
             return null;

         int startPosition = startOffsetAndSize.position;
-        LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition);
+        LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffsetAndSize.offset, this.baseOffset, startPosition);

         int adjustedMaxSize = maxSize;
         if (minOneMessage)
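Why the returned offset can be smaller than startOffset: per the javadoc above, translateOffset first takes the greatest-lower-bound entry from the offset index and then searches forward from that file position for the batch whose last offset reaches the target; that batch's base offset is what ends up in the returned LogOffsetPosition and, ultimately, in read()'s offset metadata. A rough standalone sketch of that two-step lookup (illustration only; the index contents, the Batch record, the TreeMap-based index and the findContainingBatch helper are invented, not Kafka's API):

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Rough sketch of the index-then-scan lookup performed by LogSegment#translateOffset and
// FileRecords#searchForOffsetWithSize (illustration only, not Kafka code).
public class TranslateOffsetSketch {
    record Batch(long baseOffset, long lastOffset, int filePosition, int sizeInBytes) {}
    record LogOffsetPosition(long offset, int position, int size) {}

    static LogOffsetPosition findContainingBatch(NavigableMap<Long, Integer> offsetIndex,
                                                 List<Batch> batches,
                                                 long targetOffset) {
        // Step 1: floor lookup in the (sparse) offset index -> a file position at or before the target.
        int startingFilePosition = offsetIndex.floorEntry(targetOffset).getValue();

        // Step 2: scan batches forward from that position; the first batch whose last offset
        // reaches the target is the one containing it, and its *base* offset is returned.
        for (Batch batch : batches) {
            if (batch.filePosition() < startingFilePosition)
                continue;
            if (batch.lastOffset() >= targetOffset)
                return new LogOffsetPosition(batch.baseOffset(), batch.filePosition(), batch.sizeInBytes());
        }
        return null; // target is beyond the last offset in the segment
    }

    public static void main(String[] args) {
        // Hypothetical segment: batches covering offsets 0-2, 3-4 and 5-9 at made-up file positions.
        List<Batch> batches = List.of(new Batch(0, 2, 0, 100), new Batch(3, 4, 100, 80), new Batch(5, 9, 180, 200));
        NavigableMap<Long, Integer> offsetIndex = new TreeMap<>();
        offsetIndex.put(0L, 0);   // sparse index: only some offsets are indexed
        offsetIndex.put(5L, 180);

        System.out.println(findContainingBatch(offsetIndex, batches, 4L)); // offset 4 -> batch with base offset 3
    }
}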