feat(tools/perf): support schema message perf (#2226)
Signed-off-by: Robin Han <[email protected]>
superhx authored Dec 13, 2024
1 parent 0265f44 commit 6a013d0
Showing 6 changed files with 81 additions and 8 deletions.
6 changes: 6 additions & 0 deletions build.gradle
@@ -128,6 +128,9 @@ allprojects {

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencyUpdates {
@@ -2226,7 +2229,10 @@ project(':tools') {
implementation (project(':log4j-appender')){
exclude group: 'org.slf4j', module: '*'
}
// AutoMQ inject start
implementation project(':automq-shell')
implementation libs.kafkaAvroSerializer
// AutoMQ inject end

implementation project(':storage')
implementation project(':connect:runtime')
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -174,6 +174,7 @@ versions += [
guava:"32.0.1-jre",
hdrHistogram:"2.1.12",
nettyTcnativeBoringSsl: "2.0.65.Final",
confluentSchema: "7.4.0",
// AutoMQ inject end

junitPlatform: "1.10.2"
@@ -299,5 +300,6 @@ libs += [
jna: "net.java.dev.jna:jna:$versions.jna",
guava: "com.google.guava:guava:$versions.guava",
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
]
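
The two Gradle changes above wire in Confluent's Avro support: build.gradle adds Confluent's Maven repository plus an implementation dependency on libs.kafkaAvroSerializer for the :tools project, and gradle/dependencies.gradle pins it to io.confluent:kafka-avro-serializer:7.4.0. That serializer emits the Confluent wire format: one zero "magic" byte, the 4-byte big-endian schema id assigned by the registry, then the Avro-encoded record. A minimal sketch of reading that header back (class and method names are illustrative, not part of the commit):

import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Bytes 1-4 of a Confluent wire-format payload hold the registry schema id.
    static int schemaId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian by default
        if (buf.get() != 0) {
            throw new IllegalArgumentException("not Confluent wire format");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] fake = {0, 0, 0, 0, 42}; // magic byte, then schema id 42
        System.out.println(schemaId(fake)); // prints 42
    }
}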
@@ -24,22 +24,34 @@
import com.automq.stream.s3.metrics.TimerUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Strings;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import static org.apache.kafka.tools.automq.perf.StatsCollector.printAndCollectStats;

@@ -105,7 +117,7 @@ private void run() {
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

- List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
+ Function<String, List<byte[]>> payloads = payloads(config, topics);
producerService.start(payloads, config.sendRate);

preparing = false;
@@ -209,6 +221,21 @@ private void waitTopicsReadyWithoutConsumer() {
}
}

private Function<String, List<byte[]>> payloads(PerfConfig config, List<Topic> topics) {
if (Strings.isNullOrEmpty(config.valueSchema)) {
List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
return topic -> payloads;
} else {
// The producer configs should contain:
// - schema.registry.url: http://localhost:8081
Map<String, List<byte[]>> topic2payloads = new HashMap<>();
topics.forEach(topic -> {
topic2payloads.put(topic.name(), schemaPayloads(topic.name(), config.valueSchema, config.valuesFile, config.producerConfigs));
});
return topic2payloads::get;
}
}

/**
* Generates a list of byte arrays with specified size and mix of random and static content.
*
@@ -271,4 +298,20 @@ public void close() {
producerService.close();
consumerService.close();
}

private static List<byte[]> schemaPayloads(String topic, String schemaJson, String payloadsFile, Map<String, ?> configs) {
try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
List<byte[]> payloads = new ArrayList<>();
AvroSchema schema = new AvroSchema(schemaJson);
serializer.configure(configs, false);
for (String payloadStr : Files.readAllLines(Path.of(payloadsFile), StandardCharsets.UTF_8)) {
Object object = AvroSchemaUtils.toObject(payloadStr, schema);
byte[] payload = serializer.serialize(topic, object);
payloads.add(payload);
}
return payloads;
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
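
The file above (by its package and contents, the perf command entry point) now picks between two payload sources: without --value-schema it keeps the old behavior, a single pool of random payloads shared by every topic; with a schema it builds one pre-serialized pool per topic, since under the registry's default subject naming the serialized bytes are tied to the topic. The registry address must arrive through the producer configs, as the code comment above notes. A standalone sketch of what schemaPayloads does for one record, assuming a live registry at http://localhost:8081 (URL and topic name are assumptions):

import java.util.Map;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class SchemaPayloadSketch {
    public static void main(String[] args) throws Exception {
        AvroSchema schema = new AvroSchema(
            "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
        // Parse one line of the values file as JSON against the schema.
        Object record = AvroSchemaUtils.toObject("{\"f1\": \"value1\"}", schema);
        try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
            serializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
            // Registers/looks up the schema under subject "test_topic_0-value"
            // and returns wire-format bytes ready to send.
            byte[] payload = serializer.serialize("test_topic_0", record);
            System.out.println(payload.length + " bytes");
        }
    }
}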
@@ -56,6 +56,8 @@ public class PerfConfig {
public final int warmupDurationMinutes;
public final int testDurationMinutes;
public final int reportingIntervalSeconds;
public final String valueSchema;
public final String valuesFile;

public PerfConfig(String[] args) {
ArgumentParser parser = parser();
@@ -77,7 +79,7 @@ public PerfConfig(String[] args) {
producerConfigs = parseConfigs(ns.getList("producerConfigs"));
consumerConfigs = parseConfigs(ns.getList("consumerConfigs"));
reset = ns.getBoolean("reset");
- topicPrefix = ns.getString("topicPrefix") == null ? "test-topic-" + System.currentTimeMillis() : ns.getString("topicPrefix");
+ topicPrefix = ns.getString("topicPrefix") == null ? "test_topic_" + System.currentTimeMillis() : ns.getString("topicPrefix");
topics = ns.getInt("topics");
partitionsPerTopic = ns.getInt("partitionsPerTopic");
producersPerTopic = ns.getInt("producersPerTopic");
@@ -93,6 +95,8 @@
warmupDurationMinutes = ns.getInt("warmupDurationMinutes");
testDurationMinutes = ns.getInt("testDurationMinutes");
reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds");
valueSchema = ns.getString("valueSchema");
valuesFile = ns.get("valuesFile");

if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
@@ -233,6 +237,16 @@ public static ArgumentParser parser() {
.dest("reportingIntervalSeconds")
.metavar("REPORTING_INTERVAL_SECONDS")
.help("The reporting interval in seconds.");
parser.addArgument("--value-schema")
.type(String.class)
.dest("valueSchema")
.metavar("VALUE_SCHEMA")
.help("The schema of the values ex. {\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
parser.addArgument("--values-file")
.type(String.class)
.dest("valuesFile")
.metavar("VALUES_FILE")
.help("The avro value file. Example file content {\"f1\": \"value1\"}");
return parser;
}

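With these flags, PerfConfig accepts the Avro schema inline and a file of JSON-encoded values, one record per line. A hypothetical invocation (the script name is an assumption, and schema.registry.url still has to be passed via the producer configs):

./automq-perf.sh \
    --value-schema '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
    --values-file values.json
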
@@ -41,6 +41,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -120,7 +121,7 @@ public int probe() {
/**
* Start sending messages using the given payloads at the given rate.
*/
- public void start(List<byte[]> payloads, double rate) {
+ public void start(Function<String, List<byte[]>> payloads, double rate) {
adjustRate(rate);
adjustRateExecutor.scheduleWithFixedDelay(this::adjustRate, 0, ADJUST_RATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
int processors = Runtime.getRuntime().availableProcessors();
@@ -176,7 +177,7 @@ private double calculateY(long x1, double y1, long x2, double y2, long x) {
return y1 + (x - x1) * (y2 - y1) / (x2 - x1);
}

- private void start(List<Producer> producers, List<byte[]> payloads) {
+ private void start(List<Producer> producers, Function<String, List<byte[]>> payloads) {
executor.submit(() -> {
try {
sendMessages(producers, payloads);
@@ -186,11 +187,14 @@ private void start(List<Producer> producers, List<byte[]> payloads) {
});
}

- private void sendMessages(List<Producer> producers, List<byte[]> payloads) {
+ private void sendMessages(List<Producer> producers, Function<String, List<byte[]>> payloadsGet) {
Random random = ThreadLocalRandom.current();
while (!closed) {
producers.forEach(
- p -> sendMessage(p, payloads.get(random.nextInt(payloads.size())))
+ p -> {
+ List<byte[]> payloads = payloadsGet.apply(p.topic.name);
+ sendMessage(p, payloads.get(random.nextInt(payloads.size())));
+ }
);
}
}
@@ -290,7 +294,7 @@ private String nextKey() {
*/
public List<CompletableFuture<Void>> probe() {
return IntStream.range(0, topic.partitions)
.mapToObj(i -> sendAsync("probe", new byte[42], i))
.mapToObj(i -> sendAsync("probe", new byte[] {1}, i))
.collect(Collectors.toList());
}

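In ProducerService the payload source widens from a shared List<byte[]> to a Function<String, List<byte[]>>, so each producer resolves the pool for its own topic on every send; the probe message also shrinks from 42 zero bytes to a single byte. A small sketch of the new lookup contract (topic names and pools are hypothetical):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

public class PayloadLookupSketch {
    public static void main(String[] args) {
        // One pre-serialized pool per topic, as built by payloads() above.
        Map<String, List<byte[]>> perTopic = Map.of(
            "test_topic_0", List.of(new byte[] {1}, new byte[] {2}),
            "test_topic_1", List.of(new byte[] {3}));
        Function<String, List<byte[]>> payloads = perTopic::get;

        // What sendMessages now does per producer: fetch the topic's pool,
        // then pick a random payload from it.
        List<byte[]> pool = payloads.apply("test_topic_0");
        byte[] next = pool.get(ThreadLocalRandom.current().nextInt(pool.size()));
        System.out.println(next.length);
    }
}
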
@@ -114,7 +114,7 @@ private void waitTopicCreated(String name, KafkaFuture<Void> future) {
}

private String generateTopicName(String topicPrefix, int partitions, int index) {
return String.format("%s-%04d-%07d", topicPrefix, partitions, index);
return String.format("%s_%04d_%07d", topicPrefix, partitions, index);
}

public static class TopicsConfig {
@@ -140,6 +140,10 @@ public Topic(String name, int partitions) {
this.partitions = partitions;
}

public String name() {
return name;
}

public List<TopicPartition> partitions() {
return IntStream.range(0, partitions)
.mapToObj(i -> new TopicPartition(name, i))
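
Finally, generated topic names switch from '-' to '_' separators in both the default prefix and generateTopicName, and Topic gains a name() accessor, which the command uses when keying the per-topic payload map. A worked example of the name format (the timestamp in the prefix is illustrative):

public class TopicNameSketch {
    public static void main(String[] args) {
        // Default prefix "test_topic_" + startup millis, then zero-padded
        // partition count and index; prints test_topic_1734048000000_0016_0000000
        System.out.println(String.format("%s_%04d_%07d", "test_topic_1734048000000", 16, 0));
    }
}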
