From 2bcc5b91c91d848ca22340857e0f40349bc67db3 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 16 Apr 2024 20:01:49 +0800 Subject: [PATCH] feat(issue1120): support reporting broker metrics in AutoBalancerMetricsReporter (#1124) * feat(issue1120): support reporting broker metrics in AutoBalancerMetricsReporter Signed-off-by: Shichao Nie * fix(issue1120): use pending request latency instead of pending size Signed-off-by: Shichao Nie * refactor(issue1120): refactor AutoBalancerMetrics for better inheritance Signed-off-by: Shichao Nie --------- Signed-off-by: Shichao Nie --- .../kafka/autobalancer/LoadRetriever.java | 4 -- .../common/types/RawMetricTypes.java | 17 +++--- .../detector/AnomalyDetector.java | 9 ++- .../AutoBalancerMetricsReporter.java | 17 ++++++ .../metric/AutoBalancerMetrics.java | 14 +++-- .../metricsreporter/metric/BrokerMetrics.java | 7 +-- .../metric/DeltaHistogram.java | 30 ++++++++++ .../metricsreporter/metric/MetricSerde.java | 2 + .../metric/TopicPartitionMetrics.java | 8 +-- .../autobalancer/model/BrokerUpdater.java | 3 +- .../model/MetricValueSequence.java | 4 -- .../kafka/autobalancer/model/Snapshot.java | 53 +++++++++-------- .../detector/AnomalyDetectorTest.java | 5 +- .../AutoBalancerMetricsReporterTest.java | 34 ++++++----- .../autobalancer/model/ClusterModelTest.java | 57 ++++++++++++------- .../java/com/automq/stream/s3/S3Stream.java | 25 ++++++++ .../s3/metrics/S3StreamMetricsConstant.java | 2 + .../s3/metrics/S3StreamMetricsManager.java | 48 ++++++++++++++++ .../automq/stream/s3/metrics/TimerUtil.java | 4 ++ .../S3StreamKafkaMetricsConstants.java | 1 + .../s3stream/S3StreamKafkaMetricsManager.java | 35 ++++++++---- 21 files changed, 271 insertions(+), 108 deletions(-) create mode 100644 core/src/main/java/kafka/autobalancer/metricsreporter/metric/DeltaHistogram.java diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 1ddc21f98d..d8ae80db0c 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -477,10 +477,6 @@ protected void updateClusterModel(AutoBalancerMetrics metrics) { switch (metrics.metricType()) { case MetricTypes.TOPIC_PARTITION_METRIC: TopicPartitionMetrics partitionMetrics = (TopicPartitionMetrics) metrics; - - // TODO: remove this when reporting broker metrics is supported - clusterModel.updateBrokerMetrics(partitionMetrics.brokerId(), new HashMap<>(), partitionMetrics.time()); - clusterModel.updateTopicPartitionMetrics(partitionMetrics.brokerId(), new TopicPartition(partitionMetrics.topic(), partitionMetrics.partition()), partitionMetrics.getMetricValueMap(), partitionMetrics.time()); diff --git a/core/src/main/java/kafka/autobalancer/common/types/RawMetricTypes.java b/core/src/main/java/kafka/autobalancer/common/types/RawMetricTypes.java index 235c92ca63..54faee6415 100644 --- a/core/src/main/java/kafka/autobalancer/common/types/RawMetricTypes.java +++ b/core/src/main/java/kafka/autobalancer/common/types/RawMetricTypes.java @@ -13,7 +13,6 @@ import kafka.autobalancer.common.types.metrics.AbnormalLatency; import kafka.autobalancer.common.types.metrics.AbnormalMetric; -import kafka.autobalancer.common.types.metrics.AbnormalQueueSize; import java.util.Map; import java.util.Set; @@ -23,17 +22,15 @@ public class RawMetricTypes { public static final byte PARTITION_BYTES_OUT = (byte) 1; public static final byte PARTITION_SIZE = (byte) 2; public static final byte BROKER_APPEND_LATENCY_AVG_MS = (byte) 3; - public static final byte BROKER_APPEND_QUEUE_SIZE = (byte) 4; - public static final byte BROKER_FAST_READ_LATENCY_AVG_MS = (byte) 5; - public static final byte BROKER_SLOW_READ_QUEUE_SIZE = (byte) 6; + public static final byte BROKER_MAX_PENDING_APPEND_LATENCY_MS = (byte) 4; + public static final byte BROKER_MAX_PENDING_FETCH_LATENCY_MS = (byte) 5; public static final Set PARTITION_METRICS = Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE); - public static final Set BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS, BROKER_APPEND_QUEUE_SIZE, - BROKER_FAST_READ_LATENCY_AVG_MS, BROKER_SLOW_READ_QUEUE_SIZE); + public static final Set BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS, + BROKER_MAX_PENDING_APPEND_LATENCY_MS, BROKER_MAX_PENDING_FETCH_LATENCY_MS); public static final Map ABNORMAL_METRICS = Map.of( - BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(50), - BROKER_APPEND_QUEUE_SIZE, new AbnormalQueueSize(100), - BROKER_FAST_READ_LATENCY_AVG_MS, new AbnormalLatency(50), - BROKER_SLOW_READ_QUEUE_SIZE, new AbnormalQueueSize(100) + BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(100), // 100ms + BROKER_MAX_PENDING_APPEND_LATENCY_MS, new AbnormalLatency(10000), // 10s + BROKER_MAX_PENDING_FETCH_LATENCY_MS, new AbnormalLatency(10000) // 10s ); public static AbnormalMetric ofAbnormalType(byte metricType) { diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java index 136ad81fce..2bb40a25be 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.ConfigUtils; +import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager; import java.util.ArrayList; import java.util.Collections; @@ -54,6 +55,7 @@ public class AnomalyDetector extends AbstractResumableService { private volatile long maxTolerateMetricsDelayMs; private volatile long coolDownIntervalPerActionMs; private volatile boolean isLeader = false; + private volatile Map slowBrokers = new HashMap<>(); AnomalyDetector(LogContext logContext, int maxActionsNumPerDetect, long detectIntervalMs, long maxTolerateMetricsDelayMs, long coolDownIntervalPerActionMs, ClusterModel clusterModel, ActionExecutorService actionExecutor, @@ -72,6 +74,7 @@ public class AnomalyDetector extends AbstractResumableService { this.excludedBrokers = excludedBrokers; this.excludedTopics = excludedTopics; this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS); + S3StreamKafkaMetricsManager.setSlowBrokerSupplier(() -> this.slowBrokers); logger.info("maxActionsNumPerDetect: {}, detectInterval: {}ms, coolDownIntervalPerAction: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}", this.maxActionsNumPerExecution, this.detectInterval, this.coolDownIntervalPerActionMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics); } @@ -245,14 +248,18 @@ long detect0() { ClusterModelSnapshot snapshot = this.clusterModel.snapshot(excludedBrokers, excludedTopics, maxTolerateMetricsDelayMs); snapshot.markSlowBrokers(); + Map slowBrokers = new HashMap<>(); for (BrokerUpdater.Broker broker : snapshot.brokers()) { - logger.info("Broker status: {}", broker.shortString()); + String brokerStr = logger.isDebugEnabled() ? broker.toString() : broker.shortString(); + slowBrokers.put(broker.getBrokerId(), broker.isSlowBroker()); + logger.info("Broker status: {}", brokerStr); if (logger.isDebugEnabled()) { for (TopicPartitionReplicaUpdater.TopicPartitionReplica replica : snapshot.replicasFor(broker.getBrokerId())) { logger.debug("Replica status {}", replica.shortString()); } } } + this.slowBrokers = slowBrokers; List totalActions = new ArrayList<>(); for (Goal goal : goals) { diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java index 9da9a457d5..c2395f2a1b 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java @@ -11,6 +11,8 @@ package kafka.autobalancer.metricsreporter; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import com.automq.stream.s3.metrics.stats.StreamOperationStats; import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; @@ -19,6 +21,8 @@ import kafka.autobalancer.common.types.RawMetricTypes; import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig; import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; +import kafka.autobalancer.metricsreporter.metric.BrokerMetrics; +import kafka.autobalancer.metricsreporter.metric.DeltaHistogram; import kafka.autobalancer.metricsreporter.metric.MetricSerde; import kafka.autobalancer.metricsreporter.metric.MetricsUtils; import kafka.autobalancer.metricsreporter.metric.YammerMetricProcessor; @@ -49,6 +53,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * This class was modified based on Cruise Control: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter. @@ -60,6 +65,7 @@ public class AutoBalancerMetricsReporter implements MetricsRegistryListener, Met private static final Logger LOGGER = LoggerFactory.getLogger(AutoBalancerMetricsReporter.class); private final Map interestedMetrics = new ConcurrentHashMap<>(); private final MetricsRegistry metricsRegistry = KafkaYammerMetrics.defaultRegistry(); + protected final DeltaHistogram appendLatencyMetric = new DeltaHistogram(); protected YammerMetricProcessor yammerMetricProcessor; private KafkaThread metricsReporterRunner; private KafkaProducer producer; @@ -344,9 +350,20 @@ private void reportMetrics(long now) throws Exception { protected void processMetrics(YammerMetricProcessor.Context context) throws Exception { processYammerMetrics(context); + addBrokerMetrics(context); addMandatoryMetrics(context); } + protected void addBrokerMetrics(YammerMetricProcessor.Context context) { + context.merge(new BrokerMetrics(context.time(), brokerId, brokerRack) + .put(RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, + TimeUnit.NANOSECONDS.toMillis((long) appendLatencyMetric.deltaRate(StreamOperationStats.getInstance().appendStreamStats))) + .put(RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, + TimeUnit.NANOSECONDS.toMillis(S3StreamMetricsManager.maxPendingStreamAppendLatency())) + .put(RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, + TimeUnit.NANOSECONDS.toMillis(S3StreamMetricsManager.maxPendingStreamFetchLatency()))); + } + protected void processYammerMetrics(YammerMetricProcessor.Context context) throws Exception { for (Map.Entry entry : interestedMetrics.entrySet()) { LOGGER.trace("Processing yammer metric {}, scope = {}", entry.getKey(), entry.getKey().getScope()); diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java index 09de9fe7d6..92e3766d6c 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/AutoBalancerMetrics.java @@ -51,7 +51,12 @@ protected static Map parseMetricsMap(ByteBuffer buffer) { return metricsMap; } + public abstract boolean isValidMetric(byte type); + public AutoBalancerMetrics put(byte type, double value) { + if (!isValidMetric(type)) { + throw new IllegalArgumentException(String.format("Cannot put metric type %d into a %s metric.", type, this.getClass().getSimpleName())); + } this.metricValueMap.put(type, value); return this; } @@ -115,12 +120,11 @@ public ByteBuffer writeBody(ByteBuffer buffer) { public String buildKVString() { StringBuilder builder = new StringBuilder(); - for (Map.Entry entry : metricValueMap.entrySet()) { - builder.append(entry.getKey()); - builder.append(":"); - builder.append(String.format("%.4f", entry.getValue())); + metricValueMap.forEach((k, v) -> builder.append(k).append(":").append(v).append(",")); + if (builder.length() == 0) { + return ""; } - return builder.toString(); + return builder.substring(0, builder.length() - 1); } @Override diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java index 6953181213..039b68b5bc 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/BrokerMetrics.java @@ -48,11 +48,8 @@ public static BrokerMetrics fromBuffer(ByteBuffer buffer) throws UnknownVersionE } @Override - public AutoBalancerMetrics put(byte type, double value) { - if (!RawMetricTypes.BROKER_METRICS.contains(type)) { - throw new IllegalArgumentException("Cannot put non broker metric type " + type + " into a partition metric."); - } - return super.put(type, value); + public boolean isValidMetric(byte type) { + return RawMetricTypes.BROKER_METRICS.contains(type); } @Override diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/DeltaHistogram.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/DeltaHistogram.java new file mode 100644 index 0000000000..a2a50282c1 --- /dev/null +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/DeltaHistogram.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.autobalancer.metricsreporter.metric; + +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; + +public class DeltaHistogram { + private long count = 0; + private long sum = 0; + + public double deltaRate(YammerHistogramMetric metric) { + long deltaCount = metric.count() - count; + if (deltaCount == 0) { + return 0; + } + double deltaRate = (double) (metric.sum() - sum) / deltaCount; + count = metric.count(); + sum = metric.sum(); + return deltaRate; + } +} diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricSerde.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricSerde.java index 2688b57bbe..eea62cbfbb 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricSerde.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/MetricSerde.java @@ -52,6 +52,8 @@ public static AutoBalancerMetrics fromBytes(byte[] bytes) throws UnknownVersionE switch (buffer.get()) { case MetricTypes.TOPIC_PARTITION_METRIC: return TopicPartitionMetrics.fromBuffer(buffer); + case MetricTypes.BROKER_METRIC: + return BrokerMetrics.fromBuffer(buffer); default: // This could happen when a new type of metric is added, but we are still running the old code. // simply ignore the metric by returning a null. diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java index ba3abf81ac..a5ea74aec3 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/metric/TopicPartitionMetrics.java @@ -62,14 +62,10 @@ public static TopicPartitionMetrics fromBuffer(ByteBuffer buffer) throws Unknown } @Override - public AutoBalancerMetrics put(byte type, double value) { - if (!RawMetricTypes.PARTITION_METRICS.contains(type)) { - throw new IllegalArgumentException("Cannot put non partition metric type " + type + " into a partition metric."); - } - return super.put(type, value); + public boolean isValidMetric(byte type) { + return RawMetricTypes.PARTITION_METRICS.contains(type); } - @Override public String key() { return topic + "-" + partition; diff --git a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java index 9ab4a9fd3c..7962850d29 100644 --- a/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java +++ b/core/src/main/java/kafka/autobalancer/model/BrokerUpdater.java @@ -40,8 +40,7 @@ protected Broker createBroker(int brokerId, String rack, boolean active) { @Override protected boolean validateMetrics(Map metricsMap) { - // TODO: add broker metrics validation when reporting broker metrics is supported - return true; + return metricsMap.keySet().containsAll(RawMetricTypes.BROKER_METRICS); } @Override diff --git a/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java b/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java index 5cd7108632..61668d6ea1 100644 --- a/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java +++ b/core/src/main/java/kafka/autobalancer/model/MetricValueSequence.java @@ -16,7 +16,6 @@ public class MetricValueSequence { private static final int DEFAULT_MAX_SIZE = 1024; - private static final int MIN_VALID_LENGTH = 30; private final Deque values; private final int maxSize; private Snapshot prev; @@ -44,9 +43,6 @@ public void append(double value) { } public Snapshot snapshot() { - if (values.size() < MIN_VALID_LENGTH) { - return null; - } Snapshot snapshot = new Snapshot(prev, values); this.prev = snapshot; return snapshot; diff --git a/core/src/main/java/kafka/autobalancer/model/Snapshot.java b/core/src/main/java/kafka/autobalancer/model/Snapshot.java index 3a341ab6c8..56c0e3dce2 100644 --- a/core/src/main/java/kafka/autobalancer/model/Snapshot.java +++ b/core/src/main/java/kafka/autobalancer/model/Snapshot.java @@ -15,49 +15,52 @@ import java.util.Collection; public class Snapshot { + public static final double INVALID = -1; private final Snapshot prev; private final double[] values; - private Double latest; - private Double percentile90th; + private final double latest; public Snapshot(Snapshot prev, Collection values) { this.prev = prev; this.values = values.stream().mapToDouble(Double::doubleValue).toArray(); + this.latest = this.values.length > 0 ? this.values[this.values.length - 1] : INVALID; Arrays.sort(this.values); } public double getValue(double quantile) { - if (!(quantile < 0.0) && !(quantile > 1.0)) { - if (this.values.length == 0) { - return 0.0; - } else { - double pos = quantile * (double) (this.values.length + 1); - if (pos < 1.0) { - return this.values[0]; - } else if (pos >= (double) this.values.length) { - return this.values[this.values.length - 1]; - } else { - double lower = this.values[(int) pos - 1]; - double upper = this.values[(int) pos]; - return lower + (pos - Math.floor(pos)) * (upper - lower); - } - } + if (quantile < 0.0 || quantile > 1.0) { + throw new IllegalArgumentException(quantile + " out of range. Should be in [0, 1]"); + } + if (values.length < getMinValidLength(quantile)) { + return INVALID; + } + double pos = quantile * (double) (this.values.length + 1); + if (pos < 1.0) { + return this.values[0]; + } else if (pos >= (double) this.values.length) { + return this.values[this.values.length - 1]; } else { - throw new IllegalArgumentException(quantile + " is not in [0..1]"); + double lower = this.values[(int) pos - 1]; + double upper = this.values[(int) pos]; + return lower + (pos - Math.floor(pos)) * (upper - lower); } } - public double get90thPercentile() { - if (this.percentile90th == null) { - this.percentile90th = this.getValue(0.9); + private int getMinValidLength(double quantile) { + if (quantile == 0.0 || quantile == 1.0) { + return 1; + } + if (quantile < 0.5) { + return (int) Math.ceil(1.0 / quantile); } - return this.percentile90th; + return (int) Math.ceil(1.0 / (1 - quantile)); + } + + public double get90thPercentile() { + return getValue(0.9); } public double getLatest() { - if (this.latest == null) { - this.latest = this.values[this.values.length - 1]; - } return latest; } diff --git a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java index 25609571d5..7f756cd99c 100644 --- a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java +++ b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java @@ -147,7 +147,10 @@ public void testSchedulingTimeCost() { int brokerNum = 20; for (int i = 0; i < brokerNum; i++) { clusterModel.registerBroker(i, ""); - clusterModel.updateBrokerMetrics(i, new HashMap<>(), System.currentTimeMillis()); + clusterModel.updateBrokerMetrics(i, Map.of( + RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), System.currentTimeMillis()); } int topicNum = 5000; int totalPartitionNum = 100000; diff --git a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java index 279bc1a4e7..316a041c66 100644 --- a/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java +++ b/core/src/test/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporterTest.java @@ -17,6 +17,8 @@ package kafka.autobalancer.metricsreporter; +import kafka.autobalancer.common.types.MetricTypes; +import kafka.autobalancer.common.types.RawMetricTypes; import kafka.autobalancer.config.AutoBalancerMetricsReporterConfig; import kafka.autobalancer.metricsreporter.metric.AutoBalancerMetrics; import kafka.autobalancer.metricsreporter.metric.MetricSerde; @@ -37,7 +39,6 @@ import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -45,10 +46,6 @@ import java.util.Properties; import java.util.Set; -import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_SIZE; -import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_BYTES_IN; -import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_BYTES_OUT; - @Tag("S3Unit") public class AutoBalancerMetricsReporterTest extends AutoBalancerClientsIntegrationTestHarness { @@ -93,24 +90,33 @@ public void testReportingMetrics() { try (Consumer consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singleton(Topic.AUTO_BALANCER_METRICS_TOPIC_NAME)); long startMs = System.currentTimeMillis(); - Set expectedTopicPartitionMetricTypes = new HashSet<>(Arrays.asList( - PARTITION_BYTES_IN, - PARTITION_BYTES_OUT, - PARTITION_SIZE)); - Set expectedMetricTypes = new HashSet<>(expectedTopicPartitionMetricTypes); + Set expectedBrokerMetricTypes = new HashSet<>(RawMetricTypes.BROKER_METRICS); + Set expectedPartitionMetricTypes = new HashSet<>(RawMetricTypes.PARTITION_METRICS); + Set expectedMetricTypes = new HashSet<>(); + expectedMetricTypes.addAll(expectedBrokerMetricTypes); + expectedMetricTypes.addAll(expectedPartitionMetricTypes); Set metricTypes = new HashSet<>(); ConsumerRecords records; - while (metricTypes.size() < expectedTopicPartitionMetricTypes.size() && System.currentTimeMillis() < startMs + 15000) { + while (metricTypes.size() < expectedMetricTypes.size() && System.currentTimeMillis() < startMs + 15000) { records = consumer.poll(Duration.ofMillis(10L)); for (ConsumerRecord record : records) { Set localMetricTypes = new HashSet<>(); - for (Byte type : record.value().getMetricValueMap().keySet()) { + AutoBalancerMetrics metrics = record.value(); + Assertions.assertNotNull(metrics); + for (Byte type : metrics.getMetricValueMap().keySet()) { metricTypes.add(type); localMetricTypes.add(type); } - Assertions.assertEquals(expectedTopicPartitionMetricTypes, localMetricTypes, - "Expected " + expectedTopicPartitionMetricTypes + ", but saw " + localMetricTypes); + if (metrics.metricType() == MetricTypes.BROKER_METRIC) { + Assertions.assertEquals(expectedBrokerMetricTypes, localMetricTypes, + "Expected " + expectedBrokerMetricTypes + ", but saw " + localMetricTypes); + } else if (metrics.metricType() == MetricTypes.TOPIC_PARTITION_METRIC) { + Assertions.assertEquals(expectedPartitionMetricTypes, localMetricTypes, + "Expected " + expectedPartitionMetricTypes + ", but saw " + localMetricTypes); + } else { + Assertions.fail("Unexpected metric type: " + metrics.metricType()); + } } } Assertions.assertEquals(expectedMetricTypes, metricTypes, "Expected " + expectedMetricTypes + ", but saw " + metricTypes); diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java index 630ab707d4..9c8fe8ebd3 100644 --- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java +++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java @@ -35,9 +35,7 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; -import java.util.Random; import java.util.Set; @Tag("S3Unit") @@ -264,7 +262,10 @@ public void testExcludeTopics() { .setPartitionId(partition); clusterModel.onPartitionCreate(partitionRecord1); long now = System.currentTimeMillis(); - clusterModel.updateBrokerMetrics(brokerId, new HashMap<>(), now); + clusterModel.updateBrokerMetrics(brokerId, Map.of( + RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), now); TopicPartitionMetrics topicPartitionMetrics = new TopicPartitionMetrics(now, brokerId, "", topicName, partition); topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_IN, 10); @@ -318,7 +319,10 @@ public void testMetricsTime() { long now = System.currentTimeMillis(); - Assertions.assertTrue(clusterModel.updateBrokerMetrics(brokerId, new HashMap<>(), now)); + Assertions.assertTrue(clusterModel.updateBrokerMetrics(brokerId, Map.of( + RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, 0.0, + RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, 0.0), now)); TopicPartitionMetrics topicPartitionMetrics = new TopicPartitionMetrics(now - 1000, brokerId, "", topicName, partition); topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_IN, 10); topicPartitionMetrics.put(RawMetricTypes.PARTITION_BYTES_OUT, 10); @@ -340,65 +344,78 @@ public void testMetricsTime() { @Test public void testSlowBroker() { - Random r = new Random(); RecordClusterModel clusterModel = new RecordClusterModel(); RegisterBrokerRecord registerBrokerRecord0 = new RegisterBrokerRecord().setBrokerId(0); clusterModel.onBrokerRegister(registerBrokerRecord0); - // APPEND_LATENCY_AVG_MS, APPEND_QUEUE_SIZE, FAST_READ_LATENCY_AVG_MS, SLOW_READ_QUEUE_SIZE - // test not enough samples - for (int i = 0; i < 10; i++) { +// for (int i = 0; i < 10; i++) { +// Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, Double.MAX_VALUE, +// Double.MAX_VALUE, Double.MAX_VALUE).getMetricValueMap(), +// System.currentTimeMillis())); +// } +// ClusterModelSnapshot snapshot = clusterModel.snapshot(); +// snapshot.markSlowBrokers(); +// Assertions.assertFalse(snapshot.broker(0).isSlowBroker()); +// clusterModel.unregisterBroker(0); + + // test high append latency + clusterModel.onBrokerRegister(registerBrokerRecord0); + for (int i = 0; i < 100; i++) { Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, - 999, 999, 999, 999).getMetricValueMap(), - System.currentTimeMillis())); + 0, 0, 0).getMetricValueMap(), System.currentTimeMillis())); } ClusterModelSnapshot snapshot = clusterModel.snapshot(); snapshot.markSlowBrokers(); Assertions.assertFalse(snapshot.broker(0).isSlowBroker()); + Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, + 2000, 0, 0).getMetricValueMap(), System.currentTimeMillis())); + snapshot = clusterModel.snapshot(); + snapshot.markSlowBrokers(); + Assertions.assertTrue(snapshot.broker(0).isSlowBroker()); clusterModel.unregisterBroker(0); - // test abs high latency + // test high pending append latency clusterModel.onBrokerRegister(registerBrokerRecord0); for (int i = 0; i < 100; i++) { Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, - 1, 0, 2, 0).getMetricValueMap(), System.currentTimeMillis())); + 0, 0, 0).getMetricValueMap(), System.currentTimeMillis())); } snapshot = clusterModel.snapshot(); snapshot.markSlowBrokers(); Assertions.assertFalse(snapshot.broker(0).isSlowBroker()); Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, - 100, 0, 2, 0).getMetricValueMap(), System.currentTimeMillis())); + 0, 20000, 0).getMetricValueMap(), System.currentTimeMillis())); snapshot = clusterModel.snapshot(); snapshot.markSlowBrokers(); Assertions.assertTrue(snapshot.broker(0).isSlowBroker()); clusterModel.unregisterBroker(0); - // test large queue size + // test high pending fetch latency clusterModel.onBrokerRegister(registerBrokerRecord0); for (int i = 0; i < 100; i++) { Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, - 1, 0, 2, 0).getMetricValueMap(), System.currentTimeMillis())); + 0, 0, 0).getMetricValueMap(), System.currentTimeMillis())); } snapshot = clusterModel.snapshot(); snapshot.markSlowBrokers(); Assertions.assertFalse(snapshot.broker(0).isSlowBroker()); Assertions.assertTrue(clusterModel.updateBrokerMetrics(0, createBrokerMetrics(0, - 1, 500, 2, 0).getMetricValueMap(), System.currentTimeMillis())); + 0, 0, 20000).getMetricValueMap(), System.currentTimeMillis())); snapshot = clusterModel.snapshot(); snapshot.markSlowBrokers(); Assertions.assertTrue(snapshot.broker(0).isSlowBroker()); clusterModel.unregisterBroker(0); } - private BrokerMetrics createBrokerMetrics(int brokerId, double appendLatency, double appendQueueSize, double fastReadLatency, double slowReadQueueSize) { + private BrokerMetrics createBrokerMetrics(int brokerId, double appendLatency, double pendingAppendLatency, + double pendingFetchLatency) { long now = System.currentTimeMillis(); BrokerMetrics brokerMetrics = new BrokerMetrics(now, brokerId, ""); brokerMetrics.put(RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS, appendLatency); - brokerMetrics.put(RawMetricTypes.BROKER_APPEND_QUEUE_SIZE, appendQueueSize); - brokerMetrics.put(RawMetricTypes.BROKER_FAST_READ_LATENCY_AVG_MS, fastReadLatency); - brokerMetrics.put(RawMetricTypes.BROKER_SLOW_READ_QUEUE_SIZE, slowReadQueueSize); + brokerMetrics.put(RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS, pendingAppendLatency); + brokerMetrics.put(RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS, pendingFetchLatency); return brokerMetrics; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 30b871eba0..42cdefcdf3 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -24,6 +24,7 @@ import com.automq.stream.s3.cache.CacheAccessType; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StreamOperationStats; import com.automq.stream.s3.model.StreamRecordBatch; @@ -38,11 +39,13 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -70,7 +73,9 @@ public class S3Stream implements Stream { private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private final ReentrantLock appendLock = new ReentrantLock(); private final Set> pendingAppends = ConcurrentHashMap.newKeySet(); + private final Deque pendingAppendTimestamps = new ConcurrentLinkedDeque<>(); private final Set> pendingFetches = ConcurrentHashMap.newKeySet(); + private final Deque pendingFetchTimestamps = new ConcurrentLinkedDeque<>(); private final AsyncNetworkBandwidthLimiter networkInboundLimiter; private final AsyncNetworkBandwidthLimiter networkOutboundLimiter; private long startOffset; @@ -94,6 +99,20 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St this.streamManager = streamManager; this.networkInboundLimiter = networkInboundLimiter; this.networkOutboundLimiter = networkOutboundLimiter; + S3StreamMetricsManager.registerPendingStreamAppendLatencySupplier(streamId, () -> getHeadLatency(this.pendingAppendTimestamps)); + S3StreamMetricsManager.registerPendingStreamFetchLatencySupplier(streamId, () -> getHeadLatency(this.pendingFetchTimestamps)); + } + + private long getHeadLatency(Deque timestamps) { + if (timestamps.isEmpty()) { + return 0L; + } + Long lastTimestamp = timestamps.peek(); + if (lastTimestamp == null) { + LOGGER.error("head timestamp of pending request is null"); + return 0L; + } + return System.nanoTime() - lastTimestamp; } public boolean isClosed() { @@ -143,9 +162,11 @@ public CompletableFuture append(AppendContext context, RecordBatch } }, LOGGER, "append"); pendingAppends.add(cf); + pendingAppendTimestamps.push(timerUtil.lastAs(TimeUnit.NANOSECONDS)); cf.whenComplete((nil, ex) -> { StreamOperationStats.getInstance().appendStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); + pendingAppendTimestamps.pop(); }); return cf; } finally { @@ -203,6 +224,7 @@ public CompletableFuture fetch(FetchContext context, return CompletableFuture.completedFuture(rs); }); pendingFetches.add(retCf); + pendingFetchTimestamps.push(timerUtil.lastAs(TimeUnit.NANOSECONDS)); retCf.whenComplete((rs, ex) -> { if (ex != null) { Throwable cause = FutureUtil.cause(ex); @@ -220,6 +242,7 @@ public CompletableFuture fetch(FetchContext context, startOffset, endOffset, totalSize, timerUtil.elapsedAs(TimeUnit.MILLISECONDS)); } pendingFetches.remove(retCf); + pendingFetchTimestamps.pop(); }); return retCf; } finally { @@ -324,6 +347,8 @@ public CompletableFuture close() { LOGGER.info("{} closed", logIdent); StreamOperationStats.getInstance().closeStreamStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); } + S3StreamMetricsManager.removePendingStreamAppendNumSupplier(streamId); + S3StreamMetricsManager.removePendingStreamFetchNumSupplier(streamId); }); return closeCf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 6590020356..490fc949f7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -79,6 +79,8 @@ public class S3StreamMetricsConstant { public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time"; public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size"; public static final String READ_AHEAD_STAGE_TIME_METRIC_NAME = "read_ahead_stage_time"; + public static final String PENDING_STREAM_APPEND_LATENCY_METRIC_NAME = "pending_stream_append_latency"; + public static final String PENDING_STREAM_FETCH_LATENCY_METRIC_NAME = "pending_stream_fetch_latency"; public static final String SUM_METRIC_NAME_SUFFIX = "_sum"; public static final String COUNT_METRIC_NAME_SUFFIX = "_count"; public static final String P50_METRIC_NAME_SUFFIX = "_50p"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 68f906b223..e9cd2c4567 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -76,6 +76,8 @@ public class S3StreamMetricsManager { private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge(); private static ObservableLongGauge allocatedMemorySize = new NoopObservableLongGauge(); private static ObservableLongGauge usedMemorySize = new NoopObservableLongGauge(); + private static ObservableLongGauge pendingStreamAppendLatencyMetrics = new NoopObservableLongGauge(); + private static ObservableLongGauge pendingStreamFetchLatencyMetrics = new NoopObservableLongGauge(); private static LongCounter compactionReadSizeInTotal = new NoopLongCounter(); private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter(); private static Supplier networkInboundAvailableBandwidthSupplier = () -> 0L; @@ -90,6 +92,8 @@ public class S3StreamMetricsManager { private static Map> availableInflightS3ReadQuotaSupplier = new ConcurrentHashMap<>(); private static Map> availableInflightS3WriteQuotaSupplier = new ConcurrentHashMap<>(); private static Supplier inflightWALUploadTasksCountSupplier = () -> 0; + private static Map> pendingStreamAppendLatencySupplier = new ConcurrentHashMap<>(); + private static Map> pendingStreamFetchLatencySupplier = new ConcurrentHashMap<>(); private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); private static final MultiAttributes ALLOC_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_TYPE); @@ -289,6 +293,26 @@ public static void initMetrics(Meter meter, String prefix) { result.record(ByteBufAlloc.byteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes()); } }); + pendingStreamAppendLatencyMetrics = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.PENDING_STREAM_APPEND_LATENCY_METRIC_NAME) + .setDescription("The maximum latency of pending stream append requests. NOTE: the minimum measurable " + + "latency depends on the reporting interval of this metrics.") + .ofLongs() + .setUnit("nanoseconds") + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + result.record(maxPendingStreamAppendLatency(), metricsConfig.getBaseAttributes()); + } + }); + pendingStreamFetchLatencyMetrics = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.PENDING_STREAM_FETCH_LATENCY_METRIC_NAME) + .setDescription("The maximum latency of pending stream append requests. NOTE: the minimum measurable " + + "latency depends on the reporting interval of this metrics.") + .ofLongs() + .setUnit("nanoseconds") + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + result.record(maxPendingStreamFetchLatency(), metricsConfig.getBaseAttributes()); + } + }); } public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.Type type, @@ -536,4 +560,28 @@ public static CounterMetric buildCompactionWriteSizeMetric() { return metric; } } + + public static void registerPendingStreamAppendLatencySupplier(long streamId, Supplier pendingStreamAppendLatencySupplier) { + S3StreamMetricsManager.pendingStreamAppendLatencySupplier.put(streamId, pendingStreamAppendLatencySupplier); + } + + public static void registerPendingStreamFetchLatencySupplier(long streamId, Supplier pendingStreamFetchLatencySupplier) { + S3StreamMetricsManager.pendingStreamFetchLatencySupplier.put(streamId, pendingStreamFetchLatencySupplier); + } + + public static void removePendingStreamAppendNumSupplier(long streamId) { + S3StreamMetricsManager.pendingStreamAppendLatencySupplier.remove(streamId); + } + + public static void removePendingStreamFetchNumSupplier(long streamId) { + S3StreamMetricsManager.pendingStreamFetchLatencySupplier.remove(streamId); + } + + public static long maxPendingStreamAppendLatency() { + return pendingStreamAppendLatencySupplier.values().stream().map(Supplier::get).max(Long::compareTo).orElse(0L); + } + + public static long maxPendingStreamFetchLatency() { + return pendingStreamFetchLatencySupplier.values().stream().map(Supplier::get).max(Long::compareTo).orElse(0L); + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java index 9059b2d761..5fe21ea34f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/TimerUtil.java @@ -25,6 +25,10 @@ public void reset() { last.set(System.nanoTime()); } + public long lastAs(TimeUnit timeUnit) { + return timeUnit.convert(last.get(), TimeUnit.NANOSECONDS); + } + public long elapsedAs(TimeUnit timeUnit) { return timeUnit.convert(System.nanoTime() - last.get(), TimeUnit.NANOSECONDS); } diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 56c8388b11..4e989ad0b5 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -21,6 +21,7 @@ public class S3StreamKafkaMetricsConstants { public static final String STREAM_OBJECT_NUM = "stream_object_num"; public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; + public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; public static final AttributeKey LABEL_NODE_ID = AttributeKey.stringKey("node_id"); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index ce405d592b..0bd6b60b68 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -60,6 +60,8 @@ public class S3StreamKafkaMetricsManager { private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); + private static ObservableLongGauge slowBrokerMetrics = new NoopObservableLongGauge(); + private static Supplier> slowBrokerSupplier = Collections::emptyMap; public static void configure(MetricsConfig metricsConfig) { synchronized (BASE_ATTRIBUTES_LISTENERS) { @@ -82,7 +84,7 @@ private static void initAutoBalancerMetrics(Meter meter, String prefix) { .setUnit("ms") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { Map metricsTimeDelayMap = autoBalancerMetricsTimeMapSupplier.get(); for (Map.Entry entry : metricsTimeDelayMap.entrySet()) { long timestamp = entry.getValue(); @@ -91,6 +93,17 @@ private static void initAutoBalancerMetrics(Meter meter, String prefix) { } } }); + slowBrokerMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.SLOW_BROKER_METRIC_NAME) + .setDescription("The metrics to indicate whether the broker is slow or not") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { + Map slowBrokerMap = slowBrokerSupplier.get(); + for (Map.Entry entry : slowBrokerMap.entrySet()) { + result.record(entry.getValue() ? 1 : 0, BROKER_ATTRIBUTES.get(String.valueOf(entry.getKey()))); + } + } + }); } private static void initObjectMetrics(Meter meter, String prefix) { @@ -98,7 +111,7 @@ private static void initObjectMetrics(Meter meter, String prefix) { .setDescription("The total count of s3 objects in different states") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { Map s3ObjectCountMap = s3ObjectCountMapSupplier.get(); for (Map.Entry entry : s3ObjectCountMap.entrySet()) { result.record(entry.getValue(), S3_OBJECT_ATTRIBUTES.get(entry.getKey())); @@ -110,7 +123,7 @@ private static void initObjectMetrics(Meter meter, String prefix) { .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { result.record(s3ObjectSizeSupplier.get(), metricsConfig.getBaseAttributes()); } }); @@ -118,7 +131,7 @@ private static void initObjectMetrics(Meter meter, String prefix) { .setDescription("The total number of stream set objects") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { Map streamSetObjectNumMap = streamSetObjectNumSupplier.get(); for (Map.Entry entry : streamSetObjectNumMap.entrySet()) { result.record(entry.getValue(), BROKER_ATTRIBUTES.get(entry.getKey())); @@ -129,7 +142,7 @@ private static void initObjectMetrics(Meter meter, String prefix) { .setDescription("The total number of stream objects") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { result.record(streamObjectNumSupplier.get(), metricsConfig.getBaseAttributes()); } }); @@ -140,7 +153,7 @@ private static void initFetchMetrics(Meter meter, String prefix) { .setDescription("The number of permits in fetch limiters") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { Map fetchLimiterPermitNumMap = fetchLimiterPermitNumSupplier.get(); for (Map.Entry entry : fetchLimiterPermitNumMap.entrySet()) { result.record(entry.getValue(), FETCH_LIMITER_ATTRIBUTES.get(entry.getKey())); @@ -151,7 +164,7 @@ private static void initFetchMetrics(Meter meter, String prefix) { .setDescription("The number of pending tasks in fetch executors") .ofLongs() .buildWithCallback(result -> { - if (shouldRecordMetrics()) { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { Map fetchPendingTaskNumMap = fetchPendingTaskNumSupplier.get(); for (Map.Entry entry : fetchPendingTaskNumMap.entrySet()) { result.record(entry.getValue(), FETCH_EXECUTOR_ATTRIBUTES.get(entry.getKey())); @@ -160,10 +173,6 @@ private static void initFetchMetrics(Meter meter, String prefix) { }); } - private static boolean shouldRecordMetrics() { - return MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get(); - } - public static void setIsActiveSupplier(Supplier isActiveSupplier) { S3StreamKafkaMetricsManager.isActiveSupplier = isActiveSupplier; } @@ -195,4 +204,8 @@ public static void setFetchLimiterPermitNumSupplier(Supplier> fetchPendingTaskNumSupplier) { S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier; } + + public static void setSlowBrokerSupplier(Supplier> slowBrokerSupplier) { + S3StreamKafkaMetricsManager.slowBrokerSupplier = slowBrokerSupplier; + } }