-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MetricsServer v2 #469
MetricsServer v2 #469
Changes from all commits
ce6aa9b
a24b046
75f783c
bd17208
8468b75
c5709a2
d012827
a631e0f
234ff55
0fe7565
b0fef28
49b2ab2
aa86f5d
32b27e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,18 @@ | |
import io.mantisrx.common.metrics.measurement.CounterMeasurement; | ||
import io.mantisrx.common.metrics.measurement.GaugeMeasurement; | ||
import io.mantisrx.common.metrics.measurement.Measurements; | ||
import io.mantisrx.common.metrics.measurement.MicrometerMeasurement; | ||
import io.mantisrx.common.metrics.spectator.MetricId; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module; | ||
import io.micrometer.core.instrument.Meter; | ||
import io.micrometer.core.instrument.MeterRegistry; | ||
import io.netty.buffer.ByteBuf; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -53,41 +57,50 @@ public class MetricsServer { | |
private int port; | ||
private Map<String, String> tags; | ||
private long publishRateInSeconds; | ||
private MeterRegistry micrometerRegistry; | ||
|
||
public MetricsServer(int port, long publishRateInSeconds, Map<String, String> tags) { | ||
public MetricsServer(int port, long publishRateInSeconds, Map<String, String> tags, MeterRegistry micrometerRegistry) { | ||
this.port = port; | ||
this.publishRateInSeconds = publishRateInSeconds; | ||
this.tags = tags; | ||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | ||
mapper.registerModule(new Jdk8Module()); | ||
this.micrometerRegistry = micrometerRegistry; | ||
} | ||
|
||
private Observable<Measurements> measurements(long timeFrequency) { | ||
final MeterRegistry microRegistry = this.micrometerRegistry; | ||
final MetricsRegistry registry = MetricsRegistry.getInstance(); | ||
return | ||
Observable.interval(0, timeFrequency, TimeUnit.SECONDS) | ||
.flatMap(new Func1<Long, Observable<Measurements>>() { | ||
@Override | ||
public Observable<Measurements> call(Long t1) { | ||
long timestamp = System.currentTimeMillis(); | ||
List<Measurements> measurements = new ArrayList<>(); | ||
for (Metrics metrics : registry.metrics()) { | ||
Collection<CounterMeasurement> counters = new LinkedList<>(); | ||
Collection<GaugeMeasurement> gauges = new LinkedList<>(); | ||
|
||
for (Entry<MetricId, Counter> counterEntry : metrics.counters().entrySet()) { | ||
Counter counter = counterEntry.getValue(); | ||
counters.add(new CounterMeasurement(counterEntry.getKey().metricName(), counter.value())); | ||
} | ||
for (Entry<MetricId, Gauge> gaugeEntry : metrics.gauges().entrySet()) { | ||
gauges.add(new GaugeMeasurement(gaugeEntry.getKey().metricName(), gaugeEntry.getValue().doubleValue())); | ||
} | ||
measurements.add(new Measurements(metrics.getMetricGroupId().id(), | ||
timestamp, counters, gauges, tags)); | ||
} | ||
return Observable.from(measurements); | ||
Observable.interval(0, timeFrequency, TimeUnit.SECONDS) | ||
.flatMap(new Func1<Long, Observable<Measurements>>() { | ||
@Override | ||
public Observable<Measurements> call(Long t1) { | ||
long timestamp = System.currentTimeMillis(); | ||
List<Measurements> measurements = new ArrayList<>(); | ||
|
||
for (Meter meter: microRegistry.getMeters()) { | ||
Collection<MicrometerMeasurement> micrometers = new LinkedList<>(); | ||
micrometers.add(new MicrometerMeasurement(meter.getId().getType(), meter.measure().iterator().next().getValue())); | ||
measurements.add(new Measurements(meter.getId().getName(), timestamp, Collections.emptyList(), Collections.emptyList(), micrometers, tags)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are generating a Measurements instance for every Micrometer metric, even though a Measurements instance is supposed to consist of a collection of counters, timers, and other components for a specific group ID. Considering this, are the semantics of a 1:1 relationship between a Measurements instance and a Micrometer metric correct? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, it is correct. |
||
} | ||
|
||
for (io.mantisrx.common.metrics.Metrics metrics : registry.metrics()) { | ||
Collection<CounterMeasurement> counters = new LinkedList<>(); | ||
Collection<GaugeMeasurement> gauges = new LinkedList<>(); | ||
for (Entry<MetricId, Counter> counterEntry : metrics.counters().entrySet()) { | ||
Counter counter = counterEntry.getValue(); | ||
counters.add(new CounterMeasurement(counterEntry.getKey().metricName(), counter.value())); | ||
} | ||
}); | ||
for (Entry<MetricId, Gauge> gaugeEntry : metrics.gauges().entrySet()) { | ||
gauges.add(new GaugeMeasurement(gaugeEntry.getKey().metricName(), gaugeEntry.getValue().doubleValue())); | ||
} | ||
measurements.add(new Measurements(metrics.getMetricGroupId().name(), | ||
timestamp, counters, gauges, Collections.emptyList(),tags)); | ||
} | ||
return Observable.from(measurements); | ||
} | ||
}); | ||
} | ||
|
||
public void start() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty; | ||
import io.micrometer.core.instrument.Meter; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
|
||
|
@@ -30,6 +31,7 @@ public class Measurements { | |
private long timestamp; | ||
private Collection<CounterMeasurement> counters; | ||
private Collection<GaugeMeasurement> gauges; | ||
private Collection<MicrometerMeasurement> micrometers; | ||
|
||
@JsonCreator | ||
@JsonIgnoreProperties(ignoreUnknown = true) | ||
|
@@ -38,11 +40,13 @@ public Measurements( | |
@JsonProperty("timestamp") long timestamp, | ||
@JsonProperty("counters") Collection<CounterMeasurement> counters, | ||
@JsonProperty("gauges") Collection<GaugeMeasurement> gauges, | ||
@JsonProperty("micrometers") Collection<MicrometerMeasurement> micrometers, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this when we are already emitting the existing data-structures? Unless the downstream understands this DS, emitting it is unnecessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
@JsonProperty("tags") Map<String, String> tags) { | ||
this.name = name; | ||
this.timestamp = timestamp; | ||
this.counters = counters; | ||
this.gauges = gauges; | ||
this.micrometers = micrometers; | ||
this.tags = tags; | ||
} | ||
|
||
|
@@ -62,6 +66,10 @@ public Collection<GaugeMeasurement> getGauges() { | |
return gauges; | ||
} | ||
|
||
public Collection<MicrometerMeasurement> getMicrometers() { | ||
return micrometers; | ||
} | ||
|
||
public Map<String, String> getTags() { | ||
return tags; | ||
} | ||
|
@@ -74,6 +82,7 @@ public String toString() { | |
", tags=" + tags + | ||
", counters=" + counters + | ||
", gauges=" + gauges + | ||
", micrometers=" + micrometers + | ||
'}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright 2023 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.mantisrx.common.metrics.measurement; | ||
|
||
import io.micrometer.core.instrument.Meter; | ||
import lombok.Value; | ||
|
||
@Value | ||
public class MicrometerMeasurement { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use Lombok `@Value`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we even need this type, or can we convert the Micrometer measurements to the existing measurement data structures? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we should keep them separate so that Mantis metrics can be distinguished from Micrometer metrics. |
||
Meter.Type type; | ||
double value; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
import static org.apache.flink.configuration.GlobalConfiguration.loadConfiguration; | ||
|
||
import io.micrometer.core.instrument.MeterRegistry; | ||
import akka.actor.ActorRef; | ||
import akka.actor.ActorSystem; | ||
import akka.actor.DeadLetter; | ||
|
@@ -79,6 +80,7 @@ | |
import io.mantisrx.server.master.scheduler.WorkerRegistry; | ||
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; | ||
import io.mantisrx.shaded.org.apache.curator.utils.ZKPaths; | ||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry; | ||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.FileNotFoundException; | ||
|
@@ -119,16 +121,16 @@ public class MasterMain implements Service { | |
private MasterConfiguration config; | ||
private SchedulingService schedulingService; | ||
private ILeadershipManager leadershipManager; | ||
private final MeterRegistry micrometerRegistry; | ||
|
||
public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber auditEventSubscriber) { | ||
|
||
public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber auditEventSubscriber, MeterRegistry micrometerRegistry) { | ||
this.micrometerRegistry = micrometerRegistry; | ||
Comment on lines +126 to +127
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can use the CompositeMeterRegistry singleton here, since this is the leaf node. |
||
String test = "{\"jobId\":\"sine-function-1\",\"status\":{\"jobId\":\"sine-function-1\",\"stageNum\":1,\"workerIndex\":0,\"workerNumber\":2,\"type\":\"HEARTBEAT\",\"message\":\"heartbeat\",\"state\":\"Noop\",\"hostname\":null,\"timestamp\":1525813363585,\"reason\":\"Normal\",\"payloads\":[{\"type\":\"SubscriptionState\",\"data\":\"false\"},{\"type\":\"IncomingDataDrop\",\"data\":\"{\\\"onNextCount\\\":0,\\\"droppedCount\\\":0}\"}]}}"; | ||
|
||
Metrics metrics = new Metrics.Builder() | ||
.id("MasterMain") | ||
.addCounter("masterInitSuccess") | ||
.addCounter("masterInitError") | ||
.build(); | ||
.id("MasterMain") | ||
.addCounter("masterInitSuccess") | ||
.addCounter("masterInitError") | ||
.build(); | ||
Metrics m = MetricsRegistry.getInstance().registerAndGet(metrics); | ||
try { | ||
ConfigurationProvider.initialize(configFactory); | ||
|
@@ -238,7 +240,7 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit | |
|
||
// start serving metrics | ||
if (config.getMasterMetricsPort() > 0) { | ||
new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap()).start(); | ||
new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap(), micrometerRegistry).start(); | ||
} | ||
new MetricsPublisherService(config.getMetricsPublisher(), config.getMetricsPublisherFrequencyInSeconds(), | ||
new HashMap<>()).start(); | ||
|
@@ -375,7 +377,8 @@ public static void main(String[] args) { | |
StaticPropertiesConfigurationFactory factory = new StaticPropertiesConfigurationFactory(props); | ||
setupDummyAgentClusterAutoScaler(); | ||
final AuditEventSubscriber auditEventSubscriber = new AuditEventSubscriberLoggingImpl(); | ||
MasterMain master = new MasterMain(factory, auditEventSubscriber); | ||
MeterRegistry registry = new CompositeMeterRegistry(); | ||
MasterMain master = new MasterMain(factory, auditEventSubscriber,registry); | ||
master.start(); // blocks until shutdown hook (ctrl-c) | ||
} catch (Exception e) { | ||
// unexpected to get a RuntimeException, will exit | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,8 @@ | |
import io.mantisrx.runtime.parameter.Parameter; | ||
import io.mantisrx.runtime.parameter.ParameterDefinition; | ||
import io.mantisrx.runtime.parameter.ParameterUtils; | ||
import io.micrometer.core.instrument.MeterRegistry; | ||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry; | ||
import io.reactivex.mantis.remote.observable.EndpointChange; | ||
import io.reactivex.mantis.remote.observable.EndpointChange.Type; | ||
import io.reactivex.mantis.remote.observable.EndpointInjector; | ||
|
@@ -70,8 +72,13 @@ public class LocalJobExecutorNetworked { | |
private static final Logger logger = LoggerFactory.getLogger(LocalJobExecutorNetworked.class); | ||
private static final int numPartitions = 1; | ||
private static final Action0 nullAction = () -> System.exit(0); | ||
private final MeterRegistry registry; | ||
|
||
|
||
private LocalJobExecutorNetworked(MeterRegistry registry) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need the MeterRegistry in the constructor here since this is also a leaf node. This would avoid some of the other extraneous changes. |
||
this.registry = registry; | ||
} | ||
|
||
private LocalJobExecutorNetworked() {} | ||
|
||
@SuppressWarnings("rawtypes") | ||
private static void startSource(int index, int port, int workersAtNextStage, SourceHolder source, StageConfig stage, | ||
|
@@ -158,8 +165,11 @@ public static void execute(Job job, Parameter... parameters) throws IllegalManti | |
execute(job, builder.build(), parameters); | ||
} | ||
|
||
@SuppressWarnings( {"rawtypes", "unchecked"}) | ||
public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter... parameters) throws IllegalMantisJobException { | ||
execute(job, schedulingInfo, new CompositeMeterRegistry(), parameters); | ||
} | ||
@SuppressWarnings( {"rawtypes", "unchecked"}) | ||
public static void execute(Job job, SchedulingInfo schedulingInfo, MeterRegistry registry, Parameter... parameters) throws IllegalMantisJobException { | ||
// validate job | ||
try { | ||
new ValidateJob(job).execute(); | ||
|
@@ -176,7 +186,7 @@ public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter... | |
// register netty metrics | ||
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory()); | ||
// start our metrics server | ||
MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1, Collections.EMPTY_MAP); | ||
MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1, Collections.EMPTY_MAP, registry); | ||
metricsServer.start(); | ||
|
||
Lifecycle lifecycle = job.getLifecycle(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed because we are dual publishing the metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the spectator library can be removed when all the metrics have been changed.