Skip to content

Commit

Permalink
feat(idempotent): update check sequence timeout to 40s (#1770)
Browse files Browse the repository at this point in the history
* feat(idempotent): update check sequence timeout to 40s

Signed-off-by: Robin Han <[email protected]>

* fix: fix unit test

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Aug 12, 2024
1 parent 27d607c commit 2cf64bf
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.storage.internals.log.VerificationStateEntry;

public class ElasticProducerStateManager extends ProducerStateManager {
public static final long AWAIT_SEQ_ZERO_TIMEOUT = 40000L;
private final PersistSnapshots persistSnapshots;
private final long createTimestamp;

Expand Down Expand Up @@ -233,7 +234,10 @@ public ProducerAppendInfoExt(TopicPartition topicPartition, long producerId, Pro

@Override
protected void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
if (currentEntry.isEmpty() && updatedEntry.isEmpty() && appendFirstSeq != 0 && time.milliseconds() - createTimestamp < 10000) {
if (currentEntry.isEmpty() && updatedEntry.isEmpty() && appendFirstSeq != 0
// until the timeout elapses, reject a non-zero first sequence so the producer retries while the sequence 0 append is awaited
&& time.milliseconds() - createTimestamp < AWAIT_SEQ_ZERO_TIMEOUT
) {
throw new OutOfOrderSequenceException("Invalid sequence number for new created log, producer " + producerId() + " " +
"at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + appendFirstSeq + " (seq. number), " +
updatedEntry.producerEpoch() + " (current producer epoch)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.{mock, when}

import kafka.log.streamaspect.ElasticProducerStateManager.AWAIT_SEQ_ZERO_TIMEOUT

import java.io.File
import java.nio.ByteBuffer
import java.util
Expand Down Expand Up @@ -658,7 +660,7 @@ class ElasticProducerStateManagerTest {
// we should accept the append and add the pid back in
assertThrows(classOf[OutOfOrderSequenceException], () => append(recoveredMapping, producerId, epoch, 2, 2L, 70001))

time.sleep(10000)
time.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
append(recoveredMapping, producerId, epoch, 2, 2L, 70001)

assertEquals(1, recoveredMapping.activeProducers.size)
Expand Down
7 changes: 4 additions & 3 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.log

import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.log.streamaspect.ElasticProducerStateManager.AWAIT_SEQ_ZERO_TIMEOUT
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
Expand Down Expand Up @@ -3869,7 +3870,7 @@ class UnifiedLogTest {
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)

mockTime.sleep(10000)
mockTime.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))

Expand Down Expand Up @@ -3978,7 +3979,7 @@ class UnifiedLogTest {
new SimpleRecord("1".getBytes),
new SimpleRecord("2".getBytes)
)
mockTime.sleep(10000)
mockTime.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
assertFalse(log.hasOngoingTransaction(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
Expand Down Expand Up @@ -4014,7 +4015,7 @@ class UnifiedLogTest {
)

val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
mockTime.sleep(10000)
mockTime.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
// Append should not throw error.
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}
Expand Down
11 changes: 6 additions & 5 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.log.streamaspect.ElasticProducerStateManager.AWAIT_SEQ_ZERO_TIMEOUT
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
Expand Down Expand Up @@ -682,7 +683,7 @@ class ReplicaManagerTest {
}.head._2.asInstanceOf[Gauge[Int]].value
}

time.sleep(10000)
time.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
// Initially all metrics are 0.
assertEquals(0, replicaManagerMetricValue())

Expand Down Expand Up @@ -754,14 +755,14 @@ class ReplicaManagerTest {
val sequence = 9
val records = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, epoch, sequence,
new SimpleRecord(time.milliseconds(), s"message $sequence".getBytes))
time.sleep(10000)
time.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
handleProduceAppend(replicaManager, new TopicPartition(topic, 0), records, transactionalId = transactionalId).onFire { response =>
assertEquals(Errors.NONE, response.error)
}
assertLateTransactionCount(Some(0))

// The transaction becomes late if not finished before the max transaction timeout passes
time.sleep(replicaManager.logManager.maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS - 10000)
time.sleep(replicaManager.logManager.maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS - AWAIT_SEQ_ZERO_TIMEOUT)
assertLateTransactionCount(Some(0))
time.sleep(1)
assertLateTransactionCount(Some(1))
Expand Down Expand Up @@ -2308,7 +2309,7 @@ class ReplicaManagerTest {
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord("message".getBytes))

time.sleep(10000)
time.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
// We should add these partitions to the manager to verify.
val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, origin = appendOrigin, transactionalId = transactionalId)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
Expand Down Expand Up @@ -2577,7 +2578,7 @@ class ReplicaManagerTest {
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord("message".getBytes))

time.sleep(10000)
time.sleep(AWAIT_SEQ_ZERO_TIMEOUT)
// We should add these partitions to the manager to verify.
val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
Expand Down

0 comments on commit 2cf64bf

Please sign in to comment.