Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MetricsServer v2 #469

Closed
wants to merge 14 commits into from
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ext.versions = [
jsr305 : "3.0.1",
junit4 : "4.11",
junit5 : "5.4.+",
micrometer: "1.11.0",
mockito : "2.0.+",
mockito3 : "3.+",
spectator: "1.3.+",
Expand Down Expand Up @@ -74,6 +75,7 @@ ext.libraries = [
"org.junit.jupiter:junit-jupiter-params:${versions.junit5}",
],
mantisShaded : "io.mantisrx:mantis-shaded:2.0.97",
micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}",
mockitoAll : "org.mockito:mockito-all:${versions.mockito}",
mockitoCore : "org.mockito:mockito-core:${versions.mockito}",
mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}",
Expand Down
6 changes: 3 additions & 3 deletions mantis-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ dependencies {
api "org.xerial.snappy:snappy-java:$snappyVersion"
api "org.jctools:jctools-core:$jctoolsVersion"

// spectatorApi should be packaged at entry point level to avoid version conflicts.
compileOnly libraries.spectatorApi

api libraries.jsr305
api libraries.mantisShaded
api libraries.rxNettyShaded
api libraries.rxJava
api libraries.slf4jApi
api libraries.slf4jLog4j12
api libraries.micrometerCore

implementation libraries.commonsIo
implementation 'net.jcip:jcip-annotations:1.0'
implementation libraries.spectatorApi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed because we are dual publishing the metrics?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the spectator library can be removed when all the metrics have been changed.


testImplementation libraries.spectatorApi
testImplementation libraries.commonsLang3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import io.mantisrx.common.metrics.measurement.CounterMeasurement;
import io.mantisrx.common.metrics.measurement.GaugeMeasurement;
import io.mantisrx.common.metrics.measurement.Measurements;
import io.mantisrx.common.metrics.measurement.MicrometerMeasurement;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -53,41 +57,50 @@ public class MetricsServer {
private int port;
private Map<String, String> tags;
private long publishRateInSeconds;
private MeterRegistry micrometerRegistry;

public MetricsServer(int port, long publishRateInSeconds, Map<String, String> tags) {
public MetricsServer(int port, long publishRateInSeconds, Map<String, String> tags, MeterRegistry micrometerRegistry) {
this.port = port;
this.publishRateInSeconds = publishRateInSeconds;
this.tags = tags;
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerModule(new Jdk8Module());
this.micrometerRegistry = micrometerRegistry;
}

private Observable<Measurements> measurements(long timeFrequency) {
final MeterRegistry microRegistry = this.micrometerRegistry;
final MetricsRegistry registry = MetricsRegistry.getInstance();
return
Observable.interval(0, timeFrequency, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<Measurements>>() {
@Override
public Observable<Measurements> call(Long t1) {
long timestamp = System.currentTimeMillis();
List<Measurements> measurements = new ArrayList<>();
for (Metrics metrics : registry.metrics()) {
Collection<CounterMeasurement> counters = new LinkedList<>();
Collection<GaugeMeasurement> gauges = new LinkedList<>();

for (Entry<MetricId, Counter> counterEntry : metrics.counters().entrySet()) {
Counter counter = counterEntry.getValue();
counters.add(new CounterMeasurement(counterEntry.getKey().metricName(), counter.value()));
}
for (Entry<MetricId, Gauge> gaugeEntry : metrics.gauges().entrySet()) {
gauges.add(new GaugeMeasurement(gaugeEntry.getKey().metricName(), gaugeEntry.getValue().doubleValue()));
}
measurements.add(new Measurements(metrics.getMetricGroupId().id(),
timestamp, counters, gauges, tags));
}
return Observable.from(measurements);
Observable.interval(0, timeFrequency, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<Measurements>>() {
@Override
public Observable<Measurements> call(Long t1) {
long timestamp = System.currentTimeMillis();
List<Measurements> measurements = new ArrayList<>();

for (Meter meter: microRegistry.getMeters()) {
Collection<MicrometerMeasurement> micrometers = new LinkedList<>();
micrometers.add(new MicrometerMeasurement(meter.getId().getType(), meter.measure().iterator().next().getValue()));
measurements.add(new Measurements(meter.getId().getName(), timestamp, Collections.emptyList(), Collections.emptyList(), micrometers, tags));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are generating a measurement instance for every micrometer metric, even though measurements are supposed to consist of a collection of counters, timers, and other components for a specific group ID. Considering this, is the semantics of having a 1:1 relationship between the measurements instance and micrometer metric correct?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is correct.

}

for (io.mantisrx.common.metrics.Metrics metrics : registry.metrics()) {
Collection<CounterMeasurement> counters = new LinkedList<>();
Collection<GaugeMeasurement> gauges = new LinkedList<>();
for (Entry<MetricId, Counter> counterEntry : metrics.counters().entrySet()) {
Counter counter = counterEntry.getValue();
counters.add(new CounterMeasurement(counterEntry.getKey().metricName(), counter.value()));
}
});
for (Entry<MetricId, Gauge> gaugeEntry : metrics.gauges().entrySet()) {
gauges.add(new GaugeMeasurement(gaugeEntry.getKey().metricName(), gaugeEntry.getValue().doubleValue()));
}
measurements.add(new Measurements(metrics.getMetricGroupId().name(),
timestamp, counters, gauges, Collections.emptyList(),tags));
}
return Observable.from(measurements);
}
});
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import io.micrometer.core.instrument.Meter;
import java.util.Collection;
import java.util.Map;

Expand All @@ -30,6 +31,7 @@ public class Measurements {
private long timestamp;
private Collection<CounterMeasurement> counters;
private Collection<GaugeMeasurement> gauges;
private Collection<MicrometerMeasurement> micrometers;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -38,11 +40,13 @@ public Measurements(
@JsonProperty("timestamp") long timestamp,
@JsonProperty("counters") Collection<CounterMeasurement> counters,
@JsonProperty("gauges") Collection<GaugeMeasurement> gauges,
@JsonProperty("micrometers") Collection<MicrometerMeasurement> micrometers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this when we are already emitting the existing data-structures? Unless the downstream understands this DS, emitting it is unnecessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CounterMeasurement and GaugeMeasurement are for existing mantis internal metrics. All micrometer metrics will be part of the MicrometerMeasurement for now.

@JsonProperty("tags") Map<String, String> tags) {
this.name = name;
this.timestamp = timestamp;
this.counters = counters;
this.gauges = gauges;
this.micrometers = micrometers;
this.tags = tags;
}

Expand All @@ -62,6 +66,10 @@ public Collection<GaugeMeasurement> getGauges() {
return gauges;
}

public Collection<MicrometerMeasurement> getMicrometers() {
return micrometers;
}

public Map<String, String> getTags() {
return tags;
}
Expand All @@ -74,6 +82,7 @@ public String toString() {
", tags=" + tags +
", counters=" + counters +
", gauges=" + gauges +
", micrometers=" + micrometers +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.common.metrics.measurement;

import io.micrometer.core.instrument.Meter;
import lombok.Value;

@Value
public class MicrometerMeasurement {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lombok @value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need this type, or can we convert the micrometer measurements to the existing measures DS?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep them separate to allow identify mantis metrics vs micrometer metrics.
Eventually, we can migrate Measurements to MicrometerMeasurement only.

Meter.Type type;
double value;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.mantisrx.common.metrics.netty;

import io.micrometer.core.instrument.MeterRegistry;
import mantis.io.reactivex.netty.client.ClientMetricsEvent;
import mantis.io.reactivex.netty.client.RxClient;
import mantis.io.reactivex.netty.metrics.MetricEventsListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
api "org.skife.config:config-magic:$configMagicVersion"
api libraries.flinkRpcApi
api libraries.flinkCore
implementation libraries.micrometerCore
implementation libraries.commonsIo
compileOnly libraries.jsr305
compileOnly libraries.flinkRpcImpl // Provided by copyLibs task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.stats.MetricStringConstants;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -26,14 +27,15 @@ public class MetricsFactory {
/**
* Returns a metrics server, publishing metrics every 1 second
*
* @param request request for which the metrics need to be published
* @param request request for which the metrics need to be published
* @param meterRegistry
* @return MetricsServerService server
*/
public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request) {
public static MetricsServerService newMetricsServer(CoreConfiguration configuration, ExecuteStageRequest request, MeterRegistry meterRegistry) {

// todo(sundaram): get rid of the dependency on the metrics port defined at the ExecuteStageRequest level
// because that's a configuration of the task manager rather than the request.
return new MetricsServerService(request.getMetricsPort(), 1, getCommonTags(request));
return new MetricsServerService(request.getMetricsPort(), 1, getCommonTags(request), meterRegistry);
}

public static MetricsPublisherService newMetricsPublisher(CoreConfiguration config, ExecuteStageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.mantisrx.server.core.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import io.mantisrx.common.metrics.MetricsServer;
import io.mantisrx.server.core.Service;
import java.util.Map;
Expand All @@ -25,9 +26,10 @@ public class MetricsServerService implements Service {

private MetricsServer server;


public MetricsServerService(final int port, final int publishRateInSeconds,
final Map<String, String> tags) {
server = new MetricsServer(port, publishRateInSeconds, tags);
final Map<String, String> tags, MeterRegistry registry) {
server = new MetricsServer(port, publishRateInSeconds, tags, registry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.apache.flink.configuration.GlobalConfiguration.loadConfiguration;

import io.micrometer.core.instrument.MeterRegistry;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
Expand Down Expand Up @@ -79,6 +80,7 @@
import io.mantisrx.server.master.scheduler.WorkerRegistry;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.org.apache.curator.utils.ZKPaths;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -119,16 +121,16 @@ public class MasterMain implements Service {
private MasterConfiguration config;
private SchedulingService schedulingService;
private ILeadershipManager leadershipManager;
private final MeterRegistry micrometerRegistry;

public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber auditEventSubscriber) {

public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber auditEventSubscriber, MeterRegistry micrometerRegistry) {
this.micrometerRegistry = micrometerRegistry;
Comment on lines +126 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use the CompositeRegistry singleton here since this is the leaf node.

String test = "{\"jobId\":\"sine-function-1\",\"status\":{\"jobId\":\"sine-function-1\",\"stageNum\":1,\"workerIndex\":0,\"workerNumber\":2,\"type\":\"HEARTBEAT\",\"message\":\"heartbeat\",\"state\":\"Noop\",\"hostname\":null,\"timestamp\":1525813363585,\"reason\":\"Normal\",\"payloads\":[{\"type\":\"SubscriptionState\",\"data\":\"false\"},{\"type\":\"IncomingDataDrop\",\"data\":\"{\\\"onNextCount\\\":0,\\\"droppedCount\\\":0}\"}]}}";

Metrics metrics = new Metrics.Builder()
.id("MasterMain")
.addCounter("masterInitSuccess")
.addCounter("masterInitError")
.build();
.id("MasterMain")
.addCounter("masterInitSuccess")
.addCounter("masterInitError")
.build();
Metrics m = MetricsRegistry.getInstance().registerAndGet(metrics);
try {
ConfigurationProvider.initialize(configFactory);
Expand Down Expand Up @@ -238,7 +240,7 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit

// start serving metrics
if (config.getMasterMetricsPort() > 0) {
new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap()).start();
new MetricsServerService(config.getMasterMetricsPort(), 1, Collections.emptyMap(), micrometerRegistry).start();
}
new MetricsPublisherService(config.getMetricsPublisher(), config.getMetricsPublisherFrequencyInSeconds(),
new HashMap<>()).start();
Expand Down Expand Up @@ -375,7 +377,8 @@ public static void main(String[] args) {
StaticPropertiesConfigurationFactory factory = new StaticPropertiesConfigurationFactory(props);
setupDummyAgentClusterAutoScaler();
final AuditEventSubscriber auditEventSubscriber = new AuditEventSubscriberLoggingImpl();
MasterMain master = new MasterMain(factory, auditEventSubscriber);
MeterRegistry registry = new CompositeMeterRegistry();
MasterMain master = new MasterMain(factory, auditEventSubscriber,registry);
master.start(); // blocks until shutdown hook (ctrl-c)
} catch (Exception e) {
// unexpected to get a RuntimeException, will exit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.mantisrx.sourcejob.synthetic;

import io.micrometer.core.instrument.MeterRegistry;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
Expand All @@ -26,6 +27,7 @@
import io.mantisrx.sourcejob.synthetic.sink.TaggedDataSourceSink;
import io.mantisrx.sourcejob.synthetic.source.SyntheticSource;
import io.mantisrx.sourcejob.synthetic.stage.TaggingStage;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;

/**
* A sample queryable source job that generates synthetic request events.
Expand Down Expand Up @@ -57,6 +59,6 @@ public Job<TaggedData> getJobInstance() {
}

public static void main(String[] args) {
LocalJobExecutorNetworked.execute(new SyntheticSourceJob().getJobInstance());
LocalJobExecutorNetworked.execute(new SyntheticSourceJob().getJobInstance(), new CompositeMeterRegistry());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointChange.Type;
import io.reactivex.mantis.remote.observable.EndpointInjector;
Expand Down Expand Up @@ -70,8 +72,13 @@ public class LocalJobExecutorNetworked {
private static final Logger logger = LoggerFactory.getLogger(LocalJobExecutorNetworked.class);
private static final int numPartitions = 1;
private static final Action0 nullAction = () -> System.exit(0);
private final MeterRegistry registry;


private LocalJobExecutorNetworked(MeterRegistry registry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the MeterRegistry in the constructor here since this is also a leaf node. This would avoid some of the other extraneous changes.

this.registry = registry;
}

private LocalJobExecutorNetworked() {}

@SuppressWarnings("rawtypes")
private static void startSource(int index, int port, int workersAtNextStage, SourceHolder source, StageConfig stage,
Expand Down Expand Up @@ -158,8 +165,11 @@ public static void execute(Job job, Parameter... parameters) throws IllegalManti
execute(job, builder.build(), parameters);
}

@SuppressWarnings( {"rawtypes", "unchecked"})
public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter... parameters) throws IllegalMantisJobException {
execute(job, schedulingInfo, new CompositeMeterRegistry(), parameters);
}
@SuppressWarnings( {"rawtypes", "unchecked"})
public static void execute(Job job, SchedulingInfo schedulingInfo, MeterRegistry registry, Parameter... parameters) throws IllegalMantisJobException {
// validate job
try {
new ValidateJob(job).execute();
Expand All @@ -176,7 +186,7 @@ public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter...
// register netty metrics
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
// start our metrics server
MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1, Collections.EMPTY_MAP);
MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1, Collections.EMPTY_MAP, registry);
metricsServer.start();

Lifecycle lifecycle = job.getLifecycle();
Expand Down
Loading
Loading