Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change metrics with micrometer in mantis-server #533

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ext.versions = [
jsr305 : "3.0.1",
junit4 : "4.11",
junit5 : "5.4.+",
micrometer : "1.11.0",
mockito : "2.0.+",
mockito3 : "3.+",
spectator: "1.3.+",
Expand Down Expand Up @@ -73,6 +74,7 @@ ext.libraries = [
"org.junit.jupiter:junit-jupiter-params:${versions.junit5}",
],
mantisShaded : "io.mantisrx:mantis-shaded:2.0.8",
micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}",
mockitoAll : "org.mockito:mockito-all:${versions.mockito}",
mockitoCore : "org.mockito:mockito-core:${versions.mockito}",
mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}",
Expand Down
2 changes: 2 additions & 0 deletions mantis-server/mantis-server-agent/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies {
implementation libraries.slf4jLog4j12
implementation libraries.spectatorApi
implementation "io.github.resilience4j:resilience4j-retry:1.5.0"
implementation libraries.micrometerCore


testImplementation libraries.junit4
testImplementation libraries.mockitoAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State;
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.micrometer.core.instrument.MeterRegistry;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.time.Clock;
Expand Down Expand Up @@ -129,6 +130,7 @@ public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) {
return this;
}


private RpcSystem getRpcSystem() {
if (this.rpcSystem == null) {
return MantisAkkaRpcSystemLoader.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -116,6 +118,7 @@ public class RuntimeTaskImplExecutorTest {
private SimpleResourceLeaderConnection<ResourceClusterGateway> resourceManagerGatewayCxn;
private final ObjectMapper objectMapper = new ObjectMapper();
private CollectingTaskLifecycleListener listener;
private MeterRegistry meterRegistry;

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -145,6 +148,7 @@ public void setUp() throws IOException {
classLoaderHandle = ClassLoaderHandle.fixed(getClass().getClassLoader());
resourceManagerGateway = getHealthyGateway("gateway 1");
resourceManagerGatewayCxn = new SimpleResourceLeaderConnection<>(resourceManagerGateway);
meterRegistry = new SimpleMeterRegistry();

// worker and task executor do not share the same HA instance.
highAvailabilityServices = mock(HighAvailabilityServices.class);
Expand Down Expand Up @@ -176,7 +180,8 @@ private void start() throws Exception {
highAvailabilityServices,
classLoaderHandle,
executeStageRequest -> SinkSubscriptionStateHandler.noop(),
updateTaskExecutionStatusFunction);
updateTaskExecutionStatusFunction,
meterRegistry);
taskExecutor.addListener(listener, MoreExecutors.directExecutor());
taskExecutor.start();
taskExecutor.awaitRunning().get(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -457,9 +462,10 @@ public TestingTaskExecutor(RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
Consumer<Status> consumer) {
Consumer<Status> consumer,
MeterRegistry meterRegistry) {
super(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle,
subscriptionStateHandlerFactory);
subscriptionStateHandlerFactory, meterRegistry);
}

}
Expand Down
2 changes: 1 addition & 1 deletion mantis-server/mantis-server-worker-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
dependencies {
api project(":mantis-control-plane:mantis-control-plane-core")
api project(":mantis-control-plane:mantis-control-plane-client")

implementation libraries.micrometerCore
testImplementation libraries.junit4
testImplementation libraries.mockitoAll
testImplementation libraries.spectatorApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package io.mantisrx.server.worker.client;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
Expand All @@ -27,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand All @@ -51,27 +51,30 @@ class MetricsClientImpl<T> implements MetricsClient<T> {
private final String expectedWorkersGaugeName = "ExpectedMetricsConnections";
private final String workerConnReceivingDataGaugeName = "metricsRecvngData";
private final Gauge workersGauge;
private final AtomicLong workersGaugeValue = new AtomicLong(0);
private final Gauge expectedWorkersGauge;
private final AtomicLong expectedWorkersGaugeValue = new AtomicLong(0);
private final Gauge workerConnReceivingDataGauge;
private final AtomicLong workerConnReceivingDataGaugeValue = new AtomicLong(0);
private final AtomicInteger numWorkers = new AtomicInteger();
private final Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver;
private final long dataRecvTimeoutSecs;
private final MeterRegistry meterRegistry;
MetricsClientImpl(String jobId, WorkerConnectionFunc<T> workerConnectionFunc, JobWorkerMetricsLocator jobWorkerMetricsLocator,
Observable<Integer> numWorkersObservable,
Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver, long dataRecvTimeoutSecs) {
Observer<WorkerConnectionsStatus> workerConnectionsStatusObserver, long dataRecvTimeoutSecs,
MeterRegistry meterRegistry) {
this.jobId = jobId;
this.workerConnectionFunc = workerConnectionFunc;
this.jobWorkerMetricsLocator = jobWorkerMetricsLocator;
Metrics metrics = new Metrics.Builder()
.name(MetricsClientImpl.class.getCanonicalName() + "-" + jobId)
.addGauge(workersGuageName)
.addGauge(expectedWorkersGaugeName)
.addGauge(workerConnReceivingDataGaugeName)
.build();
metrics = MetricsRegistry.getInstance().registerAndGet(metrics);
workersGauge = metrics.getGauge(workersGuageName);
expectedWorkersGauge = metrics.getGauge(expectedWorkersGaugeName);
workerConnReceivingDataGauge = metrics.getGauge(workerConnReceivingDataGaugeName);
this.meterRegistry = meterRegistry;
String groupName = MetricsClientImpl.class.getCanonicalName() + "-" + jobId;
workersGauge = Gauge.builder(groupName + "_" + workersGuageName, workersGaugeValue::get)
.register(meterRegistry);
expectedWorkersGauge = Gauge.builder(groupName + "_" + expectedWorkersGaugeName, expectedWorkersGaugeValue::get)
.register(meterRegistry);
workerConnReceivingDataGauge = Gauge.builder(groupName + "_" + workerConnReceivingDataGaugeName, workerConnReceivingDataGaugeValue::get)
.register(meterRegistry);
numWorkersObservable
.doOnNext(new Action1<Integer>() {
@Override
Expand Down Expand Up @@ -209,26 +212,26 @@ public void call(Boolean flag) {

private void updateWorkerDataReceivingStatus(Boolean flag) {
if (flag)
workerConnReceivingDataGauge.increment();
workerConnReceivingDataGaugeValue.incrementAndGet();
else
workerConnReceivingDataGauge.decrement();
expectedWorkersGauge.set(numWorkers.get());
workerConnReceivingDataGaugeValue.decrementAndGet();
expectedWorkersGaugeValue.set(numWorkers.get());
if (workerConnectionsStatusObserver != null) {
synchronized (workerConnectionsStatusObserver) {
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get()));
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), numWorkers.get()));
}
}
}

private void updateWorkerConx(Boolean flag) {
if (flag)
workersGauge.increment();
workersGaugeValue.incrementAndGet();
else
workersGauge.decrement();
expectedWorkersGauge.set(numWorkers.get());
workersGaugeValue.decrementAndGet();
expectedWorkersGaugeValue.set(numWorkers.get());
if (workerConnectionsStatusObserver != null) {
synchronized (workerConnectionsStatusObserver) {
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get()));
workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), numWorkers.get()));
}
}
}
Expand Down
Loading
Loading