Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
J. Wu authored and J. Wu committed Sep 10, 2023
1 parent af2c0f1 commit 848943c
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 159 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ext.versions = [
jsr305 : "3.0.1",
junit4 : "4.11",
junit5 : "5.4.+",
micrometer : "1.11.0",
mockito : "2.0.+",
mockito3 : "3.+",
spectator: "1.3.+",
Expand Down Expand Up @@ -73,6 +74,7 @@ ext.libraries = [
"org.junit.jupiter:junit-jupiter-params:${versions.junit5}",
],
mantisShaded : "io.mantisrx:mantis-shaded:2.0.8",
micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}",
mockitoAll : "org.mockito:mockito-all:${versions.mockito}",
mockitoCore : "org.mockito:mockito-core:${versions.mockito}",
mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class TaskExecutorStarter extends AbstractIdleService {
private final TaskExecutor taskExecutor;
private final HighAvailabilityServices highAvailabilityServices;
private final RpcSystem rpcSystem;
private final MeterRegistry meterRegistry;

@Override
protected void startUp() {
Expand Down Expand Up @@ -221,7 +220,7 @@ public TaskExecutorStarter build() throws Exception {
taskExecutor.addListener(listener._1(), listener._2());
}

return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry);
return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem());
}
}
}
2 changes: 1 addition & 1 deletion mantis-server/mantis-server-worker-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
dependencies {
api project(":mantis-control-plane:mantis-control-plane-core")
api project(":mantis-control-plane:mantis-control-plane-client")

implementation libraries.micrometerCore
testImplementation libraries.junit4
testImplementation libraries.mockitoAll
testImplementation libraries.spectatorApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void call() {
}
})
.share()
.lift(new DropOperator<Observable<T>>(meterRegistry, "client_metrics_share"))
.lift(new DropOperator<Observable<T>>("client_metrics_share"))
;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.core.ServiceRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.search.Search;
import io.reactivx.mantis.operators.DropOperator;
import io.reactivx.mantis.operators.DropOperator.Counters;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
Expand Down Expand Up @@ -68,10 +69,10 @@ public void run() {
if (!metricGroups.isEmpty()) {
for (String metricGroup : metricGroups) {
if (metricGroup != null) {
final Counter onNext = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onNext));
final Counter onError = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onError));
final Counter onComplete = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.onComplete));
final Counter dropped = meterRegistry.counter(metricGroup + "_DropOperator_" + ("" + DropOperator.Counters.dropped));
final Counter onNext = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onNext.toString())).counter();
final Counter onError = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(DropOperator.Counters.onError.toString())).counter();
final Counter onComplete = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.onComplete.toString())).counter();
final Counter dropped = Search.in(meterRegistry).name(name-> name.startsWith(metricGroup) && name.endsWith(Counters.onNext.toString())).counter();
logger.info(metricGroup + ": onNext=" + onNext.count() + ", onError=" + onError.count() +
", onComplete=" + onComplete.count() + ", dropped=" + dropped.count()
// + ", buffered=" + buffered.value()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.micrometer.core.instrument.MeterRegistry;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.Properties;
import org.slf4j.Logger;
Expand All @@ -39,7 +40,7 @@ public class WorkerMetricsClient {

private final MasterClientWrapper clientWrapper;

private final JobWorkerMetricsLocator jobWrokerMetricsLocator = new JobWorkerMetricsLocator() {
private final JobWorkerMetricsLocator jobWorkerMetricsLocator = new JobWorkerMetricsLocator() {
@Override
public Observable<EndpointChange> locateWorkerMetricsForJob(final String jobId) {
return clientWrapper.getMasterClientApi()
Expand Down Expand Up @@ -101,7 +102,7 @@ public WorkerMetricsClient(MantisMasterGateway gateway) {


public JobWorkerMetricsLocator getWorkerMetricsLocator() {
return jobWrokerMetricsLocator;
return jobWorkerMetricsLocator;
}

/* package */ MasterClientWrapper getClientWrapper() {
Expand Down Expand Up @@ -131,6 +132,6 @@ public Integer call(MasterClientWrapper.JobNumWorkers jobNumWorkers) {
return jobNumWorkers.getNumWorkers();
}
}),
workerConnectionsStatusObserver, dataRecvTimeoutSecs);
workerConnectionsStatusObserver, dataRecvTimeoutSecs, meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,31 @@ protected void setPayload(final long intervalSecs) {
long totalDropped = 0L;
long totalOnNext = 0L;
try {
if (meterRegistry.getMeters().contains(dropCountGauge) && meterRegistry.getMeters().contains(onNextCountGauge)){
//logger.info("Got " + metrics.size() + " metrics for DropOperator");
final Counter dropped = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.dropped).counter();
final Counter onNext = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.onNext).counter();
if (dropped != null)
totalDropped += dropped.count();
else
logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped.");
if (onNext != null)
totalOnNext += onNext.count();
else
logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext.");

final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped);
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for dataDrop payload: " + e.getMessage());
for (Meter meter : meterRegistry.getMeters()) {
if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.dropped.toString())){
final Counter dropped = meterRegistry.find(meter.getId().getName()).counter();
if (dropped != null)
totalDropped += dropped.count();
else
logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped.");
}
if (meter.getId().getName().startsWith("DropOperator_") && meter.getId().getName().endsWith(DropOperator.Counters.onNext.toString())){
final Counter onNext = meterRegistry.find(meter.getId().getName()).counter();
if (onNext != null)
totalOnNext += onNext.count();
else
logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext.");
}
dropCountValue.set(dataDrop.getDroppedCount());
onNextCountValue.set(dataDrop.getOnNextCount());
} else
logger.debug("Got no metrics from DropOperator");
}
//logger.info("Got " + metrics.size() + " metrics for DropOperator");
final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped);
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for dataDrop payload: " + e.getMessage());
}
dropCountValue.set(dataDrop.getDroppedCount());
onNextCountValue.set(dataDrop.getOnNextCount());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
Expand Down

0 comments on commit 848943c

Please sign in to comment.