From 6890a8d7bbffb0961c1934ed2413ae406e4eb11e Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 14 Sep 2023 00:02:06 -0700 Subject: [PATCH 1/3] unblock calls on cache+reconnect --- .../resourcecluster/ResourceCluster.java | 2 +- .../resourcecluster/ResourceClusterActor.java | 27 ++++++++++++++----- .../ResourceClusterAkkaImpl.java | 4 +-- .../resourcecluster/TaskExecutorState.java | 7 +++++ .../ResourceClusterAwareSchedulerActor.java | 10 ++++++- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index 0048402b2..b791ca630 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -118,7 +118,7 @@ CompletableFuture getTaskExecutorFor( */ CompletableFuture getTaskExecutorGateway(TaskExecutorID taskExecutorID); - CompletableFuture reconnectTaskExecutorGateway(TaskExecutorID taskExecutorID); + CompletableFuture reconnectTaskExecutorGateway(TaskExecutorID taskExecutorID); CompletableFuture getTaskExecutorInfo(String hostName); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index b9c0f7e49..72b336bee 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -50,7 +50,6 @@ import io.mantisrx.server.master.resourcecluster.TaskExecutorReport.Occupied; import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange; import io.mantisrx.server.master.scheduler.JobMessageRouter; -import io.mantisrx.server.worker.TaskExecutorGateway; import io.mantisrx.server.worker.TaskExecutorGateway.TaskNotFoundException; import io.mantisrx.shaded.com.google.common.base.Preconditions; import io.mantisrx.shaded.com.google.common.collect.Comparators; @@ -399,7 +398,12 @@ private void onTaskExecutorGatewayReconnectRequest(TaskExecutorGatewayReconnectR } else { try { if (state.isRegistered()) { - sender().tell(state.reconnect().join(), self()); + state.reconnect().whenComplete((res, throwable) -> { + if (throwable != null) { + log.error("failed to reconnect to {}", request.getTaskExecutorID(), throwable); + } + }); + sender().tell(Ack.getInstance(), self()); } else { sender().tell( new Status.Failure( @@ -813,11 +817,22 @@ private void onCacheJobArtifactsOnTaskExecutorRequest(CacheJobArtifactsOnTaskExe TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID()); if (state != null && state.isRegistered()) { try { - TaskExecutorGateway gateway = state.getGateway(); // TODO(fdichiara): store URI directly to avoid remapping for each TE - List artifacts = jobArtifactsToCache.stream().map(artifactID -> URI.create(artifactID.getResourceID())).collect(Collectors.toList()); - - gateway.cacheJobArtifacts(new CacheJobArtifactsRequest(artifacts)); + state.getGatewayAsync() + .thenComposeAsync(taskExecutorGateway -> + taskExecutorGateway.cacheJobArtifacts(new CacheJobArtifactsRequest( + jobArtifactsToCache + .stream() + .map(artifactID -> URI.create(artifactID.getResourceID())) + .collect(Collectors.toList())))) + .whenComplete((res, throwable) -> { + if (throwable != null) { + log.error("failed to cache artifact on {}", request.getTaskExecutorID(), throwable); + } + else { + log.debug("Acked from cacheJobArtifacts for {}", request.getTaskExecutorID()); + } + }); } catch (Exception ex) { log.warn("Failed to cache job artifacts in task executor {}", request.getTaskExecutorID(), ex); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index baa705fb6..c507b375d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -204,13 +204,13 @@ public CompletableFuture getTaskExecutorGateway( } @Override - public CompletableFuture reconnectTaskExecutorGateway( + public CompletableFuture reconnectTaskExecutorGateway( TaskExecutorID taskExecutorID) { return Patterns .ask(resourceClusterManagerActor, new TaskExecutorGatewayReconnectRequest(taskExecutorID, clusterID), askTimeout) - .thenApply(TaskExecutorGateway.class::cast) + .thenApply(Ack.class::cast) .toCompletableFuture(); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java index 170dced19..834eacc5c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java @@ -259,6 +259,13 @@ protected TaskExecutorGateway getGateway() throws ExecutionException, Interrupte return this.gateway.get(); } + protected CompletableFuture getGatewayAsync() { + if (this.gateway == null) { + throw new IllegalStateException("gateway is null"); + } + return this.gateway; + } + protected CompletableFuture reconnect() { this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class) .whenComplete((gateway, throwable) -> { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index 314d1f120..ded25bc89 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -222,7 +222,15 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE Throwables.getStackTraceAsString(event.throwable))); try { - resourceCluster.reconnectTaskExecutorGateway(event.getTaskExecutorID()).join(); + resourceCluster.reconnectTaskExecutorGateway(event.getTaskExecutorID()) + .whenComplete((res, throwable) -> { + if (throwable != null) { + log.error("Failed to request reconnect to gateway for {}", event.getTaskExecutorID(), throwable); + } + else { + log.debug("Acked from reconnection request for {}", event.getTaskExecutorID()); + } + }); } catch (Exception e) { log.warn( "Failed to establish re-connection with the task executor {} on failed schedule request", From 88c5f9fe5d4dec40228f92b1a7bec2d3c6371679 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 14 Sep 2023 11:39:48 -0700 Subject: [PATCH 2/3] refactor submit/cancel task --- .../resourcecluster/ResourceClusterActor.java | 3 +- .../ResourceClusterAkkaImpl.java | 17 +++- .../resourcecluster/TaskExecutorState.java | 8 -- .../ResourceClusterAwareSchedulerActor.java | 96 +++++++++++-------- 4 files changed, 71 insertions(+), 53 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index 72b336bee..f78b816d3 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -357,13 +357,14 @@ private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) { } else { try { if (state.isRegistered()) { - sender().tell(state.getGateway(), self()); + sender().tell(state.getGatewayAsync(), self()); } else { sender().tell( new Status.Failure(new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID())), self()); } } catch (Exception e) { + log.error("onTaskExecutorGatewayRequest error: {}", request, e); metrics.incrementCounter( ResourceClusterActorMetrics.TE_CONNECTION_FAILURE, TagList.create(ImmutableMap.of( diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index c507b375d..6d0e98366 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -197,10 +197,19 @@ public CompletableFuture getTaskExecutorAssignedFor(WorkerId wor public CompletableFuture getTaskExecutorGateway( TaskExecutorID taskExecutorID) { return - Patterns - .ask(resourceClusterManagerActor, new TaskExecutorGatewayRequest(taskExecutorID, clusterID), askTimeout) - .thenApply(TaskExecutorGateway.class::cast) - .toCompletableFuture(); + (CompletableFuture) Patterns + .ask(resourceClusterManagerActor, new TaskExecutorGatewayRequest(taskExecutorID, clusterID), + askTimeout) + .thenComposeAsync(result -> { + if (result instanceof CompletableFuture) { + return (CompletableFuture) result; + } else { + CompletableFuture exceptionFuture = new CompletableFuture<>(); + exceptionFuture.completeExceptionally(new RuntimeException( + "Unexpected object type on getTaskExecutorGateway: " + result.getClass().getName())); + return exceptionFuture; + } + }); } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java index 834eacc5c..091bead66 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java @@ -34,7 +34,6 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -252,13 +251,6 @@ TaskExecutorRegistration getRegistration() { return this.registration; } - protected TaskExecutorGateway getGateway() throws ExecutionException, InterruptedException { - if (this.gateway == null) { - throw new IllegalStateException("gateway is null"); - } - return this.gateway.get(); - } - protected CompletableFuture getGatewayAsync() { if (this.gateway == null) { throw new IllegalStateException("gateway is null"); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index ded25bc89..af843ea40 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -20,8 +20,10 @@ import akka.actor.AbstractActorWithTimers; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.japi.pf.ReceiveBuilder; import com.netflix.spectator.api.Tag; +import io.mantisrx.common.Ack; import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; @@ -40,6 +42,7 @@ import java.time.Instant; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import lombok.Value; @@ -112,6 +115,8 @@ public Receive createReceive() { .match(FailedToSubmitScheduleRequestEvent.class, this::onFailedToSubmitScheduleRequestEvent) .match(RetryCancelRequestEvent.class, this::onRetryCancelRequestEvent) .match(Noop.class, this::onNoop) + .match(Ack.class, ack -> log.debug("Received ack from {}", sender())) + .match(Failure.class, failure -> log.error("Received failure from {}: {}", sender(), failure)) .build(); } @@ -139,38 +144,48 @@ private void onInitializeRunningWorkerRequest(InitializeRunningWorkerRequestEven } private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event) { - TaskExecutorGateway gateway = null; - TaskExecutorRegistration info = null; try { - gateway = resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()).join(); - info = resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join(); + CompletableFuture gatewayFut = resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()); + TaskExecutorRegistration info = resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join(); + + if (gatewayFut != null && info != null) { + CompletionStage ackFuture = + gatewayFut + .thenComposeAsync(gateway -> + gateway + .submitTask( + executeStageRequestFactory.of( + event.getScheduleRequestEvent().getRequest(), + info)) + .thenApply( + dontCare -> new SubmittedScheduleRequestEvent( + event.getScheduleRequestEvent(), + event.getTaskExecutorID())) + .exceptionally( + throwable -> new FailedToSubmitScheduleRequestEvent( + event.getScheduleRequestEvent(), + event.getTaskExecutorID(), throwable)) + .whenCompleteAsync((res, err) -> + { + if (err == null) { + log.debug("[Submit Task] finish with {}", res); + } + else { + log.error("[Submit Task] fail: {}", event.getTaskExecutorID(), err); + } + }) + + ); + pipe(ackFuture, getContext().getDispatcher()).to(self()); + } } catch (Exception e) { // we are not able to get the gateway, which either means the node is not great or some transient network issue // we will retry the request log.warn( - "Failed to establish connection with the task executor {}; Resubmitting the request", + "Failed to submit task with the task executor {}; Resubmitting the request", event.getTaskExecutorID(), e); - connectionFailures.increment(); self().tell(event.getScheduleRequestEvent().onFailure(e), self()); } - - if (gateway != null && info != null) { - CompletableFuture ackFuture = - gateway - .submitTask( - executeStageRequestFactory.of(event.getScheduleRequestEvent().getRequest(), - info)) - .thenApply( - dontCare -> new SubmittedScheduleRequestEvent( - event.getScheduleRequestEvent(), - event.getTaskExecutorID())) - .exceptionally( - throwable -> new FailedToSubmitScheduleRequestEvent( - event.getScheduleRequestEvent(), - event.getTaskExecutorID(), throwable)); - - pipe(ackFuture, getContext().getDispatcher()).to(self()); - } } private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) { @@ -187,6 +202,7 @@ private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) { } private void onSubmittedScheduleRequestEvent(SubmittedScheduleRequestEvent event) { + log.debug("[Submit Task]: receive SubmittedScheduleRequestEvent: {}", event); final TaskExecutorID taskExecutorID = event.getTaskExecutorID(); try { final TaskExecutorRegistration info = resourceCluster.getTaskExecutorInfo(taskExecutorID) @@ -245,24 +261,24 @@ private void onCancelRequestEvent(CancelRequestEvent event) { getTimers().cancel(getSchedulingQueueKeyFor(event.getWorkerId())); final TaskExecutorID taskExecutorID = resourceCluster.getTaskExecutorAssignedFor(event.getWorkerId()).join(); - final TaskExecutorGateway gateway = - resourceCluster.getTaskExecutorGateway(taskExecutorID).join(); CompletableFuture cancelFuture = - gateway - .cancelTask(event.getWorkerId()) - .thenApply(dontCare -> Noop.getInstance()) - .exceptionally(exception -> { - Throwable actual = - ExceptionUtils.stripCompletionException( - ExceptionUtils.stripExecutionException(exception)); - // no need to retry if the TaskExecutor does not know about the task anymore. - if (actual instanceof TaskNotFoundException) { - return Noop.getInstance(); - } else { - return event.onFailure(actual); - } - }); + resourceCluster.getTaskExecutorGateway(taskExecutorID) + .thenComposeAsync(gateway -> + gateway + .cancelTask(event.getWorkerId()) + .thenApply(dontCare -> Noop.getInstance()) + .exceptionally(exception -> { + Throwable actual = + ExceptionUtils.stripCompletionException( + ExceptionUtils.stripExecutionException(exception)); + // no need to retry if the TaskExecutor does not know about the task anymore. + if (actual instanceof TaskNotFoundException) { + return Noop.getInstance(); + } else { + return event.onFailure(actual); + } + })); pipe(cancelFuture, context().dispatcher()).to(self()); } catch (Exception e) { From f5be7bce34c39f5d2c6cc0b5d144f15946b19314 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 14 Sep 2023 11:42:34 -0700 Subject: [PATCH 3/3] rename --- .../mantisrx/server/master/resourcecluster/ResourceCluster.java | 2 +- .../master/resourcecluster/ResourceClusterAkkaImpl.java | 2 +- .../master/scheduler/ResourceClusterAwareSchedulerActor.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index b791ca630..f63f5ccb1 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -118,7 +118,7 @@ CompletableFuture getTaskExecutorFor( */ CompletableFuture getTaskExecutorGateway(TaskExecutorID taskExecutorID); - CompletableFuture reconnectTaskExecutorGateway(TaskExecutorID taskExecutorID); + CompletableFuture reconnectGateway(TaskExecutorID taskExecutorID); CompletableFuture getTaskExecutorInfo(String hostName); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 6d0e98366..896ce6cf1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -213,7 +213,7 @@ public CompletableFuture getTaskExecutorGateway( } @Override - public CompletableFuture reconnectTaskExecutorGateway( + public CompletableFuture reconnectGateway( TaskExecutorID taskExecutorID) { return Patterns diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index af843ea40..2f0b07092 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -238,7 +238,7 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE Throwables.getStackTraceAsString(event.throwable))); try { - resourceCluster.reconnectTaskExecutorGateway(event.getTaskExecutorID()) + resourceCluster.reconnectGateway(event.getTaskExecutorID()) .whenComplete((res, throwable) -> { if (throwable != null) { log.error("Failed to request reconnect to gateway for {}", event.getTaskExecutorID(), throwable);