KAFKA-18269: Remove deprecated protocol APIs support (KIP-896, KIP-724) (apache#18218)

Included in this change:
1. Remove deprecated protocol api versions from json files (see the illustrative snippet after this list).
3. Remove fields that are no longer used from json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
4. Remove record down-conversion support from KafkaApis.
5. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
6. Deprecate `TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and make the relevant
configs (`message.downconversion.enable` and `log.message.downconversion.enable`) no-ops since
down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
7. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
8. Simplify producer logic since it only supports the v2 record format now.
9. Fix tests so they don't exercise protocol api versions that have been removed.
10. Add upgrade note.
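
To make item 1 concrete, removing deprecated api versions from a protocol JSON file essentially means raising the lower bound of the message's `validVersions` range. The snippet below is a hypothetical message definition (the name, api key, version ranges and field are all made up) and only illustrates the shape of such a file, not any actual change in this commit:

```json
{
  "apiKey": 999,
  "type": "request",
  "name": "ExampleRequest",
  // Hypothetical: previously "0-7"; versions 0-2 removed by raising the lower bound.
  "validVersions": "3-7",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ExampleId", "type": "int32", "versions": "0+",
      "about": "Hypothetical field, included only so the fragment is well-formed." }
  ]
}
```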

Testing:
1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
3. Java producers and consumers with versions 0.9-0.10.1 don't have api versions support and hence fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
4. The same thing seems to happen with the console producer 0.10.2, although it's unclear why since api versions should be supported. I will look into this separately; it's unlikely to be related to this PR.
5. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
6. Console producer and consumer 0.11.0 work fine; newer versions should naturally also work fine.
7. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
8. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
9. kcat 1.6.0-1.7.0 produce works fine.
10. kcat 1.7.1 (based on librdkafka 1.8.2) works fine for both produce and consume.
11. confluent-go-client (librdkafka-based) 1.8.2 works fine for both produce and consume.
12. I will test more clients, but I don't think we need to block the PR on that.

Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported.

Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis,
but only what was required due to the schema changes). We can probably do that in master only as it does
not change behavior.

Note that I did not touch `ignorable` fields even though some of them could have been
changed. The reasoning is that this could result in incompatible changes for clients
that use new protocol versions without setting such fields _if_ we don't manually
validate their presence. I will file a JIRA ticket to look into this carefully for each
case (i.e. if we do validate their presence for the appropriate versions, we can
set them to ignorable=false in the json file).
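
For reference, this is roughly what an ignorable field declaration looks like in a protocol JSON file (the field below is hypothetical; name, versions and default are made up). Broadly speaking, a field marked `"ignorable": true` can be silently dropped when the message is written at a version that does not include it, whereas a non-ignorable field set to a non-default value makes the write fail, which is why flipping the flag needs the per-field validation analysis mentioned above:

```json
{ "name": "ExampleEpoch", "type": "int32", "versions": "4+", "default": "-1",
  "ignorable": true,
  "about": "Hypothetical field: safe to omit when writing at versions below 4." }
```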

[1] We would return this error if a fetch < v10 was used and the topic's compression config was set
to zstd, but we would not do the same when the data was compressed with zstd at the producer
level (the most common case). Since there is no efficient way to do the check for the common
case, I made it consistent for both by having no checks.
[2] ```org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1```
[3] ```METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[4] ```METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`

Reviewers: David Arthur <[email protected]>
ijuma authored Dec 21, 2024
1 parent 288d4de commit fe56fc9
Showing 115 changed files with 474 additions and 1,994 deletions.
23 changes: 0 additions & 23 deletions clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -16,13 +16,8 @@
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceRequest;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@@ -33,7 +28,6 @@
public class ApiVersions {

private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;

// The maximum finalized feature epoch of all the node api versions.
private long maxFinalizedFeaturesEpoch = -1;
@@ -50,7 +44,6 @@ public static class FinalizedFeaturesInfo {

public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
this.nodeApiVersions.put(nodeId, nodeApiVersions);
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
@@ -59,7 +52,6 @@ public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions)

public synchronized void remove(String nodeId) {
this.nodeApiVersions.remove(nodeId);
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
}

public synchronized NodeApiVersions get(String nodeId) {
@@ -74,19 +66,4 @@ public synchronized FinalizedFeaturesInfo getFinalizedFeaturesInfo() {
return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch, finalizedFeatures);
}

private byte computeMaxUsableProduceMagic() {
// use a magic version which is supported by all brokers to reduce the chance that
// we will need to convert the messages when they are ready to be sent.
Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
.filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
.map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
.min(Byte::compare);
return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
}

public synchronized byte maxUsableProduceMagic() {
return maxUsableProduceMagic;
}

}
@@ -111,33 +111,15 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
if (!partition.oldStyleOffsets().isEmpty()) {
// Handle v0 response with offsets
long offset;
if (partition.oldStyleOffsets().size() > 1) {
throw new IllegalStateException("Unexpected partitionData response of length " +
partition.oldStyleOffsets().size());
} else {
offset = partition.oldStyleOffsets().get(0);
}
log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
if (offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(offset, null, Optional.empty());
fetchedOffsets.put(topicPartition, offsetData);
}
} else {
// Handle v1 and later response or v0 without offsets
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
break;
case UNSUPPORTED_FOR_MESSAGE_FORMAT:
@@ -458,4 +440,4 @@ static class ListOffsetData {
this.leaderEpoch = leaderEpoch;
}
}
}
}
@@ -1004,7 +1004,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
@@ -27,7 +27,6 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
@@ -344,8 +343,8 @@ public RecordAppendResult append(String topic,
}

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
@@ -408,7 +407,7 @@ private RecordAppendResult appendNewBatch(String topic,
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
@@ -419,12 +418,8 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
}

/**
@@ -871,27 +871,10 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
return;

final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();

// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
@@ -904,18 +887,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
}

String transactionalId = null;

// When we use transaction V1 protocol in transaction we set the request version upper limit to
// LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1.
boolean useTransactionV1Version = false;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
if (!transactionManager.isTransactionV2Enabled()) {
useTransactionV1Version = true;
}
useTransactionV1Version = !transactionManager.isTransactionV2Enabled();
}

ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
@@ -234,10 +234,18 @@ public class TopicConfig {
"or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
"configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
"timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";

/**
* @deprecated down-conversion is not possible in Apache Kafka 4.0 and newer, hence this configuration is a no-op,
* and it is deprecated for removal in Apache Kafka 5.0.
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
"down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" +
"does not apply to any message format conversion that might be required for replication to followers.";

/**
* @deprecated see {@link #MESSAGE_DOWNCONVERSION_ENABLE_CONFIG}.
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
"hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
}
@@ -197,7 +197,7 @@ public enum ApiKeys {
private static boolean shouldRetainsBufferReference(Schema[] requestSchemas) {
boolean requestRetainsBufferReference = false;
for (Schema requestVersionSchema : requestSchemas) {
if (retainsBufferReference(requestVersionSchema)) {
if (requestVersionSchema != null && retainsBufferReference(requestVersionSchema)) {
requestRetainsBufferReference = true;
break;
}
@@ -224,29 +224,6 @@ public DescribeConfigsResponse(DescribeConfigsResponseData data) {
this.data = data;
}

// This constructor should only be used after deserialization, it has special handling for version 0
private DescribeConfigsResponse(DescribeConfigsResponseData data, short version) {
super(ApiKeys.DESCRIBE_CONFIGS);
this.data = data;
if (version == 0) {
for (DescribeConfigsResponseData.DescribeConfigsResult result : data.results()) {
for (DescribeConfigsResponseData.DescribeConfigsResourceResult config : result.configs()) {
if (config.isDefault()) {
config.setConfigSource(ConfigSource.DEFAULT_CONFIG.id);
} else {
if (result.resourceType() == ConfigResource.Type.BROKER.id()) {
config.setConfigSource(ConfigSource.STATIC_BROKER_CONFIG.id);
} else if (result.resourceType() == ConfigResource.Type.TOPIC.id()) {
config.setConfigSource(ConfigSource.TOPIC_CONFIG.id);
} else {
config.setConfigSource(ConfigSource.UNKNOWN.id);
}
}
}
}
}
}

@Override
public DescribeConfigsResponseData data() {
return data;
@@ -272,7 +249,7 @@ public Map<Errors, Integer> errorCounts() {
}

public static DescribeConfigsResponse parse(ByteBuffer buffer, short version) {
return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version), version);
return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version));
}

@Override
@@ -30,7 +30,6 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -137,7 +136,6 @@ private ListOffsetsRequest(ListOffsetsRequestData data, short version) {

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
short errorCode = Errors.forException(e).code();

List<ListOffsetsTopicResponse> responses = new ArrayList<>();
@@ -148,12 +146,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(errorCode)
.setPartitionIndex(partition.partitionIndex());
if (versionId == 0) {
partitionResponse.setOldStyleOffsets(Collections.emptyList());
} else {
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
}
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
partitions.add(partitionResponse);
}
topicResponse.setPartitions(partitions);