Skip to content

Commit

Permalink
KAFKA-14562 [2/3]: Implement epoch bump after every transaction (KIP-…
Browse files Browse the repository at this point in the history
…890) (apache#17402)

This patch includes changes to the client's end transaction response handling when transaction version 2 is enabled.
Version 5+ of the End Txn Response includes the producer ID and producer epoch fields.

Upon receiving the response, the client updates its producer ID and epoch to the values carried in that response.

On receiving an EndTxnRequest, the server will have either:

- Bumped the epoch for the given producer ID, or
- On epoch overflow, sent a new producer ID with epoch 0.
This patch also includes changes to the endTxnRequest to send the right request version based on whether txnV2 is enabled.

A failure in the integration tests exposed a bug in the PrepareComplete method, where the transit metadata was updated incorrectly. This patch includes the fix, so that lastProducerEpoch is now updated correctly.

Reviewers: Artem Livshits <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
rreddy-22 authored Nov 1, 2024
1 parent 2696a6d commit e14a81b
Show file tree
Hide file tree
Showing 17 changed files with 484 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
.setTransactionalId(transactionalId)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch)
.setCommitted(transactionResult.id));
.setCommitted(transactionResult.id),
isTransactionV2Enabled
);

EndTxnHandler handler = new EndTxnHandler(builder);
enqueueRequest(handler);
Expand Down Expand Up @@ -1568,6 +1570,19 @@ public void handleResponse(AbstractResponse response) {
Errors error = endTxnResponse.error();

if (error == Errors.NONE) {
// For End Txn version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
// For versions lower than 5, the producer ID and epoch default to -1.
// When Transaction Version 2 is enabled, End Txn request version 5+ is used,
// which mandates bumping the epoch after every transaction.
// If the epoch overflows, a new producerId is returned with epoch set to 0.
if (endTxnResponse.data().producerId() != -1) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
endTxnResponse.data().producerId(),
endTxnResponse.data().producerEpoch()
);
setProducerIdAndEpoch(producerIdAndEpoch);
resetSequenceNumbers();
}
completeTransaction();
result.done();
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,28 @@
import java.nio.ByteBuffer;

public class EndTxnRequest extends AbstractRequest {

public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 4;
private final EndTxnRequestData data;

public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
public final EndTxnRequestData data;
public final boolean isTransactionV2Enabled;

public Builder(EndTxnRequestData data) {
this(data, false);
public Builder(EndTxnRequestData data, boolean isTransactionV2Enabled) {
this(data, false, isTransactionV2Enabled);
}

public Builder(EndTxnRequestData data, boolean enableUnstableLastVersion) {
public Builder(EndTxnRequestData data, boolean enableUnstableLastVersion, boolean isTransactionV2Enabled) {
super(ApiKeys.END_TXN, enableUnstableLastVersion);
this.data = data;
this.isTransactionV2Enabled = isTransactionV2Enabled;
}

@Override
public EndTxnRequest build(short version) {
if (!isTransactionV2Enabled) {
version = (short) Math.min(version, LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2);
}
return new EndTxnRequest(data, version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 5 enables bumping epoch on every transaction (KIP-890 Part 2)
"latestVersionUnstable": true,
"latestVersionUnstable": false,
"validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2980,6 +2980,64 @@ public void testEpochBumpAfterLastInflightBatchFails(boolean transactionV2Enable
assertEquals(0, transactionManager.sequenceNumber(tp0));
}

@Test
public void testEpochUpdateAfterBumpFromEndTxnResponseInV2() throws InterruptedException {
    // Epoch the mocked EndTxn response hands back: one above the epoch used for the transaction.
    final short bumpedEpoch = epoch + 1;

    // The second argument is `true` (presumably the transaction V2 switch, per the test name).
    initializeTransactionManager(Optional.of(transactionalId), true);
    doInitTransactions(producerId, epoch);

    // Open a transaction and wait until tp0 has been added to it.
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartition(tp0);
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
    runUntil(() -> transactionManager.isPartitionAdded(tp0));

    // Produce a single record under the original producer ID and epoch.
    Future<RecordMetadata> produceFuture = appendToAccumulator(tp0);
    prepareProduceResponse(Errors.NONE, producerId, epoch);
    runUntil(produceFuture::isDone);

    // Abort the transaction; the prepared response keeps the producer ID and carries the bumped epoch.
    TransactionalRequestResult endTxnResult = transactionManager.beginAbort();
    prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch, producerId, bumpedEpoch, false);
    runUntil(endTxnResult::isCompleted);

    // The manager must now report the same producer ID with the bumped epoch.
    assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
    assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
}

@Test
public void testProducerIdAndEpochUpdateAfterOverflowFromEndTxnResponseInV2() throws InterruptedException {
    // Producer ID the mocked EndTxn response hands back, paired with epoch 0, to model an epoch overflow.
    final long newProducerId = producerId + 1;

    // The second argument is `true` (presumably the transaction V2 switch, per the test name).
    initializeTransactionManager(Optional.of(transactionalId), true);
    doInitTransactions(producerId, epoch);

    // Open a transaction and wait until tp0 has been added to it.
    transactionManager.beginTransaction();
    transactionManager.maybeAddPartition(tp0);
    prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
    runUntil(() -> transactionManager.isPartitionAdded(tp0));

    // Produce a single record under the original producer ID and epoch.
    Future<RecordMetadata> produceFuture = appendToAccumulator(tp0);
    prepareProduceResponse(Errors.NONE, producerId, epoch);
    runUntil(produceFuture::isDone);

    // Commit the transaction; the prepared response carries a new producer ID and epoch 0.
    TransactionalRequestResult endTxnResult = transactionManager.beginCommit();
    prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch, newProducerId, (short) 0, false);
    runUntil(endTxnResult::isCompleted);

    // The manager must have switched to the new producer ID and restarted at epoch 0.
    assertEquals(newProducerId, transactionManager.producerIdAndEpoch().producerId);
    assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNoFailedBatchHandlingWhenTxnManagerIsInFatalError(boolean transactionV2Enabled) {
Expand Down Expand Up @@ -3012,10 +3070,15 @@ public void testAbortTransactionAndReuseSequenceNumberOnError() throws Interrupt
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 1),
new ApiVersion()
.setApiKey(ApiKeys.END_TXN.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4),
new ApiVersion()
.setApiKey(ApiKeys.PRODUCE.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 7))));
.setMaxVersion((short) 7)
)));

doInitTransactions();

Expand Down Expand Up @@ -3059,6 +3122,7 @@ public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() thro
// Set the InitProducerId version such that bumping the epoch number is not supported. This will test the case
// where the sequence number is reset on an UnknownProducerId error, allowing subsequent transactions to
// append to the log successfully
// Set the EndTxn version such that sequence is not reset on every end txn.
apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
new ApiVersion()
.setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
Expand All @@ -3067,7 +3131,12 @@ public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() thro
new ApiVersion()
.setApiKey(ApiKeys.PRODUCE.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 7))));
.setMaxVersion((short) 7),
new ApiVersion()
.setApiKey(ApiKeys.END_TXN.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4)
)));

doInitTransactions();

Expand Down Expand Up @@ -3115,8 +3184,9 @@ public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() thro
assertEquals(1, transactionManager.sequenceNumber(tp1));
}

@Test
public void testBumpTransactionalEpochOnAbortableError() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testBumpTransactionalEpochOnAbortableError(boolean transactionV2Enabled) throws InterruptedException {
final short initialEpoch = 1;
final short bumpedEpoch = initialEpoch + 1;

Expand Down Expand Up @@ -3733,7 +3803,7 @@ private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTran

TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ?
transactionManager.beginCommit() : transactionManager.beginAbort();
prepareEndTxnResponse(Errors.NONE, firstTransactionResult, producerId, epoch, true);
prepareEndTxnResponse(Errors.NONE, firstTransactionResult, producerId, epoch, producerId, epoch, true);
runUntil(() -> !client.hasPendingResponses());
assertFalse(result.isCompleted());
assertThrows(TimeoutException.class, () -> result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS));
Expand All @@ -3744,7 +3814,7 @@ private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTran
transactionManager.beginCommit() : transactionManager.beginAbort();
assertEquals(retryResult, result); // check if cached result is reused.

prepareEndTxnResponse(Errors.NONE, retryTransactionResult, producerId, epoch, false);
prepareEndTxnResponse(Errors.NONE, retryTransactionResult, producerId, epoch);
runUntil(retryResult::isCompleted);
assertFalse(transactionManager.hasOngoingTransaction());
}
Expand Down Expand Up @@ -3869,19 +3939,87 @@ private List<TopicPartition> getPartitionsFromV3Request(AddPartitionsToTxnReques
return AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics());
}

private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long producerId, final short epoch) {
this.prepareEndTxnResponse(error, result, producerId, epoch, false);
}
/**
 * Queues an {@code EndTxnResponse} on the mock client that carries only an error code
 * and a zero throttle time; no producer ID or producer epoch is set on the response.
 * Intended for tests where Transaction V2 is NOT enabled.
 *
 * <p>The request matcher asserts the transactional ID, producer ID, producer epoch and
 * transaction result of the outgoing request, and fails the test if that request was
 * built at End Txn version 5 or higher, since such callers must use the overload that
 * supplies the expected producer ID and epoch.
 *
 * @param error                The error to be set in the response.
 * @param result               The transaction result the request is expected to carry.
 * @param requestProducerId    The producer ID expected in the request.
 * @param requestProducerEpoch The producer epoch expected in the request.
 */
private void prepareEndTxnResponse(
    Errors error,
    final TransactionResult result,
    final long requestProducerId,
    final short requestProducerEpoch
) {
    EndTxnResponse response = new EndTxnResponse(
        new EndTxnResponseData()
            .setErrorCode(error.code())
            .setThrottleTimeMs(0));

    client.prepareResponse(body -> {
        EndTxnRequest request = (EndTxnRequest) body;

        // Validate the request contents before looking at its version.
        assertEquals(transactionalId, request.data().transactionalId());
        assertEquals(requestProducerId, request.data().producerId());
        assertEquals(requestProducerEpoch, request.data().producerEpoch());
        assertEquals(result, request.result());

        // Version 5+ requests need a response with producer ID/epoch, which this overload cannot supply.
        if (request.version() >= 5) {
            fail("ExpectedProducerId and ExpectedEpochId must be provided when transaction V2 is enabled. Use the appropriate method.");
        }
        return true;
    }, response);
}

/**
* Prepares an `EndTxnResponse` for a transactional producer.
* This method should be used when Transaction V2 is enabled, which
* means the producer ID and producer epoch will be a part of
* the End Txn Response.
*
* @param error The error to be set in the response.
* @param result The transaction result.
* @param requestProducerId The producer ID in the request.
* @param requestEpochId The producer epoch in the request.
* @param expectedProducerId The expected producer ID to set in the response if the API version is >= 5.
* @param expectedEpochId The expected producer epoch to set in the response if the API version is >= 5.
* @param shouldDisconnect Whether to simulate a disconnection after sending the response.
*/
private void prepareEndTxnResponse(
Errors error,
final TransactionResult result,
final long requestProducerId,
final short requestEpochId,
final long expectedProducerId,
final short expectedEpochId,
boolean shouldDisconnect
) {
EndTxnResponseData responseData = new EndTxnResponseData()
.setErrorCode(error.code())
.setThrottleTimeMs(0);

client.prepareResponse(body -> {
EndTxnRequest endTxnRequest = (EndTxnRequest) body;

private void prepareEndTxnResponse(Errors error,
final TransactionResult result,
final long producerId,
final short epoch,
final boolean shouldDisconnect) {
client.prepareResponse(endTxnMatcher(result, producerId, epoch),
new EndTxnResponse(new EndTxnResponseData()
.setErrorCode(error.code())
.setThrottleTimeMs(0)), shouldDisconnect);
assertEquals(transactionalId, endTxnRequest.data().transactionalId());
assertEquals(requestProducerId, endTxnRequest.data().producerId());
assertEquals(requestEpochId, endTxnRequest.data().producerEpoch());
assertEquals(result, endTxnRequest.result());

int requestVersion = endTxnRequest.version(); // Extract request version
if (requestVersion >= 5) {
responseData.setProducerId(expectedProducerId);
responseData.setProducerEpoch(expectedEpochId);
}
return true;
}, new EndTxnResponse(responseData), shouldDisconnect);
}

private void sendEndTxnResponse(Errors error, final TransactionResult result, final long producerId, final short epoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.kafka.common.protocol.Errors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;

import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class EndTxnRequestTest {
Expand All @@ -34,13 +37,16 @@ public void testConstructor() {
int producerId = 1;
String transactionId = "txn_id";
int throttleTimeMs = 10;
boolean isTransactionV2Enabled = true;

EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
new EndTxnRequestData()
.setCommitted(true)
.setProducerEpoch(producerEpoch)
.setProducerId(producerId)
.setTransactionalId(transactionId));
.setTransactionalId(transactionId),
isTransactionV2Enabled
);

for (short version : ApiKeys.END_TXN.allVersions()) {
EndTxnRequest request = builder.build(version);
Expand All @@ -54,4 +60,36 @@ public void testConstructor() {
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEndTxnRequestWithParameterizedTransactionsV2(boolean isTransactionV2Enabled) {
    final short latestVersion = ApiKeys.END_TXN.latestVersion();

    // With transactions V2 the latest version is kept as requested; without it the
    // request is expected at the last stable pre-V2 version.
    final short expectedVersion;
    if (isTransactionV2Enabled) {
        expectedVersion = latestVersion;
    } else {
        expectedVersion = LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
    }

    // Always ask the builder for the latest version; the V2 flag decides what it actually produces.
    EndTxnRequest request = new EndTxnRequest.Builder(
        new EndTxnRequestData()
            .setTransactionalId("txn_id")
            .setCommitted(true)
            .setProducerId(1L)
            .setProducerEpoch((short) 0),
        false,
        isTransactionV2Enabled
    ).build(latestVersion);

    // The built request must carry the expected version.
    assertEquals(expectedVersion, request.version());

    // The producer ID and epoch must pass through to the request data unchanged.
    assertEquals(1L, request.data().producerId());
    assertEquals((short) 0, request.data().producerEpoch());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@
import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
import static org.apache.kafka.common.protocol.ApiKeys.UPDATE_METADATA;
import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -3062,12 +3063,14 @@ private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
}

private EndTxnRequest createEndTxnRequest(short version) {
boolean isTransactionV2Enabled = version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
return new EndTxnRequest.Builder(
new EndTxnRequestData()
.setTransactionalId("tid")
.setProducerId(21L)
.setProducerEpoch((short) 42)
.setCommitted(TransactionResult.COMMIT.id)
.setCommitted(TransactionResult.COMMIT.id),
isTransactionV2Enabled
).build(version);
}

Expand Down
Loading

0 comments on commit e14a81b

Please sign in to comment.