Skip to content

Commit

Permalink
change metrics with micrometer in mantis-server
Browse files Browse the repository at this point in the history
  • Loading branch information
J. Wu authored and J. Wu committed Sep 5, 2023
1 parent 5057591 commit c800100
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 214 deletions.
1 change: 1 addition & 0 deletions mantis-server/mantis-server-agent/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
implementation libraries.slf4jApi
implementation libraries.slf4jLog4j12
implementation libraries.spectatorApi
implementation libraries.micrometerCore

testImplementation libraries.junit4
testImplementation libraries.mockitoAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State;
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -122,15 +124,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

private RuntimeTask currentTask;
private ExecuteStageRequest currentRequest;
private MeterRegistry meterRegistry;

public TaskExecutor(
RpcService rpcService,
WorkerConfiguration workerConfiguration,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory) {
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
MeterRegistry meterRegistry) {
this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle,
subscriptionStateHandlerFactory, null);
subscriptionStateHandlerFactory, null, meterRegistry);
}

public TaskExecutor(
Expand All @@ -139,7 +143,8 @@ public TaskExecutor(
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
@Nullable TaskFactory taskFactory) {
@Nullable TaskFactory taskFactory,
MeterRegistry meterRegistry) {
super(rpcService, RpcServiceUtils.createRandomName("worker"));

// this is the task executor ID that will be used for the rest of the JVM process
Expand Down Expand Up @@ -207,8 +212,8 @@ private void startTaskExecutorServices() {
validateRunsInMainThread();

masterMonitor = highAvailabilityServices.getMasterClientApi();
taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor);
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor, meterRegistry);
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory(meterRegistry));
resourceClusterGatewaySupplier =
highAvailabilityServices.connectWithResourceManager(clusterID);
resourceClusterGatewaySupplier.register(new ResourceManagerChangeListener());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.micrometer.core.instrument.MeterRegistry;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.time.Clock;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class TaskExecutorStarter extends AbstractIdleService {
private final TaskExecutor taskExecutor;
private final HighAvailabilityServices highAvailabilityServices;
private final RpcSystem rpcSystem;
private final MeterRegistry meterRegistry;

@Override
protected void startUp() {
Expand Down Expand Up @@ -111,6 +113,7 @@ public static class TaskExecutorStarterBuilder {
private TaskFactory taskFactory;

private final List<Tuple2<TaskExecutor.Listener, Executor>> listeners = new ArrayList<>();
private MeterRegistry meterRegistry;

private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) {
this.workerConfiguration = workerConfiguration;
Expand All @@ -129,6 +132,11 @@ public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) {
return this;
}

public TaskExecutorStarterBuilder registry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

private RpcSystem getRpcSystem() {
if (this.rpcSystem == null) {
return MantisAkkaRpcSystemLoader.getInstance();
Expand Down Expand Up @@ -206,13 +214,14 @@ public TaskExecutorStarter build() throws Exception {
highAvailabilityServices,
getClassLoaderHandle(),
getSinkSubscriptionHandlerFactory(),
this.taskFactory);
this.taskFactory,
meterRegistry);

for (Tuple2<TaskExecutor.Listener, Executor> listener : listeners) {
taskExecutor.addListener(listener._1(), listener._2());
}

return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem());
return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -116,6 +118,7 @@ public class RuntimeTaskImplExecutorTest {
private SimpleResourceLeaderConnection<ResourceClusterGateway> resourceManagerGatewayCxn;
private final ObjectMapper objectMapper = new ObjectMapper();
private CollectingTaskLifecycleListener listener;
private MeterRegistry meterRegistry;

@Before
public void setUp() throws IOException {
Expand All @@ -142,6 +145,7 @@ public void setUp() throws IOException {
classLoaderHandle = ClassLoaderHandle.fixed(getClass().getClassLoader());
resourceManagerGateway = getHealthyGateway("gateway 1");
resourceManagerGatewayCxn = new SimpleResourceLeaderConnection<>(resourceManagerGateway);
meterRegistry = new SimpleMeterRegistry();

// worker and task executor do not share the same HA instance.
highAvailabilityServices = mock(HighAvailabilityServices.class);
Expand Down Expand Up @@ -173,7 +177,8 @@ private void start() throws Exception {
highAvailabilityServices,
classLoaderHandle,
executeStageRequest -> SinkSubscriptionStateHandler.noop(),
updateTaskExecutionStatusFunction);
updateTaskExecutionStatusFunction,
meterRegistry);
taskExecutor.addListener(listener, MoreExecutors.directExecutor());
taskExecutor.start();
taskExecutor.awaitRunning().get(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -454,9 +459,10 @@ public TestingTaskExecutor(RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
Consumer<Status> consumer) {
Consumer<Status> consumer,
MeterRegistry meterRegistry) {
super(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle,
subscriptionStateHandlerFactory);
subscriptionStateHandlerFactory, meterRegistry);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package io.mantisrx.server.worker.client;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
Expand All @@ -27,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand All @@ -51,27 +51,30 @@ class MetricsClientImpl<T> implements MetricsClient<T> {
private final String expectedWorkersGaugeName = "ExpectedMetricsConnections";
private final String workerConnReceivingDataGaugeName = "metricsRecvngData";
private final Gauge workersGauge;
private final AtomicLong workersGaugeValue = new AtomicLong(0);
private final Gauge expectedWorkersGauge;
private final AtomicLong expectedWorkersGaugeValue = new AtomicLong(0);
private final Gauge workerConnReceivingDataGauge;
private final AtomicLong workerConnReceivingDataGaugeValue = new AtomicLong(0);
private final AtomicInteger numWorkers = new AtomicInteger();
private final Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver;
private final long dataRecvTimeoutSecs;
private final MeterRegistry meterRegistry;
MetricsClientImpl(String jobId, WorkerConnectionFunc<T> workerConnectionFunc, JobWorkerMetricsLocator jobWorkerMetricsLocator,
Observable<Integer> numWorkersObservable,
Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver, long dataRecvTimeoutSecs) {
Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver, long dataRecvTimeoutSecs,
MeterRegistry meterRegistry) {
this.jobId = jobId;
this.workerConnectionFunc = workerConnectionFunc;
this.jobWorkerMetricsLocator = jobWorkerMetricsLocator;
Metrics metrics = new Metrics.Builder()
.name(MetricsClientImpl.class.getCanonicalName() + "-" + jobId)
.addGauge(workersGuageName)
.addGauge(expectedWorkersGaugeName)
.addGauge(workerConnReceivingDataGaugeName)
.build();
metrics = MetricsRegistry.getInstance().registerAndGet(metrics);
workersGauge = metrics.getGauge(workersGuageName);
expectedWorkersGauge = metrics.getGauge(expectedWorkersGaugeName);
workerConnReceivingDataGauge = metrics.getGauge(workerConnReceivingDataGaugeName);
this.meterRegistry = meterRegistry;
String groupName = MetricsClientImpl.class.getCanonicalName() + "-" + jobId;
workersGauge = Gauge.builder(groupName + "_" + workersGuageName, workersGaugeValue::get)
.register(meterRegistry);
expectedWorkersGauge = Gauge.builder(groupName + "_" + expectedWorkersGaugeName, expectedWorkersGaugeValue::get)
.register(meterRegistry);
workerConnReceivingDataGauge = Gauge.builder(groupName + "_" + workerConnReceivingDataGaugeName, workerConnReceivingDataGaugeValue::get)
.register(meterRegistry);
numWorkersObservable
.doOnNext(new Action1<Integer>() {
@Override
Expand Down Expand Up @@ -149,7 +152,7 @@ public void call() {
}
})
.share()
.lift(new DropOperator<Observable<T>>("client_metrics_share"))
.lift(new DropOperator<Observable<T>>(meterRegistry, "client_metrics_share"))
;
}

Expand Down Expand Up @@ -209,26 +212,26 @@ public void call(Boolean flag) {

private void updateWorkerDataReceivingStatus(Boolean flag) {
if (flag)
workerConnReceivingDataGauge.increment();
workerConnReceivingDataGaugeValue.incrementAndGet();
else
workerConnReceivingDataGauge.decrement();
expectedWorkersGauge.set(numWorkers.get());
workerConnReceivingDataGaugeValue.decrementAndGet();
expectedWorkersGaugeValue.set(numWorkers.get());
if (workerConnectionsStatusObserver != null) {
synchronized (workerConnectionsStatusObserver) {
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get()));
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), numWorkers.get()));
}
}
}

private void updateWorkerConx(Boolean flag) {
if (flag)
workersGauge.increment();
workersGaugeValue.incrementAndGet();
else
workersGauge.decrement();
expectedWorkersGauge.set(numWorkers.get());
workersGaugeValue.decrementAndGet();
expectedWorkersGaugeValue.set(numWorkers.get());
if (workerConnectionsStatusObserver != null) {
synchronized (workerConnectionsStatusObserver) {
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get()));
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), numWorkers.get()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import com.mantisrx.common.utils.MantisSSEConstants;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.buffer.ByteBuf;
import io.reactivx.mantis.operators.DropOperator;
import java.util.Collection;
Expand Down Expand Up @@ -59,9 +60,8 @@ public class SseWorkerConnection {
private final String connectionType;
private final String hostname;
private final int port;
private final MetricGroupId metricGroupId;
private final String metricGroup;
private final Counter pingCounter;

private final boolean reconnectUponConnectionReset;
private final Action1<Boolean> updateConxStatus;
private final Action1<Boolean> updateDataRecvngStatus;
Expand Down Expand Up @@ -112,13 +112,13 @@ public SseWorkerConnection(final String connectionType,
final Action1<Throwable> connectionResetHandler,
final long dataRecvTimeoutSecs,
final boolean reconnectUponConnectionReset,
final CopyOnWriteArraySet<MetricGroupId> metricsSet,
final CopyOnWriteArraySet<String> metricsSet,
final int bufferSize,
final SinkParameters sinkParameters,
final MetricGroupId metricGroupId) {
final String metricGroup,
final MeterRegistry meterRegistry) {
this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler,
dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false,
metricGroupId);
dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry);
}
public SseWorkerConnection(final String connectionType,
final String hostname,
Expand All @@ -132,19 +132,14 @@ public SseWorkerConnection(final String connectionType,
final int bufferSize,
final SinkParameters sinkParameters,
final boolean disablePingFiltering,
final MetricGroupId metricGroupId) {
final String metricGroup,
final MeterRegistry meterRegistry) {
this.connectionType = connectionType;
this.hostname = hostname;
this.port = port;

this.metricGroupId = metricGroupId;
final MetricGroupId connHealthMetricGroup = new MetricGroupId("ConnectionHealth");
Metrics m = new Metrics.Builder()
.id(connHealthMetricGroup)
.addCounter("pingCount")
.build();
this.pingCounter = m.getCounter("pingCount");

this.meterRegistry = meterRegistry;
this.metricGroup = metricGroup;
this.pingCounter = meterRegistry.counter(metricGroup + "_ConnectionHealth_pingCount");
this.updateConxStatus = updateConxStatus;
this.updateDataRecvngStatus = updateDataRecvngStatus;
this.connectionResetHandler = connectionResetHandler;
Expand Down Expand Up @@ -283,7 +278,7 @@ protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Ser
.subscribe();
}
return response.getContent()
.lift(new DropOperator<ServerSentEvent>(metricGroupId))
.lift(new DropOperator<ServerSentEvent>(meterRegistry))
.flatMap((ServerSentEvent t1) -> {
lastDataReceived.set(System.currentTimeMillis());
if (isConnected.get() && isReceivingData.compareAndSet(false, true))
Expand Down Expand Up @@ -313,17 +308,11 @@ protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Ser
}

private boolean hasDataDrop() {
final Collection<Metrics> metrics = MetricsRegistry.getInstance().getMetrics(metricNamePrefix);
long totalDataDrop = 0L;
if (metrics != null && !metrics.isEmpty()) {
//logger.info("Got " + metrics.size() + " metrics for DropOperator");
for (Metrics m : metrics) {
final Counter dropped = m.getCounter("" + DropOperator.Counters.dropped);
final Counter onNext = m.getCounter("" + DropOperator.Counters.onNext);
if (dropped != null)
totalDataDrop += dropped.value();
if (meterRegistry.getMeters().contains(pingCounter)) {
final Counter dropped = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.dropped)).counter();
final Counter onNext = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.onNext)).counter();
}
}
if (totalDataDrop > lastDataDropValue) {
lastDataDropValue = totalDataDrop;
return true;
Expand Down
Loading

0 comments on commit c800100

Please sign in to comment.