From 568b9e8a6ce9d7982ed4d3da35eaf26dd590f006 Mon Sep 17 00:00:00 2001
From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com>
Date: Fri, 1 Nov 2024 09:32:00 -0700
Subject: [PATCH] KAFKA-17803: LogSegment#read should return the base offset of
 the batch that contains startOffset rather than startOffset (#17528)

Reviewers: Jose Sancio, Jun Rao
---
 .../kafka/common/record/FileRecords.java      |  8 +-
 .../unit/kafka/cluster/PartitionTest.scala    |  4 +-
 .../scala/unit/kafka/log/UnifiedLogTest.scala | 34 +++++---
 .../storage/internals/log/LogSegment.java     | 14 +--
 .../storage/internals/log/LogSegmentTest.java | 86 ++++++++++++-------
 5 files changed, 88 insertions(+), 58 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 84eb6f20ac5f7..64dd73de41212 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -306,18 +306,18 @@ public int writeTo(TransferableChannel destChannel, int offset, int length) thro
     }
 
     /**
-     * Search forward for the file position of the last offset that is greater than or equal to the target offset
-     * and return its physical position and the size of the message (including log overhead) at the returned offset. If
-     * no such offsets are found, return null.
+     * Search forward for the file position of the message batch whose last offset is greater
+     * than or equal to the target offset. If no such batch is found, return null.
      *
      * @param targetOffset The offset to search for.
      * @param startingPosition The starting position in the file to begin searching from.
+     * @return the batch's base offset, its physical position, and its size (including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
             long offset = batch.lastOffset();
             if (offset >= targetOffset)
-                return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
+                return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
         }
         return null;
     }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7023fc270a50a..fbaf556df7d66 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1149,8 +1149,8 @@ class PartitionTest extends AbstractPartitionTest {
 
     // let the follower in ISR move leader's HW to move further but below LEO
     fetchFollower(partition, replicaId = follower2, fetchOffset = 0)
-    fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch)
-    assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark, "Expected leader's HW")
+    fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch + 1)
+    assertEquals(lastOffsetOfFirstBatch + 1, partition.log.get.highWatermark, "Expected leader's HW")
 
     // current leader becomes follower and then leader again (without any new records appended)
     val followerState = new LeaderAndIsrPartitionState()
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b3553c5b8ed92..61a93102a02f0 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -58,6 +58,7 @@ import java.nio.file.Files
 import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
+import scala.collection.immutable.SortedSet
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
@@ -324,7 +325,7 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
-  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
@@ -342,18 +343,18 @@ class UnifiedLogTest {
     for (record <- readInfo.records.records.asScala)
       assertTrue(record.offset < upperBoundOffset)
 
-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }
 
-  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation, batchBaseOffset: Long): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
       minOneMessage = true)
     assertFalse(readInfo.firstEntryIncomplete)
     assertEquals(0, readInfo.records.sizeInBytes)
-    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset)
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }
 
@@ -371,9 +372,11 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
 
     (log.logStartOffset until log.logEndOffset).foreach { offset =>
-      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END)
+      val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+      assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, batchBaseOffset)
     }
   }
 
@@ -391,14 +394,17 @@ class UnifiedLogTest {
       new SimpleRecord("3".getBytes),
       new SimpleRecord("4".getBytes)
     )), leaderEpoch = 0)
+    val batchBaseOffsets = SortedSet[Long](0, 3, 5)
 
     def assertHighWatermarkBoundedFetches(): Unit = {
       (log.logStartOffset until log.highWatermark).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
       }
 
       (log.highWatermark to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset)
       }
     }
 
@@ -488,13 +494,17 @@ class UnifiedLogTest {
     LogTestUtils.appendNonTransactionalAsLeader(log, 2)
     appendProducer1(10)
 
+    val batchBaseOffsets = SortedSet[Long](0, 5, 8, 10, 14, 16, 26, 27, 28)
+
     def assertLsoBoundedFetches(): Unit = {
       (log.logStartOffset until log.lastStableOffset).foreach { offset =>
-        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
       }
 
       (log.lastStableOffset to log.logEndOffset).foreach { offset =>
-        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
+        val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
+        assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset)
       }
     }
 
@@ -3464,13 +3474,13 @@ class UnifiedLogTest {
       new SimpleRecord("c".getBytes)),
       5)
 
-    log.updateHighWatermark(2L)
+    log.updateHighWatermark(3L)
 
     var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
 
     offsets = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 2L)
+    assertEquals(offsets.highWatermark.messageOffset, 3L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
   }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2b8e6b3aa6549..faf0ece60675f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -372,7 +372,7 @@ public LogOffsetPosition translateOffset(long offset) throws IOException {
     }
 
     /**
-     * Find the physical file position for the first message with offset >= the requested offset.
+     * Find the physical file position for the message batch that contains the requested offset.
      *
      * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
      * in the file higher than the greatest-lower-bound from the index.
@@ -382,8 +382,8 @@ public LogOffsetPosition translateOffset(long offset) throws IOException {
      * @param offset The offset we want to translate
     * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
      *                             when omitted, the search will begin at the position in the offset index.
-     * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
-     * message or null if no message meets this criteria.
+     * @return The base offset, position in the log, and size of the message batch that contains the requested offset,
+     * or null if no such batch is found.
      */
     LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException {
         OffsetPosition mapping = offsetIndex().lookup(offset);
@@ -409,17 +409,17 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition) throw
     }
 
     /**
-     * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
+     * Read a message set from this segment that contains startOffset. The message set will include
      * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
      *
     * This method is thread-safe.
      *
-     * @param startOffset A lower bound on the first offset to include in the message set we read
+     * @param startOffset The logical log offset we are trying to read
      * @param maxSize The maximum number of bytes to include in the message set we read
      * @param maxPositionOpt The maximum position in the log segment that should be exposed for read
      * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
      *
-     * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
+     * @return The fetched data and the base offset metadata of the message batch that contains startOffset,
      *         or null if the startOffset is larger than the largest offset in this log
      */
     public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPositionOpt, boolean minOneMessage) throws IOException {
@@ -433,7 +433,7 @@ public FetchDataInfo read(long startOffset, int maxSize, Optional<Long> maxPosit
             return null;
 
         int startPosition = startOffsetAndSize.position;
-        LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition);
+        LogOffsetMetadata offsetMetadata = new LogOffsetMetadata(startOffsetAndSize.offset, this.baseOffset, startPosition);
 
         int adjustedMaxSize = maxSize;
         if (minOneMessage)
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 695c19d420812..616671a65491b 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -93,7 +93,7 @@ private LogSegment createSegment(long offset, int indexIntervalBytes) throws IOE
     }
 
     /* Create a ByteBufferMessageSet for the given messages starting from the given offset */
-    private MemoryRecords records(long offset, String... records) {
+    private MemoryRecords v1Records(long offset, String... records) {
         List<SimpleRecord> simpleRecords = new ArrayList<>();
         for (String s : records) {
             simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
@@ -103,6 +103,16 @@ private MemoryRecords records(long offset, String... records) {
             Compression.NONE, TimestampType.CREATE_TIME, simpleRecords.toArray(new SimpleRecord[0]));
     }
 
+    private MemoryRecords v2Records(long offset, String... records) {
+        List<SimpleRecord> simpleRecords = new ArrayList<>();
+        for (String s : records) {
+            simpleRecords.add(new SimpleRecord(offset * 10, s.getBytes()));
+        }
+        return MemoryRecords.withRecords(
+            RecordBatch.MAGIC_VALUE_V2, offset,
+            Compression.NONE, TimestampType.CREATE_TIME, simpleRecords.toArray(new SimpleRecord[0]));
+    }
+
     @BeforeEach
     public void setup() {
         logDir = TestUtils.tempDirectory();
@@ -134,7 +144,7 @@ public void teardown() throws IOException {
     public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long largestOffset) throws IOException {
         try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
             long currentTime = Time.SYSTEM.milliseconds();
-            MemoryRecords memoryRecords = records(0, "hello");
+            MemoryRecords memoryRecords = v1Records(0, "hello");
             assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, currentTime, largestOffset, memoryRecords));
         }
     }
@@ -157,20 +167,35 @@ public void testReadOnEmptySegment() throws IOException {
     @Test
     public void testReadBeforeFirstOffset() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there", "little", "bee");
+            MemoryRecords ms = v1Records(50, "hello", "there", "little", "bee");
             seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
             Records read = seg.read(41, 300).records;
             checkEquals(ms.records().iterator(), read.records().iterator());
         }
     }
 
+    /**
+     * Reading from an offset in the middle of a batch should return a
+     * LogOffsetMetadata offset that points to the batch's base offset
+     */
+    @Test
+    public void testReadFromMiddleOfBatch() throws IOException {
+        long batchBaseOffset = 50;
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = v2Records(batchBaseOffset, "hello", "there", "little", "bee");
+            seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            FetchDataInfo readInfo = seg.read(52, 300);
+            assertEquals(batchBaseOffset, readInfo.fetchOffsetMetadata.messageOffset);
+        }
+    }
+
     /**
      * If we read from an offset beyond the last offset in the segment we should get null
      */
     @Test
     public void testReadAfterLast() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
             FetchDataInfo read = seg.read(52, 200);
             assertNull(read, "Read beyond the last offset in the segment should give null");
@@ -184,9 +209,9 @@ public void testReadAfterLast() throws IOException {
     @Test
     public void testReadFromGap() throws IOException {
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), read.records.records().iterator());
@@ -199,16 +224,11 @@ public void testReadWhenNoMaxPosition(boolean minOneMessage) throws IOException
         Optional<Long> maxPosition = Optional.empty();
         int maxSize = 1;
         try (LogSegment seg = createSegment(40)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
 
-            // read before first offset
-            FetchDataInfo read = seg.read(48, maxSize, maxPosition, minOneMessage);
-            assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata);
-            assertFalse(read.records.records().iterator().hasNext());
-
             // read at first offset
-            read = seg.read(50, maxSize, maxPosition, minOneMessage);
+            FetchDataInfo read = seg.read(50, maxSize, maxPosition, minOneMessage);
             assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata);
             assertFalse(read.records.records().iterator().hasNext());
 
@@ -236,9 +256,9 @@ public void testTruncate() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             long offset = 40;
             for (int i = 0; i < 30; i++) {
-                MemoryRecords ms1 = records(offset, "hello");
+                MemoryRecords ms1 = v1Records(offset, "hello");
                 seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1);
-                MemoryRecords ms2 = records(offset + 1, "hello");
+                MemoryRecords ms2 = v1Records(offset + 1, "hello");
                 seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2);
 
                 // check that we can read back both messages
@@ -297,10 +317,10 @@ public void testTruncateEmptySegment() throws IOException {
     @Test
     public void testReloadLargestTimestampAndNextOffsetAfterTruncation() throws IOException {
         int numMessages = 30;
-        try (LogSegment seg = createSegment(40, 2 * records(0, "hello").sizeInBytes() - 1)) {
+        try (LogSegment seg = createSegment(40, 2 * v1Records(0, "hello").sizeInBytes() - 1)) {
             int offset = 40;
             for (int i = 0; i < numMessages; i++) {
-                seg.append(offset, offset, offset, records(offset, "hello"));
+                seg.append(offset, offset, offset, v1Records(offset, "hello"));
                 offset++;
             }
             assertEquals(offset, seg.readNextOffset());
@@ -323,7 +343,7 @@ public void testTruncateFull() throws IOException {
         MockTime time = new MockTime();
 
         try (LogSegment seg = createSegment(40, time)) {
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"));
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, "hello", "there"));
 
             // If the segment is empty after truncation, the create time should be reset
             time.sleep(500);
@@ -335,7 +355,7 @@ public void testTruncateFull() throws IOException {
             assertFalse(seg.offsetIndex().isFull());
             assertNull(seg.read(0, 1024), "Segment should be empty.");
 
-            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"));
+            seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, v1Records(40, "hello", "there"));
         }
     }
 
@@ -344,11 +364,11 @@ public void testTruncateFull() throws IOException {
      */
     @Test
     public void testFindOffsetByTimestamp() throws IOException {
-        int messageSize = records(0, "msg00").sizeInBytes();
+        int messageSize = v1Records(0, "msg00").sizeInBytes();
         try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) {
             // Produce some messages
             for (int i = 40; i < 50; i++) {
-                seg.append(i, i * 10, i, records(i, "msg" + i));
+                seg.append(i, i * 10, i, v1Records(i, "msg" + i));
             }
 
             assertEquals(490, seg.largestTimestamp());
@@ -374,7 +394,7 @@ public void testFindOffsetByTimestamp() throws IOException {
     public void testNextOffsetCalculation() throws IOException {
         try (LogSegment seg = createSegment(40)) {
             assertEquals(40, seg.readNextOffset());
-            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"));
+            seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, v1Records(50, "hello", "there", "you"));
             assertEquals(53, seg.readNextOffset());
         }
     }
@@ -417,7 +437,7 @@ public void testChangeFileSuffixes() throws IOException {
     public void testRecoveryFixesCorruptIndex() throws Exception {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, Integer.toString(i)));
+                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, Integer.toString(i)));
             }
             File indexFile = seg.offsetIndexFile();
             writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
@@ -547,7 +567,7 @@ private MemoryRecords endTxnRecords(
     public void testRecoveryFixesCorruptTimeIndex() throws IOException {
         try (LogSegment seg = createSegment(0)) {
             for (int i = 0; i < 100; i++) {
-                seg.append(i, i * 10, i, records(i, String.valueOf(i)));
+                seg.append(i, i * 10, i, v1Records(i, String.valueOf(i)));
             }
             File timeIndexFile = seg.timeIndexFile();
             writeNonsenseToFile(timeIndexFile, 5, (int) timeIndexFile.length());
@@ -570,7 +590,7 @@ public void testRecoveryWithCorruptMessage() throws IOException {
         for (int ignore = 0; ignore < 10; ignore++) {
             try (LogSegment seg = createSegment(0)) {
                 for (int i = 0; i < messagesAppended; i++) {
-                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, String.valueOf(i)));
+                    seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, v1Records(i, String.valueOf(i)));
                 }
                 int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
                 // start corrupting somewhere in the middle of the chosen record all the way to the end
@@ -606,9 +626,9 @@ public void testCreateWithInitFileSizeAppendMessage() throws IOException {
 
         try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, false, 512 * 1024 * 1024, true, "")) {
             segments.add(seg);
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), read.records.records().iterator());
@@ -629,9 +649,9 @@ public void testCreateWithInitFileSizeClearShutdown() throws IOException {
         LogConfig logConfig = new LogConfig(configMap);
 
         try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, 512 * 1024 * 1024, true)) {
-            MemoryRecords ms = records(50, "hello", "there");
+            MemoryRecords ms = v1Records(50, "hello", "there");
             seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-            MemoryRecords ms2 = records(60, "alpha", "beta");
+            MemoryRecords ms2 = v1Records(60, "alpha", "beta");
             seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
             FetchDataInfo read = seg.read(55, 200);
             checkEquals(ms2.records().iterator(), read.records.records().iterator());
@@ -685,7 +705,7 @@ public void shouldTruncateEvenIfOffsetPointsToAGapInTheLog() throws IOException
         }
     }
 
-    private MemoryRecords records(long offset, int size) {
+    private MemoryRecords v2RecordWithSize(long offset, int size) {
         return MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, offset, Compression.NONE, TimestampType.CREATE_TIME,
             new SimpleRecord(new byte[size]));
     }
@@ -697,10 +717,10 @@ public void testAppendFromFile() throws IOException {
         FileRecords fileRecords = FileRecords.open(LogFileUtils.logFile(tempDir, 0));
 
         // Simulate a scenario with log offset range exceeding Integer.MAX_VALUE
-        fileRecords.append(records(0, 1024));
-        fileRecords.append(records(500, 1024 * 1024 + 1));
+        fileRecords.append(v2RecordWithSize(0, 1024));
+        fileRecords.append(v2RecordWithSize(500, 1024 * 1024 + 1));
         long sizeBeforeOverflow = fileRecords.sizeInBytes();
-        fileRecords.append(records(Integer.MAX_VALUE + 5L, 1024));
+        fileRecords.append(v2RecordWithSize(Integer.MAX_VALUE + 5L, 1024));
         long sizeAfterOverflow = fileRecords.sizeInBytes();
 
         try (LogSegment segment = createSegment(0)) {