From af2c0f16938a484690fe9dc396cc59cbf28a61ef Mon Sep 17 00:00:00 2001 From: "J. Wu" Date: Mon, 21 Aug 2023 23:54:56 -0400 Subject: [PATCH 1/2] change metrics with micrometer in mantis-server --- .../mantis-server-agent/build.gradle | 2 + .../mantisrx/server/agent/TaskExecutor.java | 15 ++- .../server/agent/TaskExecutorStarter.java | 13 ++- .../agent/RuntimeTaskImplExecutorTest.java | 12 ++- .../worker/client/MetricsClientImpl.java | 49 +++++----- .../worker/client/SseWorkerConnection.java | 43 +++----- .../client/SseWorkerConnectionFunction.java | 43 ++++---- .../worker/client/MetricsClientImplTest.java | 7 +- .../SseWorkerConnectionFunctionTest.java | 5 +- .../client/SseWorkerConnectionTest.java | 29 +++--- .../mantis-server-worker/build.gradle | 1 + .../worker/DataDroppedPayloadSetter.java | 59 ++++++----- .../worker/ResourceUsagePayloadSetter.java | 98 ++++++++++--------- ...WorkerExecutionOperationsNetworkStage.java | 26 ++--- .../jobmaster/WorkerMetricSubscription.java | 7 +- .../worker/DataDroppedPayloadSetterTest.java | 36 +++---- 16 files changed, 231 insertions(+), 214 deletions(-) diff --git a/mantis-server/mantis-server-agent/build.gradle b/mantis-server/mantis-server-agent/build.gradle index ddf330ec4..40924fbc4 100644 --- a/mantis-server/mantis-server-agent/build.gradle +++ b/mantis-server/mantis-server-agent/build.gradle @@ -30,6 +30,8 @@ dependencies { implementation libraries.slf4jLog4j12 implementation libraries.spectatorApi implementation "io.github.resilience4j:resilience4j-retry:1.5.0" + implementation libraries.micrometerCore + testImplementation libraries.junit4 testImplementation libraries.mockitoAll diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java index 4e09580f5..a456b78fc 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java +++ 
b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java @@ -52,6 +52,8 @@ import io.mantisrx.shaded.com.google.common.util.concurrent.Service; import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State; import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; @@ -117,15 +119,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private RuntimeTask currentTask; private ExecuteStageRequest currentRequest; + private MeterRegistry meterRegistry; public TaskExecutor( RpcService rpcService, WorkerConfiguration workerConfiguration, HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, - SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory) { + SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory, + MeterRegistry meterRegistry) { this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle, - subscriptionStateHandlerFactory, null); + subscriptionStateHandlerFactory, null, meterRegistry); } public TaskExecutor( @@ -134,7 +138,8 @@ public TaskExecutor( HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory, - @Nullable TaskFactory taskFactory) { + @Nullable TaskFactory taskFactory, + MeterRegistry meterRegistry) { super(rpcService, RpcServiceUtils.createRandomName("worker")); // this is the task executor ID that will be used for the rest of the JVM process @@ -202,8 +207,8 @@ private void startTaskExecutorServices() { validateRunsInMainThread(); masterMonitor = highAvailabilityServices.getMasterClientApi(); - taskStatusUpdateHandler = 
TaskStatusUpdateHandler.forReportingToGateway(masterMonitor); - RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory()); + taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor, meterRegistry); + RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory(meterRegistry)); resourceClusterGatewaySupplier = highAvailabilityServices.connectWithResourceManager(clusterID); resourceClusterGatewaySupplier.register(new ResourceManagerChangeListener()); diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index f03d4b57c..4f78aa177 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -27,6 +27,7 @@ import io.mantisrx.shaded.com.google.common.base.Preconditions; import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService; import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors; +import io.micrometer.core.instrument.MeterRegistry; import io.vavr.Tuple; import io.vavr.Tuple2; import java.time.Clock; @@ -54,6 +55,7 @@ public class TaskExecutorStarter extends AbstractIdleService { private final TaskExecutor taskExecutor; private final HighAvailabilityServices highAvailabilityServices; private final RpcSystem rpcSystem; + private final MeterRegistry meterRegistry; @Override protected void startUp() { @@ -111,6 +113,7 @@ public static class TaskExecutorStarterBuilder { private TaskFactory taskFactory; private final List> listeners = new ArrayList<>(); + private MeterRegistry meterRegistry; private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) { this.workerConfiguration = workerConfiguration; @@ -129,6 +132,11 @@ public TaskExecutorStarterBuilder 
rpcSystem(RpcSystem rpcSystem) { return this; } + public TaskExecutorStarterBuilder registry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + private RpcSystem getRpcSystem() { if (this.rpcSystem == null) { return MantisAkkaRpcSystemLoader.getInstance(); @@ -206,13 +214,14 @@ public TaskExecutorStarter build() throws Exception { highAvailabilityServices, getClassLoaderHandle(), getSinkSubscriptionHandlerFactory(), - this.taskFactory); + this.taskFactory, + meterRegistry); for (Tuple2 listener : listeners) { taskExecutor.addListener(listener._1(), listener._2()); } - return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem()); + return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry); } } } diff --git a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java index 1ea119ad8..e86f27423 100644 --- a/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java +++ b/mantis-server/mantis-server-agent/src/test/java/io/mantisrx/server/agent/RuntimeTaskImplExecutorTest.java @@ -67,6 +67,8 @@ import io.mantisrx.shaded.com.google.common.collect.ImmutableMap; import io.mantisrx.shaded.com.google.common.collect.Lists; import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -116,6 +118,7 @@ public class RuntimeTaskImplExecutorTest { private SimpleResourceLeaderConnection resourceManagerGatewayCxn; private final ObjectMapper objectMapper = new ObjectMapper(); private CollectingTaskLifecycleListener listener; + private MeterRegistry 
meterRegistry; @Before public void setUp() throws IOException { @@ -145,6 +148,7 @@ public void setUp() throws IOException { classLoaderHandle = ClassLoaderHandle.fixed(getClass().getClassLoader()); resourceManagerGateway = getHealthyGateway("gateway 1"); resourceManagerGatewayCxn = new SimpleResourceLeaderConnection<>(resourceManagerGateway); + meterRegistry = new SimpleMeterRegistry(); // worker and task executor do not share the same HA instance. highAvailabilityServices = mock(HighAvailabilityServices.class); @@ -176,7 +180,8 @@ private void start() throws Exception { highAvailabilityServices, classLoaderHandle, executeStageRequest -> SinkSubscriptionStateHandler.noop(), - updateTaskExecutionStatusFunction); + updateTaskExecutionStatusFunction, + meterRegistry); taskExecutor.addListener(listener, MoreExecutors.directExecutor()); taskExecutor.start(); taskExecutor.awaitRunning().get(2, TimeUnit.SECONDS); @@ -457,9 +462,10 @@ public TestingTaskExecutor(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory, - Consumer consumer) { + Consumer consumer, + MeterRegistry meterRegistry) { super(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle, - subscriptionStateHandlerFactory); + subscriptionStateHandlerFactory, meterRegistry); } } diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java index 369ddac38..63f6971a3 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java @@ -16,9 +16,8 @@ package io.mantisrx.server.worker.client; -import 
io.mantisrx.common.metrics.Gauge; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import io.mantisrx.common.network.WorkerEndpoint; import io.mantisrx.server.master.client.MasterClientWrapper; import io.reactivex.mantis.remote.observable.EndpointChange; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -51,27 +51,30 @@ class MetricsClientImpl implements MetricsClient { private final String expectedWorkersGaugeName = "ExpectedMetricsConnections"; private final String workerConnReceivingDataGaugeName = "metricsRecvngData"; private final Gauge workersGauge; + private final AtomicLong workersGaugeValue = new AtomicLong(0); private final Gauge expectedWorkersGauge; + private final AtomicLong expectedWorkersGaugeValue = new AtomicLong(0); private final Gauge workerConnReceivingDataGauge; + private final AtomicLong workerConnReceivingDataGaugeValue = new AtomicLong(0); private final AtomicInteger numWorkers = new AtomicInteger(); private final Observer workerConnectionsStatusObserver; private final long dataRecvTimeoutSecs; + private final MeterRegistry meterRegistry; MetricsClientImpl(String jobId, WorkerConnectionFunc workerConnectionFunc, JobWorkerMetricsLocator jobWorkerMetricsLocator, Observable numWorkersObservable, - Observer workerConnectionsStatusObserver, long dataRecvTimeoutSecs) { + Observer workerConnectionsStatusObserver, long dataRecvTimeoutSecs, + MeterRegistry meterRegistry) { this.jobId = jobId; this.workerConnectionFunc = workerConnectionFunc; this.jobWorkerMetricsLocator = jobWorkerMetricsLocator; - Metrics metrics = new Metrics.Builder() - .name(MetricsClientImpl.class.getCanonicalName() + 
"-" + jobId) - .addGauge(workersGuageName) - .addGauge(expectedWorkersGaugeName) - .addGauge(workerConnReceivingDataGaugeName) - .build(); - metrics = MetricsRegistry.getInstance().registerAndGet(metrics); - workersGauge = metrics.getGauge(workersGuageName); - expectedWorkersGauge = metrics.getGauge(expectedWorkersGaugeName); - workerConnReceivingDataGauge = metrics.getGauge(workerConnReceivingDataGaugeName); + this.meterRegistry = meterRegistry; + String groupName = MetricsClientImpl.class.getCanonicalName() + "-" + jobId; + workersGauge = Gauge.builder(groupName + "_" + workersGuageName, workersGaugeValue::get) + .register(meterRegistry); + expectedWorkersGauge = Gauge.builder(groupName + "_" + expectedWorkersGaugeName, expectedWorkersGaugeValue::get) + .register(meterRegistry); + workerConnReceivingDataGauge = Gauge.builder(groupName + "_" + workerConnReceivingDataGaugeName, workerConnReceivingDataGaugeValue::get) + .register(meterRegistry); numWorkersObservable .doOnNext(new Action1() { @Override @@ -149,7 +152,7 @@ public void call() { } }) .share() - .lift(new DropOperator>("client_metrics_share")) + .lift(new DropOperator>(meterRegistry, "client_metrics_share")) ; } @@ -209,26 +212,26 @@ public void call(Boolean flag) { private void updateWorkerDataReceivingStatus(Boolean flag) { if (flag) - workerConnReceivingDataGauge.increment(); + workerConnReceivingDataGaugeValue.incrementAndGet(); else - workerConnReceivingDataGauge.decrement(); - expectedWorkersGauge.set(numWorkers.get()); + workerConnReceivingDataGaugeValue.decrementAndGet(); + expectedWorkersGaugeValue.set(numWorkers.get()); if (workerConnectionsStatusObserver != null) { synchronized (workerConnectionsStatusObserver) { - workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get())); + workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), 
numWorkers.get())); } } } private void updateWorkerConx(Boolean flag) { if (flag) - workersGauge.increment(); + workersGaugeValue.incrementAndGet(); else - workersGauge.decrement(); - expectedWorkersGauge.set(numWorkers.get()); + workersGaugeValue.decrementAndGet(); + expectedWorkersGaugeValue.set(numWorkers.get()); if (workerConnectionsStatusObserver != null) { synchronized (workerConnectionsStatusObserver) { - workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGauge.value(), workersGauge.value(), numWorkers.get())); + workerConnectionsStatusObserver.onNext(new WorkerConnectionsStatus(workerConnReceivingDataGaugeValue.get(), workersGaugeValue.get(), numWorkers.get())); } } } diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java index 5e188c9d1..3e11312b2 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java @@ -21,12 +21,13 @@ import com.mantisrx.common.utils.MantisSSEConstants; import io.mantisrx.common.MantisServerSentEvent; import io.mantisrx.common.compression.CompressionUtils; -import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameter; import io.mantisrx.runtime.parameter.SinkParameters; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.netty.buffer.ByteBuf; import io.reactivx.mantis.operators.DropOperator; import java.util.Collection; @@ -59,9 +60,8 @@ public class SseWorkerConnection { private final 
String connectionType; private final String hostname; private final int port; - private final MetricGroupId metricGroupId; + private final String metricGroup; private final Counter pingCounter; - private final boolean reconnectUponConnectionReset; private final Action1 updateConxStatus; private final Action1 updateDataRecvngStatus; @@ -112,13 +112,13 @@ public SseWorkerConnection(final String connectionType, final Action1 connectionResetHandler, final long dataRecvTimeoutSecs, final boolean reconnectUponConnectionReset, - final CopyOnWriteArraySet metricsSet, + final CopyOnWriteArraySet metricsSet, final int bufferSize, final SinkParameters sinkParameters, - final MetricGroupId metricGroupId) { + final String metricGroup, + final MeterRegistry meterRegistry) { this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler, - dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, - metricGroupId); + dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry); } public SseWorkerConnection(final String connectionType, final String hostname, @@ -132,19 +132,14 @@ public SseWorkerConnection(final String connectionType, final int bufferSize, final SinkParameters sinkParameters, final boolean disablePingFiltering, - final MetricGroupId metricGroupId) { + final String metricGroup, + final MeterRegistry meterRegistry) { this.connectionType = connectionType; this.hostname = hostname; this.port = port; - - this.metricGroupId = metricGroupId; - final MetricGroupId connHealthMetricGroup = new MetricGroupId("ConnectionHealth"); - Metrics m = new Metrics.Builder() - .id(connHealthMetricGroup) - .addCounter("pingCount") - .build(); - this.pingCounter = m.getCounter("pingCount"); - + this.meterRegistry = meterRegistry; + this.metricGroup = metricGroup; + this.pingCounter = meterRegistry.counter(metricGroup + 
"_ConnectionHealth_pingCount"); this.updateConxStatus = updateConxStatus; this.updateDataRecvngStatus = updateDataRecvngStatus; this.connectionResetHandler = connectionResetHandler; @@ -283,7 +278,7 @@ protected Observable streamContent(HttpClientResponse(metricGroupId)) + .lift(new DropOperator(meterRegistry)) .flatMap((ServerSentEvent t1) -> { lastDataReceived.set(System.currentTimeMillis()); if (isConnected.get() && isReceivingData.compareAndSet(false, true)) @@ -313,17 +308,11 @@ protected Observable streamContent(HttpClientResponse metrics = MetricsRegistry.getInstance().getMetrics(metricNamePrefix); long totalDataDrop = 0L; - if (metrics != null && !metrics.isEmpty()) { - //logger.info("Got " + metrics.size() + " metrics for DropOperator"); - for (Metrics m : metrics) { - final Counter dropped = m.getCounter("" + DropOperator.Counters.dropped); - final Counter onNext = m.getCounter("" + DropOperator.Counters.onNext); - if (dropped != null) - totalDataDrop += dropped.value(); + if (meterRegistry.getMeters().contains(pingCounter)) { + final Counter dropped = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.dropped)).counter(); + final Counter onNext = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.onNext)).counter(); } - } if (totalDataDrop > lastDataDropValue) { lastDataDropValue = totalDataDrop; return true; diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java index 377635d3a..9fe17c217 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java @@ -20,12 +20,11 @@ import 
com.mantisrx.common.utils.NettyUtils; import io.mantisrx.common.MantisServerSentEvent; -import io.mantisrx.common.metrics.Counter; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameters; import io.mantisrx.server.core.ServiceRegistry; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Counter; import io.reactivx.mantis.operators.DropOperator; import java.util.HashSet; import java.util.Set; @@ -42,8 +41,8 @@ public class SseWorkerConnectionFunction implements WorkerConnectionFunc metricsSet = new CopyOnWriteArraySet<>(); - private static final MetricGroupId metricGroupId; + private static final CopyOnWriteArraySet metricsSet = new CopyOnWriteArraySet<>(); + private static final String metricGroup; private static final Action1 defaultConxResetHandler = new Action1() { @Override public void call(Throwable throwable) { @@ -58,25 +57,23 @@ public void call(Throwable throwable) { // Use single netty thread NettyUtils.setNettyThreads(); - metricGroupId = new MetricGroupId(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_SseWorkerMetricsConnectionFunction_withBuffer"); - metricsSet.add(metricGroupId); + metricGroup = (DROP_OPERATOR_INCOMING_METRIC_GROUP + "_SseWorkerMetricsConnectionFunction_withBuffer"); + metricsSet.add(metricGroup); logger.info("SETTING UP METRICS PRINTER THREAD"); new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { - Set metricGroups = new HashSet<>(metricsSet); + Set metricGroups = new HashSet<>(metricsSet); if (!metricGroups.isEmpty()) { - for (MetricGroupId metricGroupId : metricGroups) { - final Metrics metric = MetricsRegistry.getInstance().getMetric(metricGroupId); - if (metric != null) { - final Counter onNext = metric.getCounter("" + DropOperator.Counters.onNext); - final Counter onError = 
metric.getCounter("" + DropOperator.Counters.onError); - final Counter onComplete = metric.getCounter("" + DropOperator.Counters.onComplete); - final Counter dropped = metric.getCounter("" + DropOperator.Counters.dropped); - - logger.info(metricGroupId.id() + ": onNext=" + onNext.value() + ", onError=" + onError.value() + - ", onComplete=" + onComplete.value() + ", dropped=" + dropped.value() + for (String metricGroup : metricGroups) { + if (metricGroup != null) { + final Counter onNext = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onNext)); + final Counter onError = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onError)); + final Counter onComplete = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onComplete)); + final Counter dropped = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.dropped)); + logger.info(metricGroup + ": onNext=" + onNext.count() + ", onError=" + onError.count() + + ", onComplete=" + onComplete.count() + ", dropped=" + dropped.count() // + ", buffered=" + buffered.value() ); } @@ -95,15 +92,17 @@ public void run() { private final Action1 connectionResetHandler; private final SinkParameters sinkParameters; private final int bufferSize; + private static MeterRegistry meterRegistry; - public SseWorkerConnectionFunction(boolean reconnectUponConnectionRest, Action1 connectionResetHandler) { - this(reconnectUponConnectionRest, connectionResetHandler, null); + public SseWorkerConnectionFunction(boolean reconnectUponConnectionRest, Action1 connectionResetHandler, MeterRegistry meterRegistry) { + this(reconnectUponConnectionRest, connectionResetHandler, null, meterRegistry); } - public SseWorkerConnectionFunction(boolean reconnectUponConnectionRest, Action1 connectionResetHandler, SinkParameters sinkParameters) { + public SseWorkerConnectionFunction(boolean reconnectUponConnectionRest, Action1 
connectionResetHandler, SinkParameters sinkParameters, MeterRegistry meterRegistry) { this.reconnectUponConnectionRest = reconnectUponConnectionRest; this.connectionResetHandler = connectionResetHandler == null ? defaultConxResetHandler : connectionResetHandler; this.sinkParameters = sinkParameters; + this.meterRegistry = meterRegistry; String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR); bufferSize = Integer.parseInt(bufferSizeStr); } @@ -122,7 +121,7 @@ public WorkerConnection call(final String hostname, final private final SseWorkerConnection workerConn = new SseWorkerConnection("WorkerMetrics", hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler, dataRecvTimeoutSecs, reconnectUponConnectionRest, metricsSet, - bufferSize, sinkParameters,metricGroupId); + bufferSize, sinkParameters, metricGroup, meterRegistry); @Override public String getName() { diff --git a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/MetricsClientImplTest.java b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/MetricsClientImplTest.java index 68428b267..a4c6068b1 100644 --- a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/MetricsClientImplTest.java +++ b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/MetricsClientImplTest.java @@ -30,6 +30,8 @@ import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature; import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.reactivex.mantis.remote.observable.EndpointChange; import java.io.UnsupportedEncodingException; import java.util.Arrays; 
@@ -67,6 +69,7 @@ public void testMetricConnections() throws InterruptedException, UnsupportedEnco final String testResUsageMetricData = generateMetricJson(MetricStringConstants.RESOURCE_USAGE_METRIC_GROUP); final String testDropDataMetricData = generateMetricJson(MetricStringConstants.DATA_DROP_METRIC_GROUP); final int metricsPort = TestSseServerFactory.newServerWithInitialData(testResUsageMetricData); + final MeterRegistry meterRegistry = new SimpleMeterRegistry(); final AtomicInteger i = new AtomicInteger(0); final Observable workerMetricLocationStream = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io()).map(new Func1() { @@ -93,7 +96,7 @@ public void call(Throwable throwable) { logger.error("Interrupted waiting for retrying connection"); } } - }, new SinkParameters.Builder().withParameter("name", MetricStringConstants.RESOURCE_USAGE_METRIC_GROUP).build()), + }, new SinkParameters.Builder().withParameter("name", MetricStringConstants.RESOURCE_USAGE_METRIC_GROUP).build(), meterRegistry), new JobWorkerMetricsLocator() { @Override public Observable locateWorkerMetricsForJob(String jobId) { @@ -117,7 +120,7 @@ public void onNext(WorkerConnectionsStatus workerConnectionsStatus) { logger.info("got WorkerConnStatus {}", workerConnectionsStatus); } }, - 60); + 60, meterRegistry); final CountDownLatch latch = new CountDownLatch(1); diff --git a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunctionTest.java b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunctionTest.java index efc112228..1f55a08fc 100644 --- a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunctionTest.java +++ b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunctionTest.java @@ -21,6 +21,8 @@ import io.mantisrx.common.MantisServerSentEvent; import 
io.mantisrx.server.worker.TestSseServerFactory; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -43,6 +45,7 @@ public void testSseConnection() throws InterruptedException { final CountDownLatch errorLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1); final boolean reconnectOnConnReset = true; + final MeterRegistry meterRegistry = new SimpleMeterRegistry(); SseWorkerConnectionFunction connectionFunction = new SseWorkerConnectionFunction(reconnectOnConnReset, new Action1() { @Override @@ -50,7 +53,7 @@ public void call(Throwable throwable) { logger.warn("connection was reset, should be retried", throwable); errorLatch.countDown(); } - }); + }, meterRegistry); final WorkerConnection conn = connectionFunction.call("localhost", serverPort); final Observable events = conn.call(); diff --git a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java index b5eddfcd1..03dd34c48 100644 --- a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java +++ b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java @@ -21,13 +21,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.netflix.spectator.api.DefaultRegistry; import io.mantisrx.common.MantisServerSentEvent; -import io.mantisrx.common.metrics.Counter; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; -import io.mantisrx.common.metrics.spectator.MetricGroupId; -import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; 
+import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.buffer.Unpooled; import io.reactivx.mantis.operators.DropOperator; import java.nio.charset.Charset; @@ -36,6 +33,7 @@ import java.util.concurrent.TimeUnit; import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse; import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent; +import org.apache.flink.metrics.Meter; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,9 +47,8 @@ public class SseWorkerConnectionTest { @Test public void testStreamContentDrops() throws Exception { - SpectatorRegistryFactory.setRegistry(new DefaultRegistry()); String metricGroupString = "testmetric"; - MetricGroupId metricGroupId = new MetricGroupId(metricGroupString); + MeterRegistry meterRegistry = new SimpleMeterRegistry(); SseWorkerConnection workerConnection = new SseWorkerConnection("connection_type", "hostname", 80, @@ -64,7 +61,8 @@ public void testStreamContentDrops() throws Exception { 1, null, true, - metricGroupId); + metricGroupString, + meterRegistry); HttpClientResponse response = mock(HttpClientResponse.class); TestScheduler testScheduler = Schedulers.test(); @@ -83,12 +81,11 @@ public void testStreamContentDrops() throws Exception { List events = subscriber.getOnNextEvents(); assertEquals("0", events.get(0).getEventAsString()); - Metrics metrics = MetricsRegistry.getInstance().getMetric(metricGroupId); - Counter onNextCounter = metrics.getCounter(DropOperator.Counters.onNext.toString()); - Counter droppedCounter = metrics.getCounter(DropOperator.Counters.dropped.toString()); - logger.info("next: {}", onNextCounter.value()); - logger.info("drop: {}", droppedCounter.value()); - assertTrue(onNextCounter.value() < 10); - assertTrue(droppedCounter.value() > 90); + Counter onNextCounter = 
meterRegistry.counter(DropOperator.Counters.onNext.toString()); + Counter droppedCounter = meterRegistry.counter(DropOperator.Counters.dropped.toString()); + logger.info("next: {}", onNextCounter.count()); + logger.info("drop: {}", droppedCounter.count()); + assertTrue(onNextCounter.count() < 10); + assertTrue(droppedCounter.count() > 90); } } diff --git a/mantis-server/mantis-server-worker/build.gradle b/mantis-server/mantis-server-worker/build.gradle index 16e53a859..0692cb6de 100644 --- a/mantis-server/mantis-server-worker/build.gradle +++ b/mantis-server/mantis-server-worker/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation "com.yahoo.datasketches:sketches-core:0.9.1" implementation libraries.spectatorApi + implementation libraries.micrometerCore testImplementation libraries.junit4 testImplementation libraries.mockitoAll diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java index d8efd9cd8..fcdfc8778 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java @@ -21,21 +21,22 @@ import static io.mantisrx.server.core.stats.MetricStringConstants.DROP_COUNT; import static io.mantisrx.server.core.stats.MetricStringConstants.ON_NEXT_COUNT; -import io.mantisrx.common.metrics.Counter; -import io.mantisrx.common.metrics.Gauge; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; import io.mantisrx.server.core.StatusPayloads; import 
io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature; import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; import io.reactivx.mantis.operators.DropOperator; +import io.reactivx.mantis.operators.DropOperator.Counters; import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,50 +50,48 @@ class DataDroppedPayloadSetter implements Closeable { private ScheduledFuture future; private final Gauge dropCountGauge; + private final AtomicLong dropCountValue = new AtomicLong(0); private final Gauge onNextCountGauge; + private final AtomicLong onNextCountValue = new AtomicLong(0); + private final MeterRegistry meterRegistry; - DataDroppedPayloadSetter(Heartbeat heartbeat) { + DataDroppedPayloadSetter(Heartbeat heartbeat, MeterRegistry meterRegistry) { this.heartbeat = heartbeat; objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); executor = new ScheduledThreadPoolExecutor(1); - Metrics m = new Metrics.Builder() - .name(DATA_DROP_METRIC_GROUP) - .addGauge(DROP_COUNT) - .addGauge(ON_NEXT_COUNT) - .build(); - m = MetricsRegistry.getInstance().registerAndGet(m); - dropCountGauge = m.getGauge(DROP_COUNT); - onNextCountGauge = m.getGauge(ON_NEXT_COUNT); + this.meterRegistry = meterRegistry; + dropCountGauge = Gauge.builder(DATA_DROP_METRIC_GROUP + "_" + DROP_COUNT, dropCountValue::get) + .register(meterRegistry); + onNextCountGauge = Gauge.builder(DATA_DROP_METRIC_GROUP + "_" + ON_NEXT_COUNT, dropCountValue::get) + .register(meterRegistry); } protected void setPayload(final long intervalSecs) { - final Collection metrics = 
MetricsRegistry.getInstance().getMetrics(metricNamePrefix); long totalDropped = 0L; long totalOnNext = 0L; try { - if (metrics != null && !metrics.isEmpty()) { + if (meterRegistry.getMeters().contains(dropCountGauge) && meterRegistry.getMeters().contains(onNextCountGauge)){ //logger.info("Got " + metrics.size() + " metrics for DropOperator"); - for (Metrics m : metrics) { - final Counter dropped = m.getCounter("" + DropOperator.Counters.dropped); - final Counter onNext = m.getCounter("" + DropOperator.Counters.onNext); - if (dropped != null) - totalDropped += dropped.value(); - else - logger.warn("Unexpected to get null dropped counter for metric " + m.getMetricGroupId().id()); - if (onNext != null) - totalOnNext += onNext.value(); - else - logger.warn("Unexpected to get null onNext counter for metric " + m.getMetricGroupId().id()); - } + final Counter dropped = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.dropped).counter(); + final Counter onNext = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.onNext).counter(); + if (dropped != null) + totalDropped += dropped.count(); + else + logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped."); + if (onNext != null) + totalOnNext += onNext.count(); + else + logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext."); + final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); try { heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); } catch (JsonProcessingException e) { logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); } - dropCountGauge.set(dataDrop.getDroppedCount()); - onNextCountGauge.set(dataDrop.getOnNextCount()); + dropCountValue.set(dataDrop.getDroppedCount()); + onNextCountValue.set(dataDrop.getOnNextCount()); } else logger.debug("Got no metrics from DropOperator"); } 
catch (Exception e) { diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java index 8f4a735f0..01a4cf16c 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/ResourceUsagePayloadSetter.java @@ -16,9 +16,8 @@ package io.mantisrx.server.worker; -import io.mantisrx.common.metrics.Gauge; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Gauge; import io.mantisrx.common.storage.StorageUnit; import io.mantisrx.runtime.loader.config.MetricsCollector; import io.mantisrx.runtime.loader.config.Usage; @@ -33,6 +32,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,18 +48,31 @@ public class ResourceUsagePayloadSetter implements Closeable { private final AtomicInteger counter = new AtomicInteger(); private final MetricsCollector resourceUsageUtils; private final Gauge cpuLimitGauge; + private final AtomicLong cpuLimitValue = new AtomicLong(0); private final Gauge cpuUsageCurrGauge; + private final AtomicLong cpuUsageCurrValue = new AtomicLong(0); private final Gauge cpuUsagePeakGauge; + private final AtomicLong cpuUsagePeakValue = new AtomicLong(0); private final Gauge memLimitGauge; + private final AtomicLong memLimitValue = new AtomicLong(0); private final Gauge cachedMemUsageCurrGauge; + private final AtomicLong cachedMemUsageCurrValue = new AtomicLong(0); private final Gauge cachedMemUsagePeakGauge; + private final 
AtomicLong cachedMemUsagePeakValue = new AtomicLong(0); private final Gauge totMemUsageCurrGauge; + private final AtomicLong totMemUsageCurrValue = new AtomicLong(0); private final Gauge totMemUsagePeakGauge; + private final AtomicLong totMemUsagePeakValue = new AtomicLong(0); private final Gauge nwBytesLimitGauge; + private final AtomicLong nwBytesLimitValue = new AtomicLong(0); private final Gauge nwBytesUsageCurrGauge; + private final AtomicLong nwBytesUsageCurrValue = new AtomicLong(0); private final Gauge nwBytesUsagePeakGauge; + private final AtomicLong nwBytesUsagePeakValue = new AtomicLong(0); private final Gauge jvmMemoryUsedGauge; + private final AtomicLong jvmMemoryUsedValue = new AtomicLong(0); private final Gauge jvmMemoryMaxGauge; + private final AtomicLong jvmMemoryMaxValue = new AtomicLong(0); private final double nwBytesLimit; private double prev_cpus_system_time_secs = -1.0; private double prev_cpus_user_time_secs = -1.0; @@ -72,10 +85,12 @@ public class ResourceUsagePayloadSetter implements Closeable { private double peakBytesRead = 0.0; private double peakBytesWritten = 0.0; private StatusPayloads.ResourceUsage oldUsage = null; + private MeterRegistry meterRegistry; - public ResourceUsagePayloadSetter(Heartbeat heartbeat, WorkerConfiguration config, double networkMbps) { + public ResourceUsagePayloadSetter(Heartbeat heartbeat, WorkerConfiguration config, double networkMbps, MeterRegistry meterRegistry) { this.heartbeat = heartbeat; this.nwBytesLimit = networkMbps * 1024.0 * 1024.0 / 8.0; // convert from bits to bytes + this.meterRegistry = meterRegistry; executor = new ScheduledThreadPoolExecutor(1); String defaultReportingSchedule = "5,5,10,10,20,30"; StringTokenizer tokenizer = new StringTokenizer(defaultReportingSchedule, ","); @@ -98,36 +113,25 @@ public ResourceUsagePayloadSetter(Heartbeat heartbeat, WorkerConfiguration confi String nwBytesUsagePeakGaugeName = MetricStringConstants.NW_BYTES_USAGE_PEAK; String jvmMemoryUsedGaugeName = 
"jvmMemoryUsedBytes"; String jvmMemoryMaxGaugeName = "jvmMemoryMaxBytes"; - Metrics m = new Metrics.Builder() - .name("ResourceUsage") - .addGauge(cpuLimitGaugeName) - .addGauge(cpuUsageCurrGaugeName) - .addGauge(cpuUsagePeakGaugeName) - .addGauge(memLimitGaugeName) - .addGauge(cachedMemUsageCurrGaugeName) - .addGauge(cachedMemUsagePeakGaugeName) - .addGauge(totMemUsageCurrGaugeName) - .addGauge(totMemUsagePeakGaugeName) - .addGauge(nwBytesLimitGaugeName) - .addGauge(nwBytesUsageCurrGaugeName) - .addGauge(nwBytesUsagePeakGaugeName) - .addGauge(jvmMemoryUsedGaugeName) - .addGauge(jvmMemoryMaxGaugeName) - .build(); - m = MetricsRegistry.getInstance().registerAndGet(m); - cpuLimitGauge = m.getGauge(cpuLimitGaugeName); - cpuUsageCurrGauge = m.getGauge(cpuUsageCurrGaugeName); - cpuUsagePeakGauge = m.getGauge(cpuUsagePeakGaugeName); - memLimitGauge = m.getGauge(memLimitGaugeName); - cachedMemUsageCurrGauge = m.getGauge(cachedMemUsageCurrGaugeName); - cachedMemUsagePeakGauge = m.getGauge(cachedMemUsagePeakGaugeName); - totMemUsageCurrGauge = m.getGauge(totMemUsageCurrGaugeName); - totMemUsagePeakGauge = m.getGauge(totMemUsagePeakGaugeName); - nwBytesLimitGauge = m.getGauge(nwBytesLimitGaugeName); - nwBytesUsageCurrGauge = m.getGauge(nwBytesUsageCurrGaugeName); - nwBytesUsagePeakGauge = m.getGauge(nwBytesUsagePeakGaugeName); - jvmMemoryUsedGauge = m.getGauge(jvmMemoryUsedGaugeName); - jvmMemoryMaxGauge = m.getGauge(jvmMemoryMaxGaugeName); + cpuLimitGauge = addGauge(cpuLimitGaugeName, cpuLimitValue); + cpuUsageCurrGauge = addGauge(cpuUsageCurrGaugeName, cpuUsageCurrValue); + cpuUsagePeakGauge = addGauge(cpuUsagePeakGaugeName, cpuUsagePeakValue); + memLimitGauge = addGauge(memLimitGaugeName, memLimitValue); + cachedMemUsageCurrGauge = addGauge(cachedMemUsageCurrGaugeName, cachedMemUsageCurrValue); + cachedMemUsagePeakGauge = addGauge(cachedMemUsagePeakGaugeName, cachedMemUsagePeakValue); + totMemUsageCurrGauge = addGauge(totMemUsageCurrGaugeName, totMemUsageCurrValue); + 
totMemUsagePeakGauge = addGauge(totMemUsagePeakGaugeName, totMemUsagePeakValue); + nwBytesLimitGauge = addGauge(nwBytesLimitGaugeName, nwBytesLimitValue); + nwBytesUsageCurrGauge = addGauge(nwBytesUsageCurrGaugeName, nwBytesUsageCurrValue); + nwBytesUsagePeakGauge = addGauge(nwBytesUsagePeakGaugeName, nwBytesUsagePeakValue); + jvmMemoryUsedGauge = addGauge(jvmMemoryUsedGaugeName, jvmMemoryUsedValue); + jvmMemoryMaxGauge = addGauge(jvmMemoryMaxGaugeName, jvmMemoryMaxValue); + } + + private Gauge addGauge(String name, AtomicLong value) { + Gauge gauge = Gauge.builder("ResourceUsage_" + name, value::get) + .register(meterRegistry); + return gauge; } private long getNextDelay() { @@ -147,19 +151,19 @@ private void setPayloadAndMetrics() { } catch (JsonProcessingException e) { logger.warn("Error writing json for resourceUsage payload: " + e.getMessage()); } - cpuLimitGauge.set(Math.round(usage.getCpuLimit() * 100.0)); - cpuUsageCurrGauge.set(Math.round(usage.getCpuUsageCurrent() * 100.0)); - cpuUsagePeakGauge.set(Math.round(usage.getCpuUsagePeak() * 100.0)); - memLimitGauge.set(Math.round(usage.getMemLimit())); - cachedMemUsageCurrGauge.set(Math.round(usage.getMemCacheCurrent())); - cachedMemUsagePeakGauge.set(Math.round(usage.getMemCachePeak())); - totMemUsageCurrGauge.set(Math.round(usage.getTotMemUsageCurrent())); - totMemUsagePeakGauge.set(Math.round(usage.getTotMemUsagePeak())); - nwBytesLimitGauge.set(Math.round(nwBytesLimit)); - nwBytesUsageCurrGauge.set(Math.round(usage.getNwBytesCurrent())); - nwBytesUsagePeakGauge.set(Math.round(usage.getNwBytesPeak())); - jvmMemoryUsedGauge.set(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()); - jvmMemoryMaxGauge.set(Runtime.getRuntime().maxMemory()); + cpuLimitValue.set(Math.round(usage.getCpuLimit() * 100.0)); + cpuUsageCurrValue.set(Math.round(usage.getCpuUsageCurrent() * 100.0)); + cpuUsagePeakValue.set(Math.round(usage.getCpuUsagePeak() * 100.0)); + 
memLimitValue.set(Math.round(usage.getMemLimit())); + cachedMemUsageCurrValue.set(Math.round(usage.getMemCacheCurrent())); + cachedMemUsagePeakValue.set(Math.round(usage.getMemCachePeak())); + totMemUsageCurrValue.set(Math.round(usage.getTotMemUsageCurrent())); + totMemUsagePeakValue.set(Math.round(usage.getTotMemUsagePeak())); + nwBytesLimitValue.set(Math.round(nwBytesLimit)); + nwBytesUsageCurrValue.set(Math.round(usage.getNwBytesCurrent())); + nwBytesUsagePeakValue.set(Math.round(usage.getNwBytesPeak())); + jvmMemoryUsedValue.set(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()); + jvmMemoryMaxValue.set(Runtime.getRuntime().maxMemory()); if (isBigIncrease(oldUsage, usage) || closeToLimit(usage)) { delay = Math.min(delay, bigUsageChgReportingIntervalSecs); } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java index ff5847283..f5a7b72b1 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.java @@ -22,7 +22,6 @@ import com.mantisrx.common.utils.Closeables; import com.netflix.spectator.api.Registry; import io.mantisrx.common.WorkerPorts; -import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; import io.mantisrx.common.network.Endpoint; import io.mantisrx.runtime.Context; @@ -59,6 +58,7 @@ import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus; import io.mantisrx.shaded.com.google.common.base.Splitter; import io.mantisrx.shaded.com.google.common.base.Strings; +import io.micrometer.core.instrument.MeterRegistry; import io.reactivex.mantis.remote.observable.RemoteRxServer; import 
io.reactivex.mantis.remote.observable.RxMetrics; import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector; @@ -107,6 +107,7 @@ public class WorkerExecutionOperationsNetworkStage implements WorkerExecutionOpe private final ScheduledExecutorService scheduledExecutorService; private final ClassLoader classLoader; private Observer jobStatusObserver; + private MeterRegistry meterRegistry; public WorkerExecutionOperationsNetworkStage( Observer vmTaskStatusObserver, @@ -114,13 +115,15 @@ public WorkerExecutionOperationsNetworkStage( WorkerConfiguration config, WorkerMetricsClient workerMetricsClient, SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory, - ClassLoader classLoader) { + ClassLoader classLoader, + MeterRegistry meterRegistry) { this.vmTaskStatusObserver = vmTaskStatusObserver; this.mantisMasterApi = mantisMasterApi; this.config = config; this.workerMetricsClient = workerMetricsClient; this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory; this.classLoader = classLoader; + this.meterRegistry = meterRegistry; String connectionsPerEndpointStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.connectionsPerEndpoint", "2"); @@ -203,9 +206,9 @@ private static WorkerInfo generateWorkerInfo(String jobName, String jobId, int s } private static Context generateContext(Parameters parameters, ServiceLocator serviceLocator, WorkerInfo workerInfo, - MetricsRegistry metricsRegistry, Action0 completeAndExitAction, Observable workerMapObservable, ClassLoader classLoader) { + MeterRegistry meterRegistry, Action0 completeAndExitAction, Observable workerMapObservable, ClassLoader classLoader) { - return new Context(parameters, serviceLocator, workerInfo, metricsRegistry, completeAndExitAction, workerMapObservable, classLoader); + return new Context(parameters, serviceLocator, workerInfo, meterRegistry, completeAndExitAction, workerMapObservable, classLoader); } @@ -217,10 +220,10 @@ private 
Closeable startSendingHeartbeats(final Observer jobStatusObserve heartbeatIntervalSecs, TimeUnit.SECONDS); // start heartbeat payload setter for incoming data drops - DataDroppedPayloadSetter droppedPayloadSetter = new DataDroppedPayloadSetter(heartbeatRef.get()); + DataDroppedPayloadSetter droppedPayloadSetter = new DataDroppedPayloadSetter(heartbeatRef.get(), meterRegistry); droppedPayloadSetter.start(heartbeatIntervalSecs); - ResourceUsagePayloadSetter usagePayloadSetter = new ResourceUsagePayloadSetter(heartbeatRef.get(), config, networkMbps); + ResourceUsagePayloadSetter usagePayloadSetter = new ResourceUsagePayloadSetter(heartbeatRef.get(), config, networkMbps, meterRegistry); usagePayloadSetter.start(heartbeatIntervalSecs); return Closeables.combine(() -> heartbeatFuture.cancel(false), droppedPayloadSetter, usagePayloadSetter); @@ -342,7 +345,7 @@ public void executeStage(final ExecutionDetails setup) throws IOException { Parameters parameters = ParameterUtils .createContextParameters(rw.getJob().getParameterDefinitions(), setup.getParameters()); - final Context context = generateContext(parameters, serviceLocator, workerInfo, MetricsRegistry.getInstance(), + final Context context = generateContext(parameters, serviceLocator, workerInfo, meterRegistry, () -> { rw.signalCompleted(); // wait for completion signal to go to the master and us getting killed. Upon timeout, exit. 
@@ -436,7 +439,7 @@ public int acquirePort() { WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable<>(rw.getPorts().next(), remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), - rw.getJobName()); + rw.getJobName(), meterRegistry); closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(), rw.getStage(), publisher, rw.getContext(), rw.getSourceStageTotalWorkersObservable())); @@ -444,7 +447,6 @@ remoteObservableName, numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.ge logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", serving remote observable for source with name: " + remoteObservableName); RemoteRxServer server = publisher.getServer(); RxMetrics rxMetrics = server.getMetrics(); - MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); signalStarted(rw); logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", blocking until source observable completes"); @@ -511,7 +513,6 @@ public int acquirePort() { } }; RxMetrics rxMetrics = new RxMetrics(); - MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); final CountDownLatch blockUntilComplete = new CountDownLatch(1); Action0 countDownLatch = new Action0() { @Override @@ -541,14 +542,13 @@ public void call() { WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable<>(workerPort, remoteObservableName, - numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName()); + numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName(), meterRegistry); closeables.add(StageExecutors.executeIntermediate(consumer, rw.getStage(), publisher, rw.getContext())); RemoteRxServer server = publisher.getServer(); logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", serving intermediate remote observable with name: " + remoteObservableName); 
RxMetrics rxMetrics = server.getMetrics(); - MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges()); // send running signal only after server is started signalStarted(rw); logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", blocking until intermediate observable completes"); @@ -615,7 +615,7 @@ private WorkerConsumer connectToObservableAtPreviousStages(Observable metricsClient; // worker metrics to subscribe to private final Set metrics; + private MeterRegistry meterRegistry; - public WorkerMetricSubscription(final String jobId, WorkerMetricsClient workerMetricsClient, Set metricGroups) { + public WorkerMetricSubscription(final String jobId, WorkerMetricsClient workerMetricsClient, Set metricGroups, MeterRegistry meterRegistry) { this.metrics = metricGroups; + this.meterRegistry = meterRegistry; SinkParameters metricNamesFilter = null; try { @@ -64,7 +67,7 @@ public void call(Throwable throwable) { logger.error("Interrupted waiting for retrying connection"); } } - }, metricNamesFilter), + }, metricNamesFilter, meterRegistry), new Observer() { @Override public void onCompleted() { diff --git a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java index 532629093..4b41d01d2 100644 --- a/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java +++ b/mantis-server/mantis-server-worker/src/test/java/io/mantisrx/server/worker/DataDroppedPayloadSetterTest.java @@ -24,10 +24,12 @@ import static org.junit.Assert.assertEquals; import com.netflix.spectator.api.DefaultRegistry; -import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; +import 
io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.reactivx.mantis.operators.DropOperator; import org.junit.Test; import org.slf4j.Logger; @@ -40,28 +42,20 @@ public class DataDroppedPayloadSetterTest { public void testAggregateDropOperatorMetrics() throws Exception { SpectatorRegistryFactory.setRegistry(new DefaultRegistry()); Heartbeat heartbeat = new Heartbeat("job-1", 1, 1, 1); - DataDroppedPayloadSetter payloadSetter = new DataDroppedPayloadSetter(heartbeat); + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + DataDroppedPayloadSetter payloadSetter = new DataDroppedPayloadSetter(heartbeat, meterRegistry); - Metrics m = new Metrics.Builder() - .id(METRIC_GROUP + "_" + INCOMING + "_metric1") - .addCounter(DropOperator.Counters.dropped.toString()) - .addCounter(DropOperator.Counters.onNext.toString()) - .build(); - m = MetricsRegistry.getInstance().registerAndGet(m); - m.getCounter(DropOperator.Counters.dropped.toString()).increment(1); - m.getCounter(DropOperator.Counters.onNext.toString()).increment(10); - m = new Metrics.Builder() - .id(METRIC_GROUP + "_" + INCOMING + "_metric2") - .addCounter(DropOperator.Counters.dropped.toString()) - .addCounter(DropOperator.Counters.onNext.toString()) - .build(); - m = MetricsRegistry.getInstance().registerAndGet(m); - m.getCounter(DropOperator.Counters.dropped.toString()).increment(100); - m.getCounter(DropOperator.Counters.onNext.toString()).increment(1000); + Counter m1Dropped = meterRegistry.counter(METRIC_GROUP + "_" + INCOMING + "_metric1_" + ("" + DropOperator.Counters.dropped)); + Counter m1OnNext = meterRegistry.counter(METRIC_GROUP + "_" + INCOMING + "_metric1_" + ("" + DropOperator.Counters.onNext)); + m1Dropped.increment(1); + m1OnNext.increment(10); + Counter m2Dropped = meterRegistry.counter(METRIC_GROUP + "_" + INCOMING + "_metric2_" + ("" + DropOperator.Counters.dropped)); + Counter 
m2OnNext = meterRegistry.counter(METRIC_GROUP + "_" + INCOMING + "_metric2_" + ("" + DropOperator.Counters.onNext)); + m2Dropped.increment(100); + m2OnNext.increment(1000); payloadSetter.setPayload(30); - m = MetricsRegistry.getInstance().getMetric(new MetricGroupId(DATA_DROP_METRIC_GROUP)); - assertEquals(101L, m.getGauge(DROP_COUNT).value()); - assertEquals(1010, m.getGauge(ON_NEXT_COUNT).value()); + assertEquals(101L, meterRegistry.find(DATA_DROP_METRIC_GROUP + "_" + DROP_COUNT).gauge().value()); + assertEquals(1010, meterRegistry.find(DATA_DROP_METRIC_GROUP + "_" +ON_NEXT_COUNT).gauge().value()); } } From ba8be43f4ee29b126d34c8a0c67b1d7be9dfe3e0 Mon Sep 17 00:00:00 2001 From: "J. Wu" Date: Tue, 5 Sep 2023 16:42:44 -0400 Subject: [PATCH 2/2] update --- build.gradle | 2 + .../mantisrx/server/agent/TaskExecutor.java | 14 +- .../server/agent/TaskExecutorStarter.java | 11 +- .../mantis-server-worker-client/build.gradle | 2 +- .../worker/client/MetricsClientImpl.java | 2 +- .../worker/client/SseWorkerConnection.java | 251 +++++++++--------- .../client/SseWorkerConnectionFunction.java | 11 +- .../worker/client/WorkerMetricsClient.java | 7 +- .../client/SseWorkerConnectionTest.java | 7 +- .../worker/DataDroppedPayloadSetter.java | 46 ++-- 10 files changed, 175 insertions(+), 178 deletions(-) diff --git a/build.gradle b/build.gradle index ffc9540ae..22e0c7322 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ ext.versions = [ jsr305 : "3.0.1", junit4 : "4.11", junit5 : "5.4.+", + micrometer : "1.11.0", mockito : "2.0.+", mockito3 : "3.+", spectator: "1.3.+", @@ -73,6 +74,7 @@ ext.libraries = [ "org.junit.jupiter:junit-jupiter-params:${versions.junit5}", ], mantisShaded : "io.mantisrx:mantis-shaded:2.0.8", + micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}", mockitoAll : "org.mockito:mockito-all:${versions.mockito}", mockitoCore : "org.mockito:mockito-core:${versions.mockito}", mockitoCore3 : 
"org.mockito:mockito-core:${versions.mockito3}", diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java index a456b78fc..ea5813889 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java @@ -52,7 +52,6 @@ import io.mantisrx.shaded.com.google.common.util.concurrent.Service; import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State; import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting; -import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; import java.util.Map; import java.util.Optional; @@ -119,17 +118,15 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private RuntimeTask currentTask; private ExecuteStageRequest currentRequest; - private MeterRegistry meterRegistry; public TaskExecutor( RpcService rpcService, WorkerConfiguration workerConfiguration, HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, - SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory, - MeterRegistry meterRegistry) { + SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory) { this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle, - subscriptionStateHandlerFactory, null, meterRegistry); + subscriptionStateHandlerFactory, null); } public TaskExecutor( @@ -138,8 +135,7 @@ public TaskExecutor( HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory, - @Nullable TaskFactory taskFactory, - MeterRegistry meterRegistry) { + @Nullable TaskFactory taskFactory) { super(rpcService, 
RpcServiceUtils.createRandomName("worker")); // this is the task executor ID that will be used for the rest of the JVM process @@ -207,8 +203,8 @@ private void startTaskExecutorServices() { validateRunsInMainThread(); masterMonitor = highAvailabilityServices.getMasterClientApi(); - taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor, meterRegistry); - RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory(meterRegistry)); + taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor); + RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory()); resourceClusterGatewaySupplier = highAvailabilityServices.connectWithResourceManager(clusterID); resourceClusterGatewaySupplier.register(new ResourceManagerChangeListener()); diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index 4f78aa177..7796af4e8 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -55,7 +55,6 @@ public class TaskExecutorStarter extends AbstractIdleService { private final TaskExecutor taskExecutor; private final HighAvailabilityServices highAvailabilityServices; private final RpcSystem rpcSystem; - private final MeterRegistry meterRegistry; @Override protected void startUp() { @@ -113,7 +112,6 @@ public static class TaskExecutorStarterBuilder { private TaskFactory taskFactory; private final List> listeners = new ArrayList<>(); - private MeterRegistry meterRegistry; private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) { this.workerConfiguration = workerConfiguration; @@ -132,10 +130,6 @@ public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) { return this; } - 
public TaskExecutorStarterBuilder registry(MeterRegistry meterRegistry) { - this.meterRegistry = meterRegistry; - return this; - } private RpcSystem getRpcSystem() { if (this.rpcSystem == null) { @@ -214,14 +208,13 @@ public TaskExecutorStarter build() throws Exception { highAvailabilityServices, getClassLoaderHandle(), getSinkSubscriptionHandlerFactory(), - this.taskFactory, - meterRegistry); + this.taskFactory); for (Tuple2 listener : listeners) { taskExecutor.addListener(listener._1(), listener._2()); } - return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry); + return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem()); } } } diff --git a/mantis-server/mantis-server-worker-client/build.gradle b/mantis-server/mantis-server-worker-client/build.gradle index b87ceea4e..a7409e18c 100644 --- a/mantis-server/mantis-server-worker-client/build.gradle +++ b/mantis-server/mantis-server-worker-client/build.gradle @@ -17,7 +17,7 @@ dependencies { api project(":mantis-control-plane:mantis-control-plane-core") api project(":mantis-control-plane:mantis-control-plane-client") - + implementation libraries.micrometerCore testImplementation libraries.junit4 testImplementation libraries.mockitoAll testImplementation libraries.spectatorApi diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java index 63f6971a3..b775a0a28 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java @@ -152,7 +152,7 @@ public void call() { } }) .share() - .lift(new DropOperator>(meterRegistry, "client_metrics_share")) + .lift(new DropOperator>("client_metrics_share")) ; 
} diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java index 3e11312b2..65cc4c7d2 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java @@ -21,16 +21,15 @@ import com.mantisrx.common.utils.MantisSSEConstants; import io.mantisrx.common.MantisServerSentEvent; import io.mantisrx.common.compression.CompressionUtils; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameter; import io.mantisrx.runtime.parameter.SinkParameters; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.Search; import io.netty.buffer.ByteBuf; import io.reactivx.mantis.operators.DropOperator; -import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; @@ -61,13 +60,14 @@ public class SseWorkerConnection { private final String hostname; private final int port; private final String metricGroup; + private final MeterRegistry meterRegistry; private final Counter pingCounter; private final boolean reconnectUponConnectionReset; private final Action1 updateConxStatus; private final Action1 updateDataRecvngStatus; private final Action1 connectionResetHandler; private final long dataRecvTimeoutSecs; - private final CopyOnWriteArraySet metricsSet; + private final CopyOnWriteArraySet metricsSet; private final int bufferSize; private final SinkParameters sinkParameters; 
private final boolean disablePingFiltering; @@ -77,33 +77,34 @@ public class SseWorkerConnection { private boolean compressedBinaryInputEnabled = false; private volatile boolean isShutdown = false; private final Func1, Observable> retryLogic = - new Func1, Observable>() { - @Override - public Observable call(Observable attempts) { - if (!reconnectUponConnectionReset) - return Observable.empty(); - return attempts - .zipWith(Observable.range(1, Integer.MAX_VALUE), new Func2() { - @Override - public Integer call(Throwable t1, Integer integer) { - return integer; - } - }) - .flatMap(new Func1>() { - @Override - public Observable call(Integer integer) { - if (isShutdown) { - logger.info(getName() + ": Is shutdown, stopping retries"); - return Observable.empty(); - } - long delay = 2 * (integer > 10 ? 10 : integer); - logger.info(getName() + ": retrying conx after sleeping for " + delay + " secs"); - return Observable.timer(delay, TimeUnit.SECONDS); - } - }); - } - }; + new Func1, Observable>() { + @Override + public Observable call(Observable attempts) { + if (!reconnectUponConnectionReset) + return Observable.empty(); + return attempts + .zipWith(Observable.range(1, Integer.MAX_VALUE), new Func2() { + @Override + public Integer call(Throwable t1, Integer integer) { + return integer; + } + }) + .flatMap(new Func1>() { + @Override + public Observable call(Integer integer) { + if (isShutdown) { + logger.info(getName() + ": Is shutdown, stopping retries"); + return Observable.empty(); + } + long delay = 2 * (integer > 10 ? 
10 : integer); + logger.info(getName() + ": retrying conx after sleeping for " + delay + " secs"); + return Observable.timer(delay, TimeUnit.SECONDS); + } + }); + } + }; private long lastDataDropValue = 0L; + public SseWorkerConnection(final String connectionType, final String hostname, final Integer port, @@ -118,8 +119,9 @@ public SseWorkerConnection(final String connectionType, final String metricGroup, final MeterRegistry meterRegistry) { this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler, - dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry); + dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry); } + public SseWorkerConnection(final String connectionType, final String hostname, final Integer port, @@ -128,7 +130,7 @@ public SseWorkerConnection(final String connectionType, final Action1 connectionResetHandler, final long dataRecvTimeoutSecs, final boolean reconnectUponConnectionReset, - final CopyOnWriteArraySet metricsSet, + final CopyOnWriteArraySet metricsSet, final int bufferSize, final SinkParameters sinkParameters, final boolean disablePingFiltering, @@ -139,7 +141,7 @@ public SseWorkerConnection(final String connectionType, this.port = port; this.meterRegistry = meterRegistry; this.metricGroup = metricGroup; - this.pingCounter = meterRegistry.counter(metricGroup + "_ConnectionHealth_pingCount"); + this.pingCounter = meterRegistry.counter("ConnectionHealth_pingCount"); this.updateConxStatus = updateConxStatus; this.updateDataRecvngStatus = updateDataRecvngStatus; this.connectionResetHandler = connectionResetHandler; @@ -181,57 +183,56 @@ public synchronized Observable call() { if (isShutdown) return Observable.empty(); client = - RxNetty.newHttpClientBuilder(hostname, port) - .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()) - 
//.enableWireLogging(LogLevel.ERROR) - .withNoConnectionPooling() - .build(); + RxNetty.newHttpClientBuilder(hostname, port) + .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()) + //.enableWireLogging(LogLevel.ERROR) + .withNoConnectionPooling() + .build(); StringBuilder sp = new StringBuilder(); String delimiter = sinkParameters == null - ? null - : sinkParameters.getSinkParams().stream() - .filter(s -> s.getName() - .equalsIgnoreCase(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER)) - .findFirst() - .map(SinkParameter::getValue) - .orElse(null); + ? null + : sinkParameters.getSinkParams().stream() + .filter(s -> s.getName() + .equalsIgnoreCase(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER)) + .findFirst() + .map(SinkParameter::getValue) + .orElse(null); if (sinkParameters != null) { sp.append(sinkParameters.toString()); } - sp.append(sp.length() == 0 ? getDefaultSinkParams("?") : getDefaultSinkParams("&")); String uri = "/" + sp.toString(); logger.info(getName() + ": Using uri: " + uri); return - client.submit(HttpClientRequest.createGet(uri)) - .takeUntil(shutdownSubject) - .takeWhile((serverSentEventHttpClientResponse) -> !isShutdown) - .filter((HttpClientResponse response) -> { - if (!response.getStatus().reasonPhrase().equals("OK")) - logger.warn(getName() + ":Trying to continue after unexpected response from sink: " - + response.getStatus().reasonPhrase()); - return response.getStatus().reasonPhrase().equals("OK"); - }) - .flatMap((HttpClientResponse response) -> { - if (!isConnected.getAndSet(true)) { - if (updateConxStatus != null) - updateConxStatus.call(true); - } - return streamContent(response, updateDataRecvngStatus, dataRecvTimeoutSecs, delimiter); - }) - .doOnError((Throwable throwable) -> { - resetConnected(); - logger.warn(getName() + - "Error on getting response from SSE server: " + throwable.getMessage()); - connectionResetHandler.call(throwable); - }) - .retryWhen(retryLogic) - .doOnCompleted(this::resetConnected); + 
client.submit(HttpClientRequest.createGet(uri)) + .takeUntil(shutdownSubject) + .takeWhile((serverSentEventHttpClientResponse) -> !isShutdown) + .filter((HttpClientResponse response) -> { + if (!response.getStatus().reasonPhrase().equals("OK")) + logger.warn(getName() + ":Trying to continue after unexpected response from sink: " + + response.getStatus().reasonPhrase()); + return response.getStatus().reasonPhrase().equals("OK"); + }) + .flatMap((HttpClientResponse response) -> { + if (!isConnected.getAndSet(true)) { + if (updateConxStatus != null) + updateConxStatus.call(true); + } + return streamContent(response, updateDataRecvngStatus, dataRecvTimeoutSecs, delimiter); + }) + .doOnError((Throwable throwable) -> { + resetConnected(); + logger.warn(getName() + + "Error on getting response from SSE server: " + throwable.getMessage()); + connectionResetHandler.call(throwable); + }) + .retryWhen(retryLogic) + .doOnCompleted(this::resetConnected); } private void resetConnected() { @@ -247,72 +248,72 @@ private void resetConnected() { } protected Observable streamContent(HttpClientResponse response, - final Action1 updateDataRecvngStatus, - final long dataRecvTimeoutSecs, String delimiter) { + final Action1 updateDataRecvngStatus, + final long dataRecvTimeoutSecs, String delimiter) { long interval = Math.max(1, dataRecvTimeoutSecs / 2); if (updateDataRecvngStatus != null) { Observable.interval(interval, interval, TimeUnit.SECONDS) - .doOnNext((Long aLong) -> { - if (!isShutdown) { - if (hasDataDrop() || System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000)) { - if (isReceivingData.compareAndSet(true, false)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(false); - } - } else { - if (isConnected.get() && isReceivingData.compareAndSet(false, true)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(true); - } - } + .doOnNext((Long aLong) -> { + if (!isShutdown) { + if (hasDataDrop() || 
System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000)) { + if (isReceivingData.compareAndSet(true, false)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(false); + } + } else { + if (isConnected.get() && isReceivingData.compareAndSet(false, true)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(true); + } } - }) - .takeUntil(shutdownSubject) - .takeWhile((o) -> !isShutdown) - .doOnCompleted(() -> { - if (isReceivingData.compareAndSet(true, false)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(false); - } - }) - .subscribe(); - } - return response.getContent() - .lift(new DropOperator(meterRegistry)) - .flatMap((ServerSentEvent t1) -> { - lastDataReceived.set(System.currentTimeMillis()); - if (isConnected.get() && isReceivingData.compareAndSet(false, true)) - if (updateDataRecvngStatus != null) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(true); - } - - if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) { - return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString())); } - return Observable.just(t1.contentAsString()); - }, 1) - .filter(data -> { - if (data.startsWith("ping")) { - pingCounter.increment(); - return this.disablePingFiltering; - } - return true; }) - .flatMapIterable((data) -> { - boolean useSnappy = true; - return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter); - }, 1) .takeUntil(shutdownSubject) - .takeWhile((event) -> !isShutdown); + .takeWhile((o) -> !isShutdown) + .doOnCompleted(() -> { + if (isReceivingData.compareAndSet(true, false)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(false); + } + }) + .subscribe(); + } + return response.getContent() + .lift(new DropOperator(meterRegistry, metricGroup)) + .flatMap((ServerSentEvent t1) -> { + 
lastDataReceived.set(System.currentTimeMillis()); + if (isConnected.get() && isReceivingData.compareAndSet(false, true)) + if (updateDataRecvngStatus != null) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(true); + } + + if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) { + return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString())); + } + return Observable.just(t1.contentAsString()); + }, 1) + .filter(data -> { + if (data.startsWith("ping")) { + pingCounter.increment(); + return this.disablePingFiltering; + } + return true; + }) + .flatMapIterable((data) -> { + boolean useSnappy = true; + return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter); + }, 1) + .takeUntil(shutdownSubject) + .takeWhile((event) -> !isShutdown); } private boolean hasDataDrop() { long totalDataDrop = 0L; - if (meterRegistry.getMeters().contains(pingCounter)) { - final Counter dropped = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.dropped)).counter(); - final Counter onNext = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.onNext)).counter(); - } + final Counter dropped = Search.in(meterRegistry).name(name -> name.startsWith("DropOperator_") && name.endsWith(DropOperator.Counters.dropped.toString())).counter(); + final Counter onNext = Search.in(meterRegistry).name(name -> name.startsWith("DropOperator_") && name.endsWith(DropOperator.Counters.onNext.toString())).counter(); + if (dropped != null) + totalDataDrop += dropped.count(); if (totalDataDrop > lastDataDropValue) { lastDataDropValue = totalDataDrop; return true; diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java 
b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java index 9fe17c217..bb4718343 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java @@ -20,12 +20,13 @@ import com.mantisrx.common.utils.NettyUtils; import io.mantisrx.common.MantisServerSentEvent; -import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameters; import io.mantisrx.server.core.ServiceRegistry; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.search.Search; import io.reactivx.mantis.operators.DropOperator; +import io.reactivx.mantis.operators.DropOperator.Counters; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -68,10 +69,10 @@ public void run() { if (!metricGroups.isEmpty()) { for (String metricGroup : metricGroups) { if (metricGroup != null) { - final Counter onNext = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onNext)); - final Counter onError = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onError)); - final Counter onComplete = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onComplete)); - final Counter dropped = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.dropped)); + final Counter onNext = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onNext.toString())).counter(); + final Counter onError = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onError.toString())).counter(); + 
final Counter onComplete = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.onComplete.toString())).counter(); + final Counter dropped = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.dropped.toString())).counter(); logger.info(metricGroup + ": onNext=" + onNext.count() + ", onError=" + onError.count() + ", onComplete=" + onComplete.count() + ", dropped=" + dropped.count() // + ", buffered=" + buffered.value() diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java index 1783a8544..ad0046d1e 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java @@ -23,6 +23,7 @@ import io.mantisrx.server.master.client.HighAvailabilityServicesUtil; import io.mantisrx.server.master.client.MantisMasterGateway; import io.mantisrx.server.master.client.MasterClientWrapper; +import io.micrometer.core.instrument.MeterRegistry; import io.reactivex.mantis.remote.observable.EndpointChange; import java.util.Properties; import org.slf4j.Logger; @@ -39,7 +40,7 @@ public class WorkerMetricsClient { private final MasterClientWrapper clientWrapper; - private final JobWorkerMetricsLocator jobWrokerMetricsLocator = new JobWorkerMetricsLocator() { + private final JobWorkerMetricsLocator jobWorkerMetricsLocator = new JobWorkerMetricsLocator() { @Override public Observable locateWorkerMetricsForJob(final String jobId) { return clientWrapper.getMasterClientApi() @@ -101,7 +102,7 @@ public WorkerMetricsClient(MantisMasterGateway gateway) { public JobWorkerMetricsLocator getWorkerMetricsLocator() { - return jobWrokerMetricsLocator; + return 
jobWorkerMetricsLocator; } /* package */ MasterClientWrapper getClientWrapper() { @@ -131,6 +132,6 @@ public Integer call(MasterClientWrapper.JobNumWorkers jobNumWorkers) { return jobNumWorkers.getNumWorkers(); } }), - workerConnectionsStatusObserver, dataRecvTimeoutSecs); + workerConnectionsStatusObserver, dataRecvTimeoutSecs, meterRegistry); } } diff --git a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java index 03dd34c48..faa141a3f 100644 --- a/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java +++ b/mantis-server/mantis-server-worker-client/src/test/java/io/mantisrx/server/worker/client/SseWorkerConnectionTest.java @@ -24,16 +24,17 @@ import io.mantisrx.common.MantisServerSentEvent; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.Search; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.buffer.Unpooled; import io.reactivx.mantis.operators.DropOperator; +import io.reactivx.mantis.operators.DropOperator.Counters; import java.nio.charset.Charset; import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse; import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent; -import org.apache.flink.metrics.Meter; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +82,8 @@ public void testStreamContentDrops() throws Exception { List events = subscriber.getOnNextEvents(); assertEquals("0", events.get(0).getEventAsString()); - Counter onNextCounter = meterRegistry.counter(DropOperator.Counters.onNext.toString()); - Counter 
droppedCounter = meterRegistry.counter(DropOperator.Counters.dropped.toString()); + Counter onNextCounter = Search.in(meterRegistry).name(name-> name.startsWith("DropOperator_") && name.endsWith(DropOperator.Counters.onNext.toString())).counter(); + Counter droppedCounter = Search.in(meterRegistry).name(name-> name.startsWith("DropOperator_") && name.endsWith(Counters.dropped.toString())).counter(); logger.info("next: {}", onNextCounter.count()); logger.info("drop: {}", droppedCounter.count()); assertTrue(onNextCounter.count() < 10); diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java index fcdfc8778..cc4c18079 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java @@ -71,29 +71,31 @@ protected void setPayload(final long intervalSecs) { long totalDropped = 0L; long totalOnNext = 0L; try { - if (meterRegistry.getMeters().contains(dropCountGauge) && meterRegistry.getMeters().contains(onNextCountGauge)){ - //logger.info("Got " + metrics.size() + " metrics for DropOperator"); - final Counter dropped = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.dropped).counter(); - final Counter onNext = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.onNext).counter(); - if (dropped != null) - totalDropped += dropped.count(); - else - logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped."); - if (onNext != null) - totalOnNext += onNext.count(); - else - logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext."); - - final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); - try { - 
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); - } catch (JsonProcessingException e) { - logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); + for (Meter meter : meterRegistry.getMeters()) { + if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.dropped.toString())){ + final Counter dropped = meterRegistry.find(meter.getId().getName()).counter(); + if (dropped != null) + totalDropped += dropped.count(); + else + logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped."); + } + if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.onNext.toString())){ + final Counter onNext = meterRegistry.find(meter.getId().getName()).counter(); + if (onNext != null) + totalOnNext += onNext.count(); + else + logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext."); } - dropCountValue.set(dataDrop.getDroppedCount()); - onNextCountValue.set(dataDrop.getOnNextCount()); - } else - logger.debug("Got no metrics from DropOperator"); + } + //logger.info("Got " + metrics.size() + " metrics for DropOperator"); + final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); + try { + heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); + } catch (JsonProcessingException e) { + logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); + } + dropCountValue.set(dataDrop.getDroppedCount()); + onNextCountValue.set(dataDrop.getOnNextCount()); } catch (Exception e) { logger.error(e.getMessage(), e); }