Skip to content

Commit

Permalink
Fix runtime Index total worker observable + Revert control-plane chan…
Browse files Browse the repository at this point in the history
…ge (#537)

* Revert "Fix job discovery workerAssignment (#524)"

This reverts commit 53b84f1.

* revert HB change

* Ensure masterClient cache result
  • Loading branch information
Andyz26 committed Aug 24, 2023
1 parent 2de1314 commit 0fc88ec
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;

import rx.subjects.BehaviorSubject;

/**
*
Expand Down Expand Up @@ -698,8 +698,11 @@ public Observable<String> getJobStatusObservable(final String jobId) {
* @param jobId
* @return
*/
@Override
public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
return masterMonitor.getMasterObservable()
BehaviorSubject<JobSchedulingInfo> behaviorSubject = BehaviorSubject.create();

masterMonitor.getMasterObservable()
.filter(masterDescription -> masterDescription != null)
.retryWhen(retryLogic)
.switchMap((Func1<MasterDescription,
Expand Down Expand Up @@ -728,7 +731,8 @@ public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
}))
.repeatWhen(repeatLogic)
.retryWhen(retryLogic)
;
.subscribe(behaviorSubject);
return behaviorSubject;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand Down Expand Up @@ -102,8 +101,7 @@ public CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoS
try {
AtomicBoolean isJobCompleted = new AtomicBoolean(false);
final String jobId = request.getJobId().getId();
AtomicReference<JobSchedulingInfo> jobSchedulingInfoAtomicRef =
new AtomicReference<>(new JobSchedulingInfo(jobId, new HashMap<>()));
final JobSchedulingInfo completedJobSchedulingInfo = new JobSchedulingInfo(jobId, new HashMap<>());
CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> jobSchedInfoObsCS = response
.thenApply(getJobSchedInfoResp -> {
Optional<BehaviorSubject<JobSchedulingInfo>> jobStatusSubjectO = getJobSchedInfoResp.getJobSchedInfoSubject();
Expand All @@ -113,23 +111,18 @@ public CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoS
Observable<JobSchedulingInfo> heartbeats =
Observable.interval(5, serverIdleConnectionTimeout.getSeconds() - 1, TimeUnit.SECONDS)
.map(x -> {
if(isJobCompleted.get()) {
if(!isJobCompleted.get()) {
return SCHED_INFO_HB_INSTANCE;
} else {
return jobSchedulingInfoAtomicRef.get();
return completedJobSchedulingInfo;
}

})
.takeWhile(x -> sendHeartbeats == true);

Observable<JobSchedulingInfo> jobSchedulingInfoObservable =
jobSchedulingInfoObs
.doOnCompleted(() -> isJobCompleted.set(true))
.doOnNext(jobSchedulingInfoAtomicRef::set);

// Job SchedulingInfo obs completes on job shutdown. Use the do On completed as a signal to inform the user that there are no workers to connect to.
// TODO For future a more explicit key in the payload saying the job is completed.
Observable<JobSchedulingInfo> jobSchedulingInfoWithHBObs = Observable.merge(jobSchedulingInfoObservable, heartbeats);
Observable<JobSchedulingInfo> jobSchedulingInfoWithHBObs = Observable.merge(jobSchedulingInfoObs.doOnCompleted(() -> isJobCompleted.set(true)), heartbeats);
return new JobDiscoveryRouteProto.SchedInfoResponse(
getJobSchedInfoResp.requestId,
getJobSchedInfoResp.responseCode,
Expand Down

0 comments on commit 0fc88ec

Please sign in to comment.