diff --git a/build.gradle b/build.gradle index ffc9540ae..22e0c7322 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ ext.versions = [ jsr305 : "3.0.1", junit4 : "4.11", junit5 : "5.4.+", + micrometer : "1.11.0", mockito : "2.0.+", mockito3 : "3.+", spectator: "1.3.+", @@ -73,6 +74,7 @@ ext.libraries = [ "org.junit.jupiter:junit-jupiter-params:${versions.junit5}", ], mantisShaded : "io.mantisrx:mantis-shaded:2.0.8", + micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}", mockitoAll : "org.mockito:mockito-all:${versions.mockito}", mockitoCore : "org.mockito:mockito-core:${versions.mockito}", mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}", diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index 4f78aa177..cb5d655cd 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -55,7 +55,6 @@ public class TaskExecutorStarter extends AbstractIdleService { private final TaskExecutor taskExecutor; private final HighAvailabilityServices highAvailabilityServices; private final RpcSystem rpcSystem; - private final MeterRegistry meterRegistry; @Override protected void startUp() { @@ -221,7 +220,7 @@ public TaskExecutorStarter build() throws Exception { taskExecutor.addListener(listener._1(), listener._2()); } - return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry); + return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem()); } } } diff --git a/mantis-server/mantis-server-worker-client/build.gradle b/mantis-server/mantis-server-worker-client/build.gradle index b87ceea4e..a7409e18c 100644 --- a/mantis-server/mantis-server-worker-client/build.gradle +++ b/mantis-server/mantis-server-worker-client/build.gradle @@ -17,7 +17,7 @@ dependencies { api project(":mantis-control-plane:mantis-control-plane-core") api project(":mantis-control-plane:mantis-control-plane-client") - + implementation libraries.micrometerCore testImplementation libraries.junit4 testImplementation libraries.mockitoAll testImplementation libraries.spectatorApi diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java index 63f6971a3..b775a0a28 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/MetricsClientImpl.java @@ -152,7 +152,7 @@ public void call() { } }) .share() - .lift(new DropOperator>(meterRegistry, "client_metrics_share")) + .lift(new DropOperator>("client_metrics_share")) ; } diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java index 3e11312b2..65cc4c7d2 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnection.java @@ -21,16 +21,15 @@ import com.mantisrx.common.utils.MantisSSEConstants; import io.mantisrx.common.MantisServerSentEvent; import io.mantisrx.common.compression.CompressionUtils; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameter; import io.mantisrx.runtime.parameter.SinkParameters; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.Search; import io.netty.buffer.ByteBuf; import io.reactivx.mantis.operators.DropOperator; -import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; @@ -61,13 +60,14 @@ public class SseWorkerConnection { private final String hostname; private final int port; private final String metricGroup; + private final MeterRegistry meterRegistry; private final Counter pingCounter; private final boolean reconnectUponConnectionReset; private final Action1 updateConxStatus; private final Action1 updateDataRecvngStatus; private final Action1 connectionResetHandler; private final long dataRecvTimeoutSecs; - private final CopyOnWriteArraySet metricsSet; + private final CopyOnWriteArraySet metricsSet; private final int bufferSize; private final SinkParameters sinkParameters; private final boolean disablePingFiltering; @@ -77,33 +77,34 @@ public class SseWorkerConnection { private boolean compressedBinaryInputEnabled = false; private volatile boolean isShutdown = false; private final Func1, Observable> retryLogic = - new Func1, Observable>() { - @Override - public Observable call(Observable attempts) { - if (!reconnectUponConnectionReset) - return Observable.empty(); - return attempts - .zipWith(Observable.range(1, Integer.MAX_VALUE), new Func2() { - @Override - public Integer call(Throwable t1, Integer integer) { - return integer; - } - }) - .flatMap(new Func1>() { - @Override - public Observable call(Integer integer) { - if (isShutdown) { - logger.info(getName() + ": Is shutdown, stopping retries"); - return Observable.empty(); - } - long delay = 2 * (integer > 10 ? 10 : integer); - logger.info(getName() + ": retrying conx after sleeping for " + delay + " secs"); - return Observable.timer(delay, TimeUnit.SECONDS); - } - }); - } - }; + new Func1, Observable>() { + @Override + public Observable call(Observable attempts) { + if (!reconnectUponConnectionReset) + return Observable.empty(); + return attempts + .zipWith(Observable.range(1, Integer.MAX_VALUE), new Func2() { + @Override + public Integer call(Throwable t1, Integer integer) { + return integer; + } + }) + .flatMap(new Func1>() { + @Override + public Observable call(Integer integer) { + if (isShutdown) { + logger.info(getName() + ": Is shutdown, stopping retries"); + return Observable.empty(); + } + long delay = 2 * (integer > 10 ? 10 : integer); + logger.info(getName() + ": retrying conx after sleeping for " + delay + " secs"); + return Observable.timer(delay, TimeUnit.SECONDS); + } + }); + } + }; private long lastDataDropValue = 0L; + public SseWorkerConnection(final String connectionType, final String hostname, final Integer port, @@ -118,8 +119,9 @@ public SseWorkerConnection(final String connectionType, final String metricGroup, final MeterRegistry meterRegistry) { this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler, - dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry); + dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroup, meterRegistry); } + public SseWorkerConnection(final String connectionType, final String hostname, final Integer port, @@ -128,7 +130,7 @@ public SseWorkerConnection(final String connectionType, final Action1 connectionResetHandler, final long dataRecvTimeoutSecs, final boolean reconnectUponConnectionReset, - final CopyOnWriteArraySet metricsSet, + final CopyOnWriteArraySet metricsSet, final int bufferSize, final SinkParameters sinkParameters, final boolean disablePingFiltering, @@ -139,7 +141,7 @@ public SseWorkerConnection(final String connectionType, this.port = port; this.meterRegistry = meterRegistry; this.metricGroup = metricGroup; - this.pingCounter = meterRegistry.counter(metricGroup + "_ConnectionHealth_pingCount"); + this.pingCounter = meterRegistry.counter("ConnectionHealth_pingCount"); this.updateConxStatus = updateConxStatus; this.updateDataRecvngStatus = updateDataRecvngStatus; this.connectionResetHandler = connectionResetHandler; @@ -181,57 +183,56 @@ public synchronized Observable call() { if (isShutdown) return Observable.empty(); client = - RxNetty.newHttpClientBuilder(hostname, port) - .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()) - //.enableWireLogging(LogLevel.ERROR) - .withNoConnectionPooling() - .build(); + RxNetty.newHttpClientBuilder(hostname, port) + .pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()) + //.enableWireLogging(LogLevel.ERROR) + .withNoConnectionPooling() + .build(); StringBuilder sp = new StringBuilder(); String delimiter = sinkParameters == null - ? null - : sinkParameters.getSinkParams().stream() - .filter(s -> s.getName() - .equalsIgnoreCase(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER)) - .findFirst() - .map(SinkParameter::getValue) - .orElse(null); + ? null + : sinkParameters.getSinkParams().stream() + .filter(s -> s.getName() + .equalsIgnoreCase(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER)) + .findFirst() + .map(SinkParameter::getValue) + .orElse(null); if (sinkParameters != null) { sp.append(sinkParameters.toString()); } - sp.append(sp.length() == 0 ? getDefaultSinkParams("?") : getDefaultSinkParams("&")); String uri = "/" + sp.toString(); logger.info(getName() + ": Using uri: " + uri); return - client.submit(HttpClientRequest.createGet(uri)) - .takeUntil(shutdownSubject) - .takeWhile((serverSentEventHttpClientResponse) -> !isShutdown) - .filter((HttpClientResponse response) -> { - if (!response.getStatus().reasonPhrase().equals("OK")) - logger.warn(getName() + ":Trying to continue after unexpected response from sink: " - + response.getStatus().reasonPhrase()); - return response.getStatus().reasonPhrase().equals("OK"); - }) - .flatMap((HttpClientResponse response) -> { - if (!isConnected.getAndSet(true)) { - if (updateConxStatus != null) - updateConxStatus.call(true); - } - return streamContent(response, updateDataRecvngStatus, dataRecvTimeoutSecs, delimiter); - }) - .doOnError((Throwable throwable) -> { - resetConnected(); - logger.warn(getName() + - "Error on getting response from SSE server: " + throwable.getMessage()); - connectionResetHandler.call(throwable); - }) - .retryWhen(retryLogic) - .doOnCompleted(this::resetConnected); + client.submit(HttpClientRequest.createGet(uri)) + .takeUntil(shutdownSubject) + .takeWhile((serverSentEventHttpClientResponse) -> !isShutdown) + .filter((HttpClientResponse response) -> { + if (!response.getStatus().reasonPhrase().equals("OK")) + logger.warn(getName() + ":Trying to continue after unexpected response from sink: " + + response.getStatus().reasonPhrase()); + return response.getStatus().reasonPhrase().equals("OK"); + }) + .flatMap((HttpClientResponse response) -> { + if (!isConnected.getAndSet(true)) { + if (updateConxStatus != null) + updateConxStatus.call(true); + } + return streamContent(response, updateDataRecvngStatus, dataRecvTimeoutSecs, delimiter); + }) + .doOnError((Throwable throwable) -> { + resetConnected(); + logger.warn(getName() + + "Error on getting response from SSE server: " + throwable.getMessage()); + connectionResetHandler.call(throwable); + }) + .retryWhen(retryLogic) + .doOnCompleted(this::resetConnected); } private void resetConnected() { @@ -247,72 +248,72 @@ private void resetConnected() { } protected Observable streamContent(HttpClientResponse response, - final Action1 updateDataRecvngStatus, - final long dataRecvTimeoutSecs, String delimiter) { + final Action1 updateDataRecvngStatus, + final long dataRecvTimeoutSecs, String delimiter) { long interval = Math.max(1, dataRecvTimeoutSecs / 2); if (updateDataRecvngStatus != null) { Observable.interval(interval, interval, TimeUnit.SECONDS) - .doOnNext((Long aLong) -> { - if (!isShutdown) { - if (hasDataDrop() || System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000)) { - if (isReceivingData.compareAndSet(true, false)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(false); - } - } else { - if (isConnected.get() && isReceivingData.compareAndSet(false, true)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(true); - } - } + .doOnNext((Long aLong) -> { + if (!isShutdown) { + if (hasDataDrop() || System.currentTimeMillis() > (lastDataReceived.get() + dataRecvTimeoutSecs * 1000)) { + if (isReceivingData.compareAndSet(true, false)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(false); + } + } else { + if (isConnected.get() && isReceivingData.compareAndSet(false, true)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(true); + } } - }) - .takeUntil(shutdownSubject) - .takeWhile((o) -> !isShutdown) - .doOnCompleted(() -> { - if (isReceivingData.compareAndSet(true, false)) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(false); - } - }) - .subscribe(); - } - return response.getContent() - .lift(new DropOperator(meterRegistry)) - .flatMap((ServerSentEvent t1) -> { - lastDataReceived.set(System.currentTimeMillis()); - if (isConnected.get() && isReceivingData.compareAndSet(false, true)) - if (updateDataRecvngStatus != null) - synchronized (updateDataRecvngStatus) { - updateDataRecvngStatus.call(true); - } - - if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) { - return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString())); } - return Observable.just(t1.contentAsString()); - }, 1) - .filter(data -> { - if (data.startsWith("ping")) { - pingCounter.increment(); - return this.disablePingFiltering; - } - return true; }) - .flatMapIterable((data) -> { - boolean useSnappy = true; - return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter); - }, 1) .takeUntil(shutdownSubject) - .takeWhile((event) -> !isShutdown); + .takeWhile((o) -> !isShutdown) + .doOnCompleted(() -> { + if (isReceivingData.compareAndSet(true, false)) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(false); + } + }) + .subscribe(); + } + return response.getContent() + .lift(new DropOperator(meterRegistry, metricGroup)) + .flatMap((ServerSentEvent t1) -> { + lastDataReceived.set(System.currentTimeMillis()); + if (isConnected.get() && isReceivingData.compareAndSet(false, true)) + if (updateDataRecvngStatus != null) + synchronized (updateDataRecvngStatus) { + updateDataRecvngStatus.call(true); + } + + if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) { + return Observable.error(new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString())); + } + return Observable.just(t1.contentAsString()); + }, 1) + .filter(data -> { + if (data.startsWith("ping")) { + pingCounter.increment(); + return this.disablePingFiltering; + } + return true; + }) + .flatMapIterable((data) -> { + boolean useSnappy = true; + return CompressionUtils.decompressAndBase64Decode(data, compressedBinaryInputEnabled, useSnappy, delimiter); + }, 1) + .takeUntil(shutdownSubject) + .takeWhile((event) -> !isShutdown); } private boolean hasDataDrop() { long totalDataDrop = 0L; - if (meterRegistry.getMeters().contains(pingCounter)) { - final Counter dropped = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.dropped)).counter(); - final Counter onNext = meterRegistry.find(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_" + ("" + DropOperator.Counters.onNext)).counter(); - } + final Counter dropped = Search.in(meterRegistry).name(name -> name.startsWith("DropOperator_") && name.endsWith(DropOperator.Counters.dropped.toString())).counter(); + final Counter onNext = Search.in(meterRegistry).name(name -> name.startsWith("DropOperator_") && name.endsWith(DropOperator.Counters.onNext.toString())).counter(); + if (dropped != null) + totalDataDrop += dropped.count(); if (totalDataDrop > lastDataDropValue) { lastDataDropValue = totalDataDrop; return true; diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java index 9fe17c217..bb4718343 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/SseWorkerConnectionFunction.java @@ -20,12 +20,13 @@ import com.mantisrx.common.utils.NettyUtils; import io.mantisrx.common.MantisServerSentEvent; -import io.mantisrx.common.metrics.spectator.MetricGroupId; import io.mantisrx.runtime.parameter.SinkParameters; import io.mantisrx.server.core.ServiceRegistry; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.search.Search; import io.reactivx.mantis.operators.DropOperator; +import io.reactivx.mantis.operators.DropOperator.Counters; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -68,10 +69,10 @@ public void run() { if (!metricGroups.isEmpty()) { for (String metricGroup : metricGroups) { if (metricGroup != null) { - final Counter onNext = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onNext)); - final Counter onError = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onError)); - final Counter onComplete = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onComplete)); - final Counter dropped = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.dropped)); + final Counter onNext = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onNext.toString())).counter(); + final Counter onError = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onError.toString())).counter(); + final Counter onComplete = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.onComplete.toString())).counter(); + final Counter dropped = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.onNext.toString())).counter(); logger.info(metricGroup + ": onNext=" + onNext.count() + ", onError=" + onError.count() + ", onComplete=" + onComplete.count() + ", dropped=" + dropped.count() // + ", buffered=" + buffered.value() diff --git a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java index 1783a8544..ad0046d1e 100644 --- a/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java +++ b/mantis-server/mantis-server-worker-client/src/main/java/io/mantisrx/server/worker/client/WorkerMetricsClient.java @@ -23,6 +23,7 @@ import io.mantisrx.server.master.client.HighAvailabilityServicesUtil; import io.mantisrx.server.master.client.MantisMasterGateway; import io.mantisrx.server.master.client.MasterClientWrapper; +import io.micrometer.core.instrument.MeterRegistry; import io.reactivex.mantis.remote.observable.EndpointChange; import java.util.Properties; import org.slf4j.Logger; @@ -39,7 +40,7 @@ public class WorkerMetricsClient { private final MasterClientWrapper clientWrapper; - private final JobWorkerMetricsLocator jobWrokerMetricsLocator = new JobWorkerMetricsLocator() { + private final JobWorkerMetricsLocator jobWorkerMetricsLocator = new JobWorkerMetricsLocator() { @Override public Observable locateWorkerMetricsForJob(final String jobId) { return clientWrapper.getMasterClientApi() @@ -101,7 +102,7 @@ public WorkerMetricsClient(MantisMasterGateway gateway) { public JobWorkerMetricsLocator getWorkerMetricsLocator() { - return jobWrokerMetricsLocator; + return jobWorkerMetricsLocator; } /* package */ MasterClientWrapper getClientWrapper() { @@ -131,6 +132,6 @@ public Integer call(MasterClientWrapper.JobNumWorkers jobNumWorkers) { return jobNumWorkers.getNumWorkers(); } }), - workerConnectionsStatusObserver, dataRecvTimeoutSecs); + workerConnectionsStatusObserver, dataRecvTimeoutSecs, meterRegistry); } } diff --git a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java index fcdfc8778..cc4c18079 100644 --- a/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java +++ b/mantis-server/mantis-server-worker/src/main/java/io/mantisrx/server/worker/DataDroppedPayloadSetter.java @@ -71,29 +71,31 @@ protected void setPayload(final long intervalSecs) { long totalDropped = 0L; long totalOnNext = 0L; try { - if (meterRegistry.getMeters().contains(dropCountGauge) && meterRegistry.getMeters().contains(onNextCountGauge)){ - //logger.info("Got " + metrics.size() + " metrics for DropOperator"); - final Counter dropped = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.dropped).counter(); - final Counter onNext = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.onNext).counter(); - if (dropped != null) - totalDropped += dropped.count(); - else - logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped."); - if (onNext != null) - totalOnNext += onNext.count(); - else - logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext."); - - final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); - try { - heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); - } catch (JsonProcessingException e) { - logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); + for (Meter meter : meterRegistry.getMeters()) { + if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.dropped.toString())){ + final Counter dropped = meterRegistry.find(meter.getId().getName()).counter(); + if (dropped != null) + totalDropped += dropped.count(); + else + logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped."); + } + if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.onNext.toString())){ + final Counter onNext = meterRegistry.find(meter.getId().getName()).counter(); + if (onNext != null) + totalOnNext += onNext.count(); + else + logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext."); } - dropCountValue.set(dataDrop.getDroppedCount()); - onNextCountValue.set(dataDrop.getOnNextCount()); - } else - logger.debug("Got no metrics from DropOperator"); + } + //logger.info("Got " + metrics.size() + " metrics for DropOperator"); + final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped); + try { + heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop)); + } catch (JsonProcessingException e) { + logger.warn("Error writing json for dataDrop payload: " + e.getMessage()); + } + dropCountValue.set(dataDrop.getDroppedCount()); + onNextCountValue.set(dataDrop.getOnNextCount()); } catch (Exception e) { logger.error(e.getMessage(), e); }