Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
J. Wu authored and J. Wu committed Sep 5, 2023
1 parent c800100 commit 5fde01a
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 156 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ext.versions = [
jsr305 : "3.0.1",
junit4 : "4.11",
junit5 : "5.4.+",
micrometer : "1.11.0",
mockito : "2.0.+",
mockito3 : "3.+",
spectator: "1.3.+",
Expand Down Expand Up @@ -73,6 +74,7 @@ ext.libraries = [
"org.junit.jupiter:junit-jupiter-params:${versions.junit5}",
],
mantisShaded : "io.mantisrx:mantis-shaded:2.0.8",
micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}",
mockitoAll : "org.mockito:mockito-all:${versions.mockito}",
mockitoCore : "org.mockito:mockito-core:${versions.mockito}",
mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class TaskExecutorStarter extends AbstractIdleService {
private final TaskExecutor taskExecutor;
private final HighAvailabilityServices highAvailabilityServices;
private final RpcSystem rpcSystem;
private final MeterRegistry meterRegistry;

@Override
protected void startUp() {
Expand Down Expand Up @@ -221,7 +220,7 @@ public TaskExecutorStarter build() throws Exception {
taskExecutor.addListener(listener._1(), listener._2());
}

return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry);
return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem());
}
}
}
2 changes: 1 addition & 1 deletion mantis-server/mantis-server-worker-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
dependencies {
api project(":mantis-control-plane:mantis-control-plane-core")
api project(":mantis-control-plane:mantis-control-plane-client")

implementation libraries.micrometerCore
testImplementation libraries.junit4
testImplementation libraries.mockitoAll
testImplementation libraries.spectatorApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void call() {
}
})
.share()
.lift(new DropOperator<Observable<T>>(meterRegistry, "client_metrics_share"))
.lift(new DropOperator<Observable<T>>("client_metrics_share"))
;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.micrometer.core.instrument.MeterRegistry;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.Properties;
import org.slf4j.Logger;
Expand All @@ -39,7 +40,7 @@ public class WorkerMetricsClient {

private final MasterClientWrapper clientWrapper;

private final JobWorkerMetricsLocator jobWrokerMetricsLocator = new JobWorkerMetricsLocator() {
private final JobWorkerMetricsLocator jobWorkerMetricsLocator = new JobWorkerMetricsLocator() {
@Override
public Observable<EndpointChange> locateWorkerMetricsForJob(final String jobId) {
return clientWrapper.getMasterClientApi()
Expand Down Expand Up @@ -101,7 +102,7 @@ public WorkerMetricsClient(MantisMasterGateway gateway) {


public JobWorkerMetricsLocator getWorkerMetricsLocator() {
return jobWrokerMetricsLocator;
return jobWorkerMetricsLocator;
}

/* package */ MasterClientWrapper getClientWrapper() {
Expand Down Expand Up @@ -131,6 +132,6 @@ public Integer call(MasterClientWrapper.JobNumWorkers jobNumWorkers) {
return jobNumWorkers.getNumWorkers();
}
}),
workerConnectionsStatusObserver, dataRecvTimeoutSecs);
workerConnectionsStatusObserver, dataRecvTimeoutSecs, meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,26 @@ protected void setPayload(final long intervalSecs) {
long totalDropped = 0L;
long totalOnNext = 0L;
try {
if (meterRegistry.getMeters().contains(dropCountGauge) && meterRegistry.getMeters().contains(onNextCountGauge)){
//logger.info("Got " + metrics.size() + " metrics for DropOperator");
final Counter dropped = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.dropped).counter();
final Counter onNext = meterRegistry.find("DropOperator_" + "" + DropOperator.Counters.onNext).counter();
if (dropped != null)
totalDropped += dropped.count();
else
logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped.");
if (onNext != null)
totalOnNext += onNext.count();
else
logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext.");
//logger.info("Got " + metrics.size() + " metrics for DropOperator");
final Counter dropped = meterRegistry.find("DropOperator_" + DropOperator.Counters.dropped.toString()).counter();
final Counter onNext = meterRegistry.find("DropOperator_" + DropOperator.Counters.onNext.toString()).counter();
if (dropped != null)
totalDropped += dropped.count();
else
logger.warn("Unexpected to get null dropped counter for metric DropOperator DropOperator_dropped.");
if (onNext != null)
totalOnNext += onNext.count();
else
logger.warn("Unexpected to get null onNext counter for metric DropOperator_onNext.");

final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped);
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for dataDrop payload: " + e.getMessage());
}
dropCountValue.set(dataDrop.getDroppedCount());
onNextCountValue.set(dataDrop.getOnNextCount());
} else
logger.debug("Got no metrics from DropOperator");
final StatusPayloads.DataDropCounts dataDrop = new StatusPayloads.DataDropCounts(totalOnNext, totalDropped);
try {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, objectMapper.writeValueAsString(dataDrop));
} catch (JsonProcessingException e) {
logger.warn("Error writing json for dataDrop payload: " + e.getMessage());
}
dropCountValue.set(dataDrop.getDroppedCount());
onNextCountValue.set(dataDrop.getOnNextCount());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
Expand Down

0 comments on commit 5fde01a

Please sign in to comment.