From 20e5d26dfd408483b59c5b634480b48b049aab72 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Fri, 13 Oct 2023 15:47:39 -0700 Subject: [PATCH] refactor TE reconnect (#564) --- .../resourcecluster/ResourceCluster.java | 2 - .../resourcecluster/ResourceClusterActor.java | 57 ------------------- .../ResourceClusterAkkaImpl.java | 12 ---- .../ResourceClustersManagerActor.java | 3 - .../resourcecluster/TaskExecutorState.java | 23 +++++--- .../ResourceClusterAwareSchedulerActor.java | 48 ++++++++-------- 6 files changed, 37 insertions(+), 108 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index f63f5ccb1..18d68dcd4 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -118,8 +118,6 @@ CompletableFuture getTaskExecutorFor( */ CompletableFuture getTaskExecutorGateway(TaskExecutorID taskExecutorID); - CompletableFuture reconnectGateway(TaskExecutorID taskExecutorID); - CompletableFuture getTaskExecutorInfo(String hostName); /** diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index f78b816d3..954e6ebb6 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -35,7 +35,6 @@ import io.mantisrx.server.master.persistence.MantisJobStore; import io.mantisrx.server.master.resourcecluster.ClusterID; import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview; -import io.mantisrx.server.master.resourcecluster.ResourceCluster.ConnectionFailedException; import io.mantisrx.server.master.resourcecluster.ResourceCluster.NoResourceAvailableException; import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview; import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus; @@ -209,7 +208,6 @@ public Receive createReceive() { .match(ResourceOverviewRequest.class, this::onResourceOverviewRequest) .match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest) .match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest) - .match(TaskExecutorGatewayReconnectRequest.class, this::onTaskExecutorGatewayReconnectRequest) .match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest) .match(CheckDisabledTaskExecutors.class, this::findAndMarkDisabledTaskExecutors) .match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry) @@ -372,54 +370,6 @@ private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) { clusterID.getResourceID(), "taskExecutor", request.getTaskExecutorID().getResourceId()))); - try { - // let's try one more time by reconnecting with the gateway. - sender().tell(state.reconnect().join(), self()); - } catch (Exception e1) { - metrics.incrementCounter( - ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE, - TagList.create(ImmutableMap.of( - "resourceCluster", - clusterID.getResourceID(), - "taskExecutor", - request.getTaskExecutorID().getResourceId()))); - sender().tell(new Status.Failure(new ConnectionFailedException(e)), self()); - } - } - } - } - - private void onTaskExecutorGatewayReconnectRequest(TaskExecutorGatewayReconnectRequest request) { - log.info("Requesting to reconnect to TaskExecutor: {}", request); - TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID()); - if (state == null) { - sender().tell( - new Status.Failure(new NullPointerException("Null TaskExecutor state: " + request.getTaskExecutorID())), - self()); - } else { - try { - if (state.isRegistered()) { - state.reconnect().whenComplete((res, throwable) -> { - if (throwable != null) { - log.error("failed to reconnect to {}", request.getTaskExecutorID(), throwable); - } - }); - sender().tell(Ack.getInstance(), self()); - } else { - sender().tell( - new Status.Failure( - new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID())), - self()); - } - } catch (Exception e) { - metrics.incrementCounter( - ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE, - TagList.create(ImmutableMap.of( - "resourceCluster", - clusterID.getResourceID(), - "taskExecutor", - request.getTaskExecutorID().getResourceId()))); - sender().tell(new Status.Failure(new ConnectionFailedException(e)), self()); } } } @@ -921,13 +871,6 @@ static class TaskExecutorGatewayRequest { ClusterID clusterID; } - @Value - static class TaskExecutorGatewayReconnectRequest { - TaskExecutorID taskExecutorID; - - ClusterID clusterID; - } - @Value static class GetRegisteredTaskExecutorsRequest implements HasAttributes { ClusterID clusterID; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 896ce6cf1..6204b5469 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -34,7 +34,6 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList; @@ -212,17 +211,6 @@ public CompletableFuture getTaskExecutorGateway( }); } - @Override - public CompletableFuture reconnectGateway( - TaskExecutorID taskExecutorID) { - return - Patterns - .ask(resourceClusterManagerActor, new TaskExecutorGatewayReconnectRequest(taskExecutorID, clusterID), - askTimeout) - .thenApply(Ack.class::cast) - .toCompletableFuture(); - } - @Override public CompletableFuture getTaskExecutorInfo(String hostName) { return diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index 199f43d24..382a3f8a6 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -34,7 +34,6 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest; import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest; @@ -144,8 +143,6 @@ public Receive createReceive() { getRCActor(req.getClusterID()).forward(req, context())) .match(TaskExecutorGatewayRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) - .match(TaskExecutorGatewayReconnectRequest.class, req -> - getRCActor(req.getClusterID()).forward(req, context())) .match(DisableTaskExecutorsRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) .match(AddNewJobArtifactsToCacheRequest.class, req -> diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java index 091bead66..6792eb0d8 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java @@ -252,19 +252,24 @@ TaskExecutorRegistration getRegistration() { } protected CompletableFuture getGatewayAsync() { + if (this.registration == null || this.state == RegistrationState.Unregistered) { + throw new IllegalStateException("TE is unregistered"); + } + if (this.gateway == null) { throw new IllegalStateException("gateway is null"); } - return this.gateway; - } - protected CompletableFuture reconnect() { - this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class) - .whenComplete((gateway, throwable) -> { - if (throwable != null) { - log.error("Failed to connect to the gateway", throwable); - } - }); + if (this.gateway.isCompletedExceptionally()) { + log.warn("gateway connection encountered error, reconnect: {}.", registration.getTaskExecutorAddress()); + this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class) + .whenComplete((gateway, throwable) -> { + if (throwable != null) { + log.error("Failed to connect to the gateway", throwable); + } + }); + } + return this.gateway; } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index e384c98b5..46f895031 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -162,9 +162,12 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event) event.getScheduleRequestEvent(), event.getTaskExecutorID())) .exceptionally( - throwable -> new FailedToSubmitScheduleRequestEvent( - event.getScheduleRequestEvent(), - event.getTaskExecutorID(), throwable)) + throwable -> + new FailedToSubmitScheduleRequestEvent( + event.getScheduleRequestEvent(), + event.getTaskExecutorID(), + ExceptionUtils.stripCompletionException(throwable)) + ) .whenCompleteAsync((res, err) -> { if (err == null) { @@ -177,9 +180,15 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event) ) .exceptionally( - throwable -> new FailedToSubmitScheduleRequestEvent( - event.getScheduleRequestEvent(), - event.getTaskExecutorID(), throwable)); + // Note: throwable is the wrapped completable error (inside is akka rpc actor selection + // error). + // On this error, we want to: + // 1) trigger rpc service reconnection (to fix the missing action). + // 2) re-schedule worker node with delay (to avoid a fast loop to exhaust idle TE pool). + throwable -> + event.getScheduleRequestEvent() + .onFailure(ExceptionUtils.stripCompletionException(throwable)) + ); pipe(ackFuture, getContext().getDispatcher()).to(self()); } } catch (Exception e) { @@ -197,11 +206,17 @@ private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) { if (event.getAttempt() >= this.maxScheduleRetries) { log.error("Failed to submit the request {} because of ", event.getScheduleRequestEvent(), event.getThrowable()); } else { - log.error("Failed to submit the request {}; Retrying in {} because of ", event.getScheduleRequestEvent(), intervalBetweenRetries, event.getThrowable()); + // honor the readyAt attribute from schedule request's rate limiter. + Duration timeout = Duration.ofMillis( + Math.max( + event.getScheduleRequestEvent().getRequest().getReadyAt() - Instant.now().toEpochMilli(), + intervalBetweenRetries.toMillis())); + log.error("Failed to submit the request {}; Retrying in {} because of ", + event.getScheduleRequestEvent(), timeout, event.getThrowable()); getTimers().startSingleTimer( getSchedulingQueueKeyFor(event.getScheduleRequestEvent().getRequest().getWorkerId()), event.onRetry(), - intervalBetweenRetries); + timeout); } } @@ -240,23 +255,6 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE event.getScheduleRequestEvent().getRequest().getWorkerId(), event.getScheduleRequestEvent().getRequest().getStageNum(), Throwables.getStackTraceAsString(event.throwable))); - - try { - resourceCluster.reconnectGateway(event.getTaskExecutorID()) - .whenComplete((res, throwable) -> { - if (throwable != null) { - log.error("Failed to request reconnect to gateway for {}", event.getTaskExecutorID(), throwable); - } - else { - log.debug("Acked from reconnection request for {}", event.getTaskExecutorID()); - } - }); - } catch (Exception e) { - log.warn( - "Failed to establish re-connection with the task executor {} on failed schedule request", - event.getTaskExecutorID(), e); - connectionFailures.increment(); - } } private void onCancelRequestEvent(CancelRequestEvent event) {