Skip to content

Commit

Permalink
feat(issue1120): support reporting broker metrics in AutoBalancerMetr…
Browse files Browse the repository at this point in the history
…icsReporter (#1124)

* feat(issue1120): support reporting broker metrics in AutoBalancerMetricsReporter

Signed-off-by: Shichao Nie <[email protected]>

* fix(issue1120): use pending request latency instead of pending size

Signed-off-by: Shichao Nie <[email protected]>

* refactor(issue1120): refactor AutoBalancerMetrics for better inheritance

Signed-off-by: Shichao Nie <[email protected]>

---------

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 16, 2024
1 parent ed1b828 commit 2bcc5b9
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 108 deletions.
4 changes: 0 additions & 4 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,6 @@ protected void updateClusterModel(AutoBalancerMetrics metrics) {
switch (metrics.metricType()) {
case MetricTypes.TOPIC_PARTITION_METRIC:
TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics;

// TODO: remove this when reporting broker metrics is supported
clusterModel.updateBrokerMetrics(partitionMetrics.brokerId(), new HashMap<>(), partitionMetrics.time());

clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(),
new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()),
partitionMetrics.getMetricValueMap(), partitionMetrics.time());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import kafka.autobalancer.common.types.metrics.AbnormalLatency;
import kafka.autobalancer.common.types.metrics.AbnormalMetric;
import kafka.autobalancer.common.types.metrics.AbnormalQueueSize;

import java.util.Map;
import java.util.Set;
Expand All @@ -23,17 +22,15 @@ public class RawMetricTypes {
public static final byte PARTITION_BYTES_OUT = (byte) 1;
public static final byte PARTITION_SIZE = (byte) 2;
public static final byte BROKER_APPEND_LATENCY_AVG_MS = (byte) 3;
public static final byte BROKER_APPEND_QUEUE_SIZE = (byte) 4;
public static final byte BROKER_FAST_READ_LATENCY_AVG_MS = (byte) 5;
public static final byte BROKER_SLOW_READ_QUEUE_SIZE = (byte) 6;
public static final byte BROKER_MAX_PENDING_APPEND_LATENCY_MS = (byte) 4;
public static final byte BROKER_MAX_PENDING_FETCH_LATENCY_MS = (byte) 5;
public static final Set<Byte> PARTITION_METRICS = Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE);
public static final Set<Byte> BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS, BROKER_APPEND_QUEUE_SIZE,
BROKER_FAST_READ_LATENCY_AVG_MS, BROKER_SLOW_READ_QUEUE_SIZE);
public static final Set<Byte> BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS,
BROKER_MAX_PENDING_APPEND_LATENCY_MS, BROKER_MAX_PENDING_FETCH_LATENCY_MS);
public static final Map<Byte, AbnormalMetric> ABNORMAL_METRICS = Map.of(
BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(50),
BROKER_APPEND_QUEUE_SIZE, new AbnormalQueueSize(100),
BROKER_FAST_READ_LATENCY_AVG_MS, new AbnormalLatency(50),
BROKER_SLOW_READ_QUEUE_SIZE, new AbnormalQueueSize(100)
BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(100), // 100ms
BROKER_MAX_PENDING_APPEND_LATENCY_MS, new AbnormalLatency(10000), // 10s
BROKER_MAX_PENDING_FETCH_LATENCY_MS, new AbnormalLatency(10000) // 10s
);

public static AbnormalMetric ofAbnormalType(byte metricType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class AnomalyDetector extends AbstractResumableService {
private volatile long maxTolerateMetricsDelayMs;
private volatile long coolDownIntervalPerActionMs;
private volatile boolean isLeader = false;
private volatile Map<Integer, Boolean> slowBrokers = new HashMap<>();

AnomalyDetector(LogContext logContext, int maxActionsNumPerDetect, long detectIntervalMs, long maxTolerateMetricsDelayMs,
long coolDownIntervalPerActionMs, ClusterModel clusterModel, ActionExecutorService actionExecutor,
Expand All @@ -72,6 +74,7 @@ public class AnomalyDetector extends AbstractResumableService {
this.excludedBrokers = excludedBrokers;
this.excludedTopics = excludedTopics;
this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS);
S3StreamKafkaMetricsManager.setSlowBrokerSupplier(() -> this.slowBrokers);
logger.info("maxActionsNumPerDetect: {}, detectInterval: {}ms, coolDownIntervalPerAction: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}",
this.maxActionsNumPerExecution, this.detectInterval, this.coolDownIntervalPerActionMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics);
}
Expand Down Expand Up @@ -245,14 +248,18 @@ long detect0() {
ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics, maxTolerateMetricsDelayMs);
snapshot.markSlowBrokers();

Map<Integer, Boolean> slowBrokers = new HashMap<>();
for (BrokerUpdater.Broker broker : snapshot.brokers()) {
logger.info("Broker status: {}", broker.shortString());
String brokerStr = logger.isDebugEnabled() ? broker.toString() : broker.shortString();
slowBrokers.put(broker.getBrokerId(), broker.isSlowBroker());
logger.info("Broker status: {}", brokerStr);
if (logger.isDebugEnabled()) {
for (TopicPartitionReplicaUpdater.TopicPartitionReplica replica : snapshot.replicasFor(broker.getBrokerId())) {
logger.debug("Replica status {}", replica.shortString());
}
}
}
this.slowBrokers = slowBrokers;

List<Action> totalActions = new ArrayList<>();
for (Goal goal : goals) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package kafka.autobalancer.metricsreporter;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
Expand All @@ -19,6 +21,8 @@
import kafka.autobalancer.common.types.RawMetricTypes;
import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig;
import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics;
import kafka.autobalancer.metricsreporter.metric.BrokerMetrics;
import kafka.autobalancer.metricsreporter.metric.DeltaHistogram;
import kafka.autobalancer.metricsreporter.metric.MetricSerde;
import kafka.autobalancer.metricsreporter.metric.MetricsUtils;
import kafka.autobalancer.metricsreporter.metric.YammerMetricProcessor;
Expand Down Expand Up @@ -49,6 +53,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.
Expand All @@ -60,6 +65,7 @@ public class AutoBalancerMetricsReporter implements MetricsRegistryListener, Met
private static final Logger LOGGER = LoggerFactory.getLogger(AutoBalancerMetricsReporter.class);
private final Map<MetricName, Metric> interestedMetrics = new ConcurrentHashMap<>();
private final MetricsRegistry metricsRegistry = KafkaYammerMetrics.defaultRegistry();
protected final DeltaHistogram appendLatencyMetric = new DeltaHistogram();
protected YammerMetricProcessor yammerMetricProcessor;
private KafkaThread metricsReporterRunner;
private KafkaProducer<String, AutoBalancerMetrics> producer;
Expand Down Expand Up @@ -344,9 +350,20 @@ private void reportMetrics(long now) throws Exception {

protected void processMetrics(YammerMetricProcessor.Context context) throws Exception {
processYammerMetrics(context);
addBrokerMetrics(context);
addMandatoryMetrics(context);
}

protected void addBrokerMetrics(YammerMetricProcessor.Context context) {
context.merge(new BrokerMetrics(context.time(), brokerId, brokerRack)
.put(RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS,
TimeUnit.NANOSECONDS.toMillis((long) appendLatencyMetric.deltaRate(StreamOperationStats.getInstance().appendStreamStats)))
.put(RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS,
TimeUnit.NANOSECONDS.toMillis(S3StreamMetricsManager.maxPendingStreamAppendLatency()))
.put(RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS,
TimeUnit.NANOSECONDS.toMillis(S3StreamMetricsManager.maxPendingStreamFetchLatency())));
}

protected void processYammerMetrics(YammerMetricProcessor.Context context) throws Exception {
for (Map.Entry<MetricName, Metric> entry : interestedMetrics.entrySet()) {
LOGGER.trace("Processing yammer metric {}, scope = {}", entry.getKey(), entry.getKey().getScope());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ protected static Map<Byte, Double> parseMetricsMap(ByteBuffer buffer) {
return metricsMap;
}

public abstract boolean isValidMetric(byte type);

public AutoBalancerMetrics put(byte type, double value) {
if (!isValidMetric(type)) {
throw new IllegalArgumentException(String.format("Cannot put metric type %d into a %s metric.", type, this.getClass().getSimpleName()));
}
this.metricValueMap.put(type, value);
return this;
}
Expand Down Expand Up @@ -115,12 +120,11 @@ public ByteBuffer writeBody(ByteBuffer buffer) {

public String buildKVString() {
StringBuilder builder = new StringBuilder();
for (Map.Entry<Byte, Double> entry : metricValueMap.entrySet()) {
builder.append(entry.getKey());
builder.append(":");
builder.append(String.format("%.4f", entry.getValue()));
metricValueMap.forEach((k, v) -> builder.append(k).append(":").append(v).append(","));
if (builder.length() == 0) {
return "";
}
return builder.toString();
return builder.substring(0, builder.length() - 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ public static BrokerMetrics fromBuffer(ByteBuffer buffer) throws UnknownVersionE
}

@Override
public AutoBalancerMetrics put(byte type, double value) {
if (!RawMetricTypes.BROKER_METRICS.contains(type)) {
throw new IllegalArgumentException("Cannot put non broker metric type " + type + " into a partition metric.");
}
return super.put(type, value);
public boolean isValidMetric(byte type) {
return RawMetricTypes.BROKER_METRICS.contains(type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.metricsreporter.metric;

import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;

public class DeltaHistogram {
private long count = 0;
private long sum = 0;

public double deltaRate(YammerHistogramMetric metric) {
long deltaCount = metric.count() - count;
if (deltaCount == 0) {
return 0;
}
double deltaRate = (double) (metric.sum() - sum) / deltaCount;
count = metric.count();
sum = metric.sum();
return deltaRate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static AutoBalancerMetrics fromBytes(byte[] bytes) throws UnknownVersionE
switch (buffer.get()) {
case MetricTypes.TOPIC_PARTITION_METRIC:
return TopicPartitionMetrics.fromBuffer(buffer);
case MetricTypes.BROKER_METRIC:
return BrokerMetrics.fromBuffer(buffer);
default:
// This could happen when a new type of metric is added, but we are still running the old code.
// simply ignore the metric by returning a null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,10 @@ public static TopicPartitionMetrics fromBuffer(ByteBuffer buffer) throws Unknown
}

@Override
public AutoBalancerMetrics put(byte type, double value) {
if (!RawMetricTypes.PARTITION_METRICS.contains(type)) {
throw new IllegalArgumentException("Cannot put non partition metric type " + type + " into a partition metric.");
}
return super.put(type, value);
public boolean isValidMetric(byte type) {
return RawMetricTypes.PARTITION_METRICS.contains(type);
}


@Override
public String key() {
return topic + "-" + partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ protected Broker createBroker(int brokerId, String rack, boolean active) {

@Override
protected boolean validateMetrics(Map<Byte, Double> metricsMap) {
// TODO: add broker metrics validation when reporting broker metrics is supported
return true;
return metricsMap.keySet().containsAll(RawMetricTypes.BROKER_METRICS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

public class MetricValueSequence {
private static final int DEFAULT_MAX_SIZE = 1024;
private static final int MIN_VALID_LENGTH = 30;
private final Deque<Double> values;
private final int maxSize;
private Snapshot prev;
Expand Down Expand Up @@ -44,9 +43,6 @@ public void append(double value) {
}

public Snapshot snapshot() {
if (values.size() < MIN_VALID_LENGTH) {
return null;
}
Snapshot snapshot = new Snapshot(prev, values);
this.prev = snapshot;
return snapshot;
Expand Down
53 changes: 28 additions & 25 deletions core/src/main/java/kafka/autobalancer/model/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,52 @@
import java.util.Collection;

public class Snapshot {
public static final double INVALID = -1;
private final Snapshot prev;
private final double[] values;
private Double latest;
private Double percentile90th;
private final double latest;

public Snapshot(Snapshot prev, Collection<Double> values) {
this.prev = prev;
this.values = values.stream().mapToDouble(Double::doubleValue).toArray();
this.latest = this.values.length > 0 ? this.values[this.values.length - 1] : INVALID;
Arrays.sort(this.values);
}

public double getValue(double quantile) {
if (!(quantile < 0.0) && !(quantile > 1.0)) {
if (this.values.length == 0) {
return 0.0;
} else {
double pos = quantile * (double) (this.values.length + 1);
if (pos < 1.0) {
return this.values[0];
} else if (pos >= (double) this.values.length) {
return this.values[this.values.length - 1];
} else {
double lower = this.values[(int) pos - 1];
double upper = this.values[(int) pos];
return lower + (pos - Math.floor(pos)) * (upper - lower);
}
}
if (quantile < 0.0 || quantile > 1.0) {
throw new IllegalArgumentException(quantile + " out of range. Should be in [0, 1]");
}
if (values.length < getMinValidLength(quantile)) {
return INVALID;
}
double pos = quantile * (double) (this.values.length + 1);
if (pos < 1.0) {
return this.values[0];
} else if (pos >= (double) this.values.length) {
return this.values[this.values.length - 1];
} else {
throw new IllegalArgumentException(quantile + " is not in [0..1]");
double lower = this.values[(int) pos - 1];
double upper = this.values[(int) pos];
return lower + (pos - Math.floor(pos)) * (upper - lower);
}
}

public double get90thPercentile() {
if (this.percentile90th == null) {
this.percentile90th = this.getValue(0.9);
private int getMinValidLength(double quantile) {
if (quantile == 0.0 || quantile == 1.0) {
return 1;
}
if (quantile < 0.5) {
return (int) Math.ceil(1.0 / quantile);
}
return this.percentile90th;
return (int) Math.ceil(1.0 / (1 - quantile));
}

public double get90thPercentile() {
return getValue(0.9);
}

public double getLatest() {
if (this.latest == null) {
this.latest = this.values[this.values.length - 1];
}
return latest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void testSchedulingTimeCost() {
int brokerNum = 20;
for (int i = 0; i < brokerNum; i++) {
clusterModel.registerBroker(i, "");
clusterModel.updateBrokerMetrics(i, new HashMap<>(), System.currentTimeMillis());
clusterModel.updateBrokerMetrics(i, Map.of(
RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0,
RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0,
RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), System.currentTimeMillis());
}
int topicNum = 5000;
int totalPartitionNum = 100000;
Expand Down
Loading

0 comments on commit 2bcc5b9

Please sign in to comment.