Skip to content

Commit

Permalink
refactor TE reconnect (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Oct 13, 2023
1 parent cd735c5 commit 20e5d26
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ CompletableFuture<TaskExecutorID> getTaskExecutorFor(
*/
CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(TaskExecutorID taskExecutorID);

CompletableFuture<Ack> reconnectGateway(TaskExecutorID taskExecutorID);

CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ConnectionFailedException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.NoResourceAvailableException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus;
Expand Down Expand Up @@ -209,7 +208,6 @@ public Receive createReceive() {
.match(ResourceOverviewRequest.class, this::onResourceOverviewRequest)
.match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest)
.match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest)
.match(TaskExecutorGatewayReconnectRequest.class, this::onTaskExecutorGatewayReconnectRequest)
.match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest)
.match(CheckDisabledTaskExecutors.class, this::findAndMarkDisabledTaskExecutors)
.match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry)
Expand Down Expand Up @@ -372,54 +370,6 @@ private void onTaskExecutorGatewayRequest(TaskExecutorGatewayRequest request) {
clusterID.getResourceID(),
"taskExecutor",
request.getTaskExecutorID().getResourceId())));
try {
// let's try one more time by reconnecting with the gateway.
sender().tell(state.reconnect().join(), self());
} catch (Exception e1) {
metrics.incrementCounter(
ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE,
TagList.create(ImmutableMap.of(
"resourceCluster",
clusterID.getResourceID(),
"taskExecutor",
request.getTaskExecutorID().getResourceId())));
sender().tell(new Status.Failure(new ConnectionFailedException(e)), self());
}
}
}
}

/**
 * Handles a request to re-establish the gateway connection of a single task executor.
 *
 * <p>The sender receives a {@code Status.Failure} when no state is tracked for the executor,
 * when the executor is not registered, or when an exception is raised while handling the
 * request (the reconnection-failure counter is incremented in that last case). Otherwise the
 * reconnect is started and an {@code Ack} is sent right away, without waiting for the
 * reconnect to finish.
 *
 * @param request identifies the task executor to reconnect to
 */
private void onTaskExecutorGatewayReconnectRequest(TaskExecutorGatewayReconnectRequest request) {
    log.info("Requesting to reconnect to TaskExecutor: {}", request);

    final TaskExecutorState executorState = this.executorStateManager.get(request.getTaskExecutorID());
    if (executorState == null) {
        // Nothing is tracked for this executor; report it and stop.
        sender().tell(
            new Status.Failure(new NullPointerException("Null TaskExecutor state: " + request.getTaskExecutorID())),
            self());
        return;
    }

    try {
        if (!executorState.isRegistered()) {
            sender().tell(
                new Status.Failure(
                    new IllegalStateException("Unregistered TaskExecutor: " + request.getTaskExecutorID())),
                self());
            return;
        }

        // The outcome of the reconnect is only logged; the ack below does not depend on it.
        executorState.reconnect().whenComplete((ignored, error) -> {
            if (error != null) {
                log.error("failed to reconnect to {}", request.getTaskExecutorID(), error);
            }
        });
        sender().tell(Ack.getInstance(), self());
    } catch (Exception e) {
        metrics.incrementCounter(
            ResourceClusterActorMetrics.TE_RECONNECTION_FAILURE,
            TagList.create(ImmutableMap.of(
                "resourceCluster", clusterID.getResourceID(),
                "taskExecutor", request.getTaskExecutorID().getResourceId())));
        sender().tell(new Status.Failure(new ConnectionFailedException(e)), self());
    }
}
Expand Down Expand Up @@ -921,13 +871,6 @@ static class TaskExecutorGatewayRequest {
ClusterID clusterID;
}

/**
 * Message asking the resource cluster actor to reconnect the gateway of one task executor.
 */
@Value
static class TaskExecutorGatewayReconnectRequest {
// The task executor whose gateway connection should be re-established.
TaskExecutorID taskExecutorID;

// The cluster the request is addressed to.
ClusterID clusterID;
}

@Value
static class GetRegisteredTaskExecutorsRequest implements HasAttributes {
ClusterID clusterID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList;
Expand Down Expand Up @@ -212,17 +211,6 @@ public CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(
});
}

/**
 * Asks the resource cluster manager actor to reconnect the gateway of the given task executor.
 *
 * @param taskExecutorID the task executor whose gateway should be reconnected
 * @return a future holding the actor's reply cast to {@code Ack}
 */
@Override
public CompletableFuture<Ack> reconnectGateway(TaskExecutorID taskExecutorID) {
    final TaskExecutorGatewayReconnectRequest reconnectRequest =
        new TaskExecutorGatewayReconnectRequest(taskExecutorID, clusterID);

    return Patterns
        .ask(resourceClusterManagerActor, reconnectRequest, askTimeout)
        .thenApply(reply -> Ack.class.cast(reply))
        .toCompletableFuture();
}

@Override
public CompletableFuture<TaskExecutorRegistration> getTaskExecutorInfo(String hostName) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayReconnectRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
Expand Down Expand Up @@ -144,8 +143,6 @@ public Receive createReceive() {
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorGatewayReconnectRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(DisableTaskExecutorsRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(AddNewJobArtifactsToCacheRequest.class, req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,24 @@ TaskExecutorRegistration getRegistration() {
}

/**
 * Returns the gateway future held by this state.
 *
 * NOTE(review): this listing looks like a diff rendered without +/- markers, so the body
 * below may mix pre-change and post-change lines — confirm against the actual file.
 *
 * @return the current gateway future
 * @throws IllegalStateException if there is no registration, the state is
 *     {@code Unregistered}, or the gateway field is null
 */
protected CompletableFuture<TaskExecutorGateway> getGatewayAsync() {
// Refuse to hand out a gateway for an executor that is not registered.
if (this.registration == null || this.state == RegistrationState.Unregistered) {
throw new IllegalStateException("TE is unregistered");
}

if (this.gateway == null) {
throw new IllegalStateException("gateway is null");
}
return this.gateway;
}

/**
 * Replaces the gateway future with a new connection attempt to the registered task executor
 * address and returns it.
 *
 * NOTE(review): this listing looks like a diff rendered without +/- markers; the second
 * connect below may belong to a different method in the actual file — confirm before relying
 * on this description.
 *
 * @return the gateway future after the connection attempt(s)
 */
protected CompletableFuture<TaskExecutorGateway> reconnect() {
// Start a new connection; a failure is only logged here and stays in the returned future.
this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
// If the stored future has already failed, replace it with a fresh connection attempt.
if (this.gateway.isCompletedExceptionally()) {
log.warn("gateway connection encountered error, reconnect: {}.", registration.getTaskExecutorAddress());
this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
}

return this.gateway;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)
event.getScheduleRequestEvent(),
event.getTaskExecutorID()))
.exceptionally(
throwable -> new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(), throwable))
throwable ->
new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(),
ExceptionUtils.stripCompletionException(throwable))
)
.whenCompleteAsync((res, err) ->
{
if (err == null) {
Expand All @@ -177,9 +180,15 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)

)
.exceptionally(
throwable -> new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(), throwable));
// Note: throwable is the wrapping completion error; its cause is the Akka RPC
// actor-selection error.
// On this error, we want to:
// 1) trigger an RPC service reconnection (to fix the missing action).
// 2) re-schedule the worker node with a delay (to avoid a fast loop that exhausts the idle TE pool).
throwable ->
event.getScheduleRequestEvent()
.onFailure(ExceptionUtils.stripCompletionException(throwable))
);
pipe(ackFuture, getContext().getDispatcher()).to(self());
}
} catch (Exception e) {
Expand All @@ -197,11 +206,17 @@ private void onFailedScheduleRequestEvent(FailedToScheduleRequestEvent event) {
if (event.getAttempt() >= this.maxScheduleRetries) {
log.error("Failed to submit the request {} because of ", event.getScheduleRequestEvent(), event.getThrowable());
} else {
log.error("Failed to submit the request {}; Retrying in {} because of ", event.getScheduleRequestEvent(), intervalBetweenRetries, event.getThrowable());
// honor the readyAt attribute from schedule request's rate limiter.
Duration timeout = Duration.ofMillis(
Math.max(
event.getScheduleRequestEvent().getRequest().getReadyAt() - Instant.now().toEpochMilli(),
intervalBetweenRetries.toMillis()));
log.error("Failed to submit the request {}; Retrying in {} because of ",
event.getScheduleRequestEvent(), timeout, event.getThrowable());
getTimers().startSingleTimer(
getSchedulingQueueKeyFor(event.getScheduleRequestEvent().getRequest().getWorkerId()),
event.onRetry(),
intervalBetweenRetries);
timeout);
}
}

Expand Down Expand Up @@ -240,23 +255,6 @@ private void onFailedToSubmitScheduleRequestEvent(FailedToSubmitScheduleRequestE
event.getScheduleRequestEvent().getRequest().getWorkerId(),
event.getScheduleRequestEvent().getRequest().getStageNum(),
Throwables.getStackTraceAsString(event.throwable)));

try {
resourceCluster.reconnectGateway(event.getTaskExecutorID())
.whenComplete((res, throwable) -> {
if (throwable != null) {
log.error("Failed to request reconnect to gateway for {}", event.getTaskExecutorID(), throwable);
}
else {
log.debug("Acked from reconnection request for {}", event.getTaskExecutorID());
}
});
} catch (Exception e) {
log.warn(
"Failed to establish re-connection with the task executor {} on failed schedule request",
event.getTaskExecutorID(), e);
connectionFailures.increment();
}
}

private void onCancelRequestEvent(CancelRequestEvent event) {
Expand Down

0 comments on commit 20e5d26

Please sign in to comment.