Skip to content

Commit

Permalink
feat(issue1120): support slow broker detect in controller (#1122)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 16, 2024
1 parent 7b9b846 commit 4cb2ebf
Show file tree
Hide file tree
Showing 26 changed files with 662 additions and 35 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import kafka.autobalancer.config.AutoBalancerControllerConfig;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics;
import kafka.autobalancer.metricsreporter.metric.BrokerMetrics;
import kafka.autobalancer.metricsreporter.metric.MetricSerde;
import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics;
import kafka.autobalancer.model.ClusterModel;
Expand Down Expand Up @@ -476,11 +477,18 @@ protected void updateClusterModel(AutoBalancerMetrics metrics) {
switch (metrics.metricType()) {
case MetricTypes.TOPIC_PARTITION_METRIC:
TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics;

// TODO: remove this when reporting broker metrics is supported
clusterModel.updateBrokerMetrics(partitionMetrics.brokerId(), new HashMap<>(), partitionMetrics.time());

clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(),
new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()),
partitionMetrics.getMetricValueMap(), partitionMetrics.time());
break;
case MetricTypes.BROKER_METRIC:
BrokerMetrics brokerMetrics = (BrokerMetrics) metrics;
clusterModel.updateBrokerMetrics(brokerMetrics.brokerId(), brokerMetrics.getMetricValueMap(), brokerMetrics.time());
break;
default:
logger.error("Not supported metrics version {}", metrics.metricType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@

public class MetricTypes {
public static final byte TOPIC_PARTITION_METRIC = (byte) 0;
public static final byte BROKER_METRIC = (byte) 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,32 @@

package kafka.autobalancer.common.types;

import kafka.autobalancer.common.types.metrics.AbnormalLatency;
import kafka.autobalancer.common.types.metrics.AbnormalMetric;
import kafka.autobalancer.common.types.metrics.AbnormalQueueSize;

import java.util.Map;
import java.util.Set;

public class RawMetricTypes {
public static final byte TOPIC_PARTITION_BYTES_IN = (byte) 0;
public static final byte TOPIC_PARTITION_BYTES_OUT = (byte) 1;
public static final byte PARTITION_BYTES_IN = (byte) 0;
public static final byte PARTITION_BYTES_OUT = (byte) 1;
public static final byte PARTITION_SIZE = (byte) 2;
public static final byte BROKER_APPEND_LATENCY_AVG_MS = (byte) 3;
public static final byte BROKER_APPEND_QUEUE_SIZE = (byte) 4;
public static final byte BROKER_FAST_READ_LATENCY_AVG_MS = (byte) 5;
public static final byte BROKER_SLOW_READ_QUEUE_SIZE = (byte) 6;
public static final Set<Byte> PARTITION_METRICS = Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE);
public static final Set<Byte> BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS, BROKER_APPEND_QUEUE_SIZE,
BROKER_FAST_READ_LATENCY_AVG_MS, BROKER_SLOW_READ_QUEUE_SIZE);
public static final Map<Byte, AbnormalMetric> ABNORMAL_METRICS = Map.of(
BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(50),
BROKER_APPEND_QUEUE_SIZE, new AbnormalQueueSize(100),
BROKER_FAST_READ_LATENCY_AVG_MS, new AbnormalLatency(50),
BROKER_SLOW_READ_QUEUE_SIZE, new AbnormalQueueSize(100)
);

public static Set<Byte> partitionMetrics() {
return Set.of(TOPIC_PARTITION_BYTES_IN, TOPIC_PARTITION_BYTES_OUT, PARTITION_SIZE);
public static AbnormalMetric ofAbnormalType(byte metricType) {
return ABNORMAL_METRICS.get(metricType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.common.types.metrics;

import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.Snapshot;

import java.util.Map;

public class AbnormalLatency extends AbstractSimpleAbnormalMetric {

public AbnormalLatency(float abnormalLatency) {
super(abnormalLatency);
}

@Override
public boolean isAbnormalToPeer(Snapshot self, Map<BrokerUpdater.Broker, Snapshot> peers) {
// TODO: compare with peer
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.common.types.metrics;

import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.Snapshot;

import java.util.Map;

public interface AbnormalMetric {
default boolean isAbnormal(Snapshot self, Map<BrokerUpdater.Broker, Snapshot> peers) {
if (self == null) {
return false;
}
if (isSelfAbnormal(self)) {
return true;
}
return isAbnormalToPeer(self, peers);
}

boolean isSelfAbnormal(Snapshot self);

boolean isAbnormalToPeer(Snapshot self, Map<BrokerUpdater.Broker, Snapshot> peers);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.common.types.metrics;

import kafka.autobalancer.model.BrokerUpdater;
import kafka.autobalancer.model.Snapshot;

import java.util.Map;

public class AbnormalQueueSize extends AbstractSimpleAbnormalMetric {

public AbnormalQueueSize(int abnormalQueueSizeThreshold) {
super(abnormalQueueSizeThreshold);
}

@Override
public boolean isAbnormalToPeer(Snapshot self, Map<BrokerUpdater.Broker, Snapshot> peers) {
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.common.types.metrics;

import kafka.autobalancer.model.Snapshot;

public abstract class AbstractSimpleAbnormalMetric implements AbnormalMetric {
protected final double abnormalThreshold;

public AbstractSimpleAbnormalMetric(double abnormalThreshold) {
this.abnormalThreshold = abnormalThreshold;
}

@Override
public boolean isSelfAbnormal(Snapshot self) {
// TODO: compare with historical data
return self.getLatest() > abnormalThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ long detect0() {
logger.info("Start detect");
// The delay in processing kraft log could result in outdated cluster snapshot
ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics, maxTolerateMetricsDelayMs);
snapshot.markSlowBrokers();

for (BrokerUpdater.Broker broker : snapshot.brokers()) {
logger.info("Broker status: {}", broker.shortString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public List<Action> doOptimize(Set<BrokerUpdater.Broker> eligibleBrokers, Cluste
}
actions.addAll(brokerActions);
} else if (requireMoreLoad(broker)) {
if (broker.isSlowBroker()) {
// prevent scheduling more partitions to slow broker
continue;
}
List<Action> brokerActions = tryIncreaseLoadByAction(ActionType.MOVE, cluster, broker, candidateBrokers, goalsByPriority);
if (!isBrokerAcceptable(broker)) {
brokerActions.addAll(tryIncreaseLoadByAction(ActionType.SWAP, cluster, broker, candidateBrokers, goalsByPriority));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ protected List<Action> tryReduceLoadByAction(ActionType actionType,
List<BrokerUpdater.Broker> candidateBrokers,
Collection<Goal> goalsByPriority) {
List<Action> actionList = new ArrayList<>();
candidateBrokers = candidateBrokers.stream().filter(b -> !b.isSlowBroker()).collect(Collectors.toList());
if (candidateBrokers.isEmpty()) {
return actionList;
}
List<TopicPartitionReplicaUpdater.TopicPartitionReplica> srcReplicas = cluster
.replicasFor(srcBroker.getBrokerId())
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ private void addMandatoryMetrics(YammerMetricProcessor.Context context) {
for (AutoBalancerMetrics metrics : context.getMetricMap().values()) {
if (metrics.metricType() == MetricTypes.TOPIC_PARTITION_METRIC
&& !MetricsUtils.sanityCheckTopicPartitionMetricsCompleteness(metrics)) {
metrics.getMetricValueMap().putIfAbsent(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, 0.0);
metrics.getMetricValueMap().putIfAbsent(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, 0.0);
metrics.getMetricValueMap().putIfAbsent(RawMetricTypes.PARTITION_BYTES_IN, 0.0);
metrics.getMetricValueMap().putIfAbsent(RawMetricTypes.PARTITION_BYTES_OUT, 0.0);
metrics.getMetricValueMap().putIfAbsent(RawMetricTypes.PARTITION_SIZE, 0.0);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.autobalancer.metricsreporter.metric;

import kafka.autobalancer.common.types.MetricTypes;
import kafka.autobalancer.common.types.RawMetricTypes;
import kafka.autobalancer.metricsreporter.exception.UnknownVersionException;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class BrokerMetrics extends AutoBalancerMetrics {

public BrokerMetrics(long time, int brokerId, String brokerRack) {
super(time, brokerId, brokerRack);
}

public BrokerMetrics(long time, int brokerId, String brokerRack, Map<Byte, Double> metricTypeValueMap) {
super(time, brokerId, brokerRack, metricTypeValueMap);
}

public static BrokerMetrics fromBuffer(ByteBuffer buffer) throws UnknownVersionException {
byte version = buffer.get();
if (version > METRIC_VERSION) {
throw new UnknownVersionException("Cannot deserialize the topic metrics for version " + version + ". "
+ "Current version is " + METRIC_VERSION);
}
long time = buffer.getLong();
int brokerId = buffer.getInt();
int brokerRackLength = buffer.getInt();
String brokerRack = "";
if (brokerRackLength > 0) {
brokerRack = new String(buffer.array(), buffer.arrayOffset() + buffer.position(), brokerRackLength, StandardCharsets.UTF_8);
buffer.position(buffer.position() + brokerRackLength);
}
Map<Byte, Double> metricsMap = parseMetricsMap(buffer);
return new BrokerMetrics(time, brokerId, brokerRack, metricsMap);
}

@Override
public AutoBalancerMetrics put(byte type, double value) {
if (!RawMetricTypes.BROKER_METRICS.contains(type)) {
throw new IllegalArgumentException("Cannot put non broker metric type " + type + " into a partition metric.");
}
return super.put(type, value);
}

@Override
public String key() {
return Integer.toString(brokerId());
}

@Override
public byte metricType() {
return MetricTypes.BROKER_METRIC;
}

/**
* The buffer capacity is calculated as follows:
* <ul>
* <li>(headerPos + {@link Byte#BYTES}) - version</li>
* <li>{@link Long#BYTES} - time</li>
* <li>{@link Integer#BYTES} - broker id</li>
* <li>{@link Integer#BYTES} - broker rack length</li>
* <li>brokerRack.length - broker rack</li>
* <li>body length - metric-value body</li>
*
* </ul>
*
* @param headerPos Header position
* @return Byte buffer of the partition metric.
*/
@Override
public ByteBuffer toBuffer(int headerPos) {
byte[] brokerRackBytes = brokerRack().getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(headerPos + Byte.BYTES
+ Long.BYTES
+ Integer.BYTES
+ Integer.BYTES
+ brokerRackBytes.length
+ bodySize());
buffer.position(headerPos);
buffer.put(METRIC_VERSION);
buffer.putLong(time());
buffer.putInt(brokerId());
buffer.putInt(brokerRackBytes.length);
if (brokerRackBytes.length > 0) {
buffer.put(brokerRackBytes);
}
buffer = writeBody(buffer);
return buffer;
}

@Override
public String toString() {
return String.format("[BrokerMetrics,BrokerId=%d,Time=%d,Key:Value=%s]", brokerId(), time(), buildKVString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private static AutoBalancerMetrics bytesInToMetric(String topic, int partition,
int brokerId, String brokerRack, double value) {

if (topic != null && partition != -1) {
return new TopicPartitionMetrics(nowMs, brokerId, brokerRack, topic, partition).put(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, value);
return new TopicPartitionMetrics(nowMs, brokerId, brokerRack, topic, partition).put(RawMetricTypes.PARTITION_BYTES_IN, value);
}
return null;
}
Expand All @@ -252,12 +252,12 @@ private static AutoBalancerMetrics bytesOutToMetric(String topic, int partition,
int brokerId, String brokerRack, double value) {

if (topic != null && partition != -1) {
return new TopicPartitionMetrics(nowMs, brokerId, brokerRack, topic, partition).put(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, value);
return new TopicPartitionMetrics(nowMs, brokerId, brokerRack, topic, partition).put(RawMetricTypes.PARTITION_BYTES_OUT, value);
}
return null;
}

public static boolean sanityCheckTopicPartitionMetricsCompleteness(AutoBalancerMetrics metrics) {
return metrics.getMetricValueMap().keySet().containsAll(RawMetricTypes.partitionMetrics());
return metrics.getMetricValueMap().keySet().containsAll(RawMetricTypes.PARTITION_METRICS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static TopicPartitionMetrics fromBuffer(ByteBuffer buffer) throws Unknown

@Override
public AutoBalancerMetrics put(byte type, double value) {
if (!RawMetricTypes.partitionMetrics().contains(type)) {
if (!RawMetricTypes.PARTITION_METRICS.contains(type)) {
throw new IllegalArgumentException("Cannot put non partition metric type " + type + " into a partition metric.");
}
return super.put(type, value);
Expand Down
Loading

0 comments on commit 4cb2ebf

Please sign in to comment.