Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
J. Wu authored and J. Wu committed Sep 26, 2023
1 parent af2c0f1 commit ba8be43
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 178 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ext.versions = [
jsr305 : "3.0.1",
junit4 : "4.11",
junit5 : "5.4.+",
micrometer : "1.11.0",
mockito : "2.0.+",
mockito3 : "3.+",
spectator: "1.3.+",
Expand Down Expand Up @@ -73,6 +74,7 @@ ext.libraries = [
"org.junit.jupiter:junit-jupiter-params:${versions.junit5}",
],
mantisShaded : "io.mantisrx:mantis-shaded:2.0.8",
micrometerCore : "io.micrometer:micrometer-core:${versions.micrometer}",
mockitoAll : "org.mockito:mockito-all:${versions.mockito}",
mockitoCore : "org.mockito:mockito-core:${versions.mockito}",
mockitoCore3 : "org.mockito:mockito-core:${versions.mockito3}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State;
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -119,17 +118,15 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

private RuntimeTask currentTask;
private ExecuteStageRequest currentRequest;
private MeterRegistry meterRegistry;

public TaskExecutor(
RpcService rpcService,
WorkerConfiguration workerConfiguration,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
MeterRegistry meterRegistry) {
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory) {
this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle,
subscriptionStateHandlerFactory, null, meterRegistry);
subscriptionStateHandlerFactory, null);
}

public TaskExecutor(
Expand All @@ -138,8 +135,7 @@ public TaskExecutor(
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
@Nullable TaskFactory taskFactory,
MeterRegistry meterRegistry) {
@Nullable TaskFactory taskFactory) {
super(rpcService, RpcServiceUtils.createRandomName("worker"));

// this is the task executor ID that will be used for the rest of the JVM process
Expand Down Expand Up @@ -207,8 +203,8 @@ private void startTaskExecutorServices() {
validateRunsInMainThread();

masterMonitor = highAvailabilityServices.getMasterClientApi();
taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor, meterRegistry);
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory(meterRegistry));
taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterMonitor);
RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
resourceClusterGatewaySupplier =
highAvailabilityServices.connectWithResourceManager(clusterID);
resourceClusterGatewaySupplier.register(new ResourceManagerChangeListener());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class TaskExecutorStarter extends AbstractIdleService {
private final TaskExecutor taskExecutor;
private final HighAvailabilityServices highAvailabilityServices;
private final RpcSystem rpcSystem;
private final MeterRegistry meterRegistry;

@Override
protected void startUp() {
Expand Down Expand Up @@ -113,7 +112,6 @@ public static class TaskExecutorStarterBuilder {
private TaskFactory taskFactory;

private final List<Tuple2<TaskExecutor.Listener, Executor>> listeners = new ArrayList<>();
private MeterRegistry meterRegistry;

private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) {
this.workerConfiguration = workerConfiguration;
Expand All @@ -132,10 +130,6 @@ public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) {
return this;
}

public TaskExecutorStarterBuilder registry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

private RpcSystem getRpcSystem() {
if (this.rpcSystem == null) {
Expand Down Expand Up @@ -214,14 +208,13 @@ public TaskExecutorStarter build() throws Exception {
highAvailabilityServices,
getClassLoaderHandle(),
getSinkSubscriptionHandlerFactory(),
this.taskFactory,
meterRegistry);
this.taskFactory);

for (Tuple2<TaskExecutor.Listener, Executor> listener : listeners) {
taskExecutor.addListener(listener._1(), listener._2());
}

return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem(), meterRegistry);
return new TaskExecutorStarter(taskExecutor, highAvailabilityServices, getRpcSystem());
}
}
}
2 changes: 1 addition & 1 deletion mantis-server/mantis-server-worker-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
dependencies {
api project(":mantis-control-plane:mantis-control-plane-core")
api project(":mantis-control-plane:mantis-control-plane-client")

implementation libraries.micrometerCore
testImplementation libraries.junit4
testImplementation libraries.mockitoAll
testImplementation libraries.spectatorApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void call() {
}
})
.share()
.lift(new DropOperator<Observable<T>>(meterRegistry, "client_metrics_share"))
.lift(new DropOperator<Observable<T>>("client_metrics_share"))
;
}

Expand Down
Loading

0 comments on commit ba8be43

Please sign in to comment.