Skip to content

Commit

Permalink
Recreate TE gateway per request (#566)
Browse files Browse the repository at this point in the history
* recreate TEGw per req

* log for scheInfo

* enable client addr

* Revert "Ensure masterClient cache result"

This reverts commit eb4f501.

* Revert "Fix index obs (#548)"

This reverts commit 867913f.

* Fix unsub schedInfo stream

* Revert "log for scheInfo"

This reverts commit 2260709.

* Revert "enable client addr"

This reverts commit 33d95dd.
  • Loading branch information
Andyz26 authored Oct 18, 2023
1 parent f2d77de commit 6391a9f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ enum RegistrationState {
@Nullable
private TaskExecutorRegistration registration;

@Nullable
private CompletableFuture<TaskExecutorGateway> gateway;

// availabilityState being null here represents that we don't know about the actual state of the task executor
// and are waiting for more information
@Nullable
Expand All @@ -72,7 +69,6 @@ static TaskExecutorState of(Clock clock, RpcService rpcService, JobMessageRouter
RegistrationState.Unregistered,
null,
null,
null,
false,
clock.instant(),
clock,
Expand All @@ -98,13 +94,6 @@ boolean onRegistration(TaskExecutorRegistration registration) {
} else {
this.state = RegistrationState.Registered;
this.registration = registration;
this.gateway =
rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
updateTicker();
return true;
}
Expand All @@ -117,7 +106,6 @@ boolean onDisconnection() {
state = RegistrationState.Unregistered;
registration = null;
setAvailabilityState(null);
gateway = null;
updateTicker();
return true;
}
Expand Down Expand Up @@ -256,21 +244,17 @@ protected CompletableFuture<TaskExecutorGateway> getGatewayAsync() {
throw new IllegalStateException("TE is unregistered");
}

if (this.gateway == null) {
throw new IllegalStateException("gateway is null");
}

if (this.gateway.isCompletedExceptionally()) {
log.warn("gateway connection encountered error, reconnect: {}.", registration.getTaskExecutorAddress());
this.gateway = rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
// [Note] here the gateway connection is re-created every time it's requested to avoid corrupted state that
// can block the connection to TE.
// To be able to store and re-use the gateway, we probably need to make a chain of callbacks so that the TE
// is only marked as available after this gateway connection is successfully established with proper retry
// loops (since the TE only registers once and takes the ack as success on API response).
return rpcService.connect(registration.getTaskExecutorAddress(), TaskExecutorGateway.class)
.whenComplete((gateway, throwable) -> {
if (throwable != null) {
log.error("Failed to connect to the gateway", throwable);
}
});
}

return this.gateway;
}

boolean containsAttributes(Map<String, String> attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event)
// 1) trigger rpc service reconnection (to fix the missing action).
// 2) re-schedule worker node with delay (to avoid a fast loop to exhaust idle TE pool).
throwable ->
event.getScheduleRequestEvent()
.onFailure(ExceptionUtils.stripCompletionException(throwable))
event.getScheduleRequestEvent().onFailure(throwable)
);
pipe(ackFuture, getContext().getDispatcher()).to(self());
}
Expand Down Expand Up @@ -333,7 +332,8 @@ static ScheduleRequestEvent of(ScheduleRequest request) {
}

FailedToScheduleRequestEvent onFailure(Throwable throwable) {
return new FailedToScheduleRequestEvent(this, this.attempt, throwable);
return new FailedToScheduleRequestEvent(
this, this.attempt, ExceptionUtils.stripCompletionException(throwable));
}

AssignedScheduleRequestEvent onAssignment(TaskExecutorID taskExecutorID) {
Expand Down

0 comments on commit 6391a9f

Please sign in to comment.