Skip to content

Commit

Permalink
perf(produce): fix validate compressed records alloc too many memory
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Jan 7, 2025
1 parent 552fb77 commit f2e858e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package kafka.automq.zonerouter;

import kafka.server.MetadataCache;
import kafka.server.RequestLocal;
import kafka.server.streamaspect.ElasticKafkaApis;

import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -44,7 +45,9 @@ public NoopProduceRouter(ElasticKafkaApis kafkaApis, MetadataCache metadataCache
public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, int timeout, short requiredAcks,
boolean internalTopicsAllowed, String transactionId, Map<TopicPartition, MemoryRecords> entriesPerPartition,
Consumer<Map<TopicPartition, ProduceResponse.PartitionResponse>> responseCallback,
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback) {
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback,
RequestLocal requestLocal
) {
kafkaApis.handleProduceAppendJavaCompatible(
timeout,
requiredAcks,
Expand All @@ -59,7 +62,8 @@ public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, in
recordValidationStatsCallback.accept(rst);
return null;
},
apiVersion
apiVersion,
requestLocal
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package kafka.automq.zonerouter;

import kafka.server.RequestLocal;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.MetadataResponseData;
Expand All @@ -36,7 +38,8 @@ void handleProduceRequest(
String transactionId,
Map<TopicPartition, MemoryRecords> entriesPerPartition,
Consumer<Map<TopicPartition, ProduceResponse.PartitionResponse>> responseCallback,
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback,
RequestLocal requestLocal
);

CompletableFuture<AutomqZoneRouterResponse> handleZoneRouterRequest(byte[] metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ class ElasticKafkaApis(
authorizedRequestInfo.asJava,
sendResponseCallbackJava,
processingStatsCallbackJava,
requestLocal,
)

// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
Expand Down Expand Up @@ -425,7 +426,9 @@ class ElasticKafkaApis(
entriesPerPartition: util.Map[TopicPartition, MemoryRecords],
responseCallback: util.Map[TopicPartition, PartitionResponse] => Unit,
recordValidationStatsCallback: util.Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
apiVersion: Short): Unit = {
apiVersion: Short,
requestLocal: RequestLocal
): Unit = {
val transactionSupportedOperation = if (apiVersion > 10) genericError else defaultError
replicaManager.handleProduceAppend(
timeout = timeout,
Expand All @@ -435,7 +438,8 @@ class ElasticKafkaApis(
entriesPerPartition = entriesPerPartition.asScala,
responseCallback = rst => responseCallback.apply(rst.asJava),
recordValidationStatsCallback = rst => recordValidationStatsCallback.apply(rst.asJava),
transactionSupportedOperation = transactionSupportedOperation
transactionSupportedOperation = transactionSupportedOperation,
requestLocal = requestLocal,
)
}

Expand Down

0 comments on commit f2e858e

Please sign in to comment.