Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Oct 27, 2023
1 parent b82f7bf commit 821dee2
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,8 @@ class WorkerManager implements IWorkerManager {
*/
void initialize(boolean isSubmit) throws Exception {
if (isSubmit) {
// TODO: how to differentiate autoscaling requests
submitInitialWorkers();
} else {
// TODO: understand if this still stands after change.
initializeRunningWorkers();
}
mantisJobMetaData.setJobCosts(costsCalculator.calculateCosts(mantisJobMetaData));
Expand All @@ -1320,7 +1318,6 @@ private void initializeRunningWorkers() {
LOGGER.info("[fdc-96] initializeRunningWorkers {}", jobId);

// Scan for the list of all corrupted workers to be resubmitted.
// TODO: all workers in accepted state are marked as corrupted upon restart due to "assignedPorts should have at least 5 ports"
List<JobWorker> workersToResubmit = markCorruptedWorkers();
List<IMantisWorkerMetadata> workersToSubmit = new ArrayList<>();

Expand Down Expand Up @@ -1364,11 +1361,14 @@ private void initializeRunningWorkers() {
scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID());
} else if (wm.getState().equals(WorkerState.Accepted)) {

if (JobState.isInitiatedState(mantisJobMetaData.getState())) {
LOGGER.info("[fdc-94::queueTask] P0 - w: {}", wm);
// If the job is in accepted state, queue its pending workers at once
if (JobState.isAcceptedState(mantisJobMetaData.getState())) {
LOGGER.info("[fdc-97::workersToSubmit] P0 - w: {}", wm);

workersToSubmit.add(wm);
} else {
LOGGER.info("[fdc-94::queueTask] P0 - w: {}", wm);

queueTask(wm);
}
}
Expand All @@ -1380,7 +1380,7 @@ private void initializeRunningWorkers() {
}
}

if (JobState.isInitiatedState(mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) {
if (JobState.isAcceptedState(mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) {
LOGGER.info("[fdc-94::queueTasks] P0 - workersToSubmit: {}", workersToSubmit);

queueTasks(workersToSubmit, empty());
Expand Down Expand Up @@ -1495,38 +1495,25 @@ private void submitInitialWorkers() throws Exception {
mantisJobMetaData.getJobDefinition(),
System.currentTimeMillis());

// TODO: this is the key part!
// TODO: not sure about this "beg" part...
int beg = 0;
while (true) {
if (beg >= workers.size()) {
break;
}
int en = beg + Math.min(workerWritesBatchSize, workers.size() - beg);
final List<IMantisWorkerMetadata> workerRequests = workers.subList(beg, en);
try {
jobStore.storeNewWorkers(jobMgr.getJobDetails(), workerRequests);
LOGGER.info("Stored workers {} for Job {}", workerRequests, jobId);
// refresh Worker Registry state before enqueuing task to Scheduler
markStageAssignmentsChanged(true);
try {
jobStore.storeNewWorkers(jobMgr.getJobDetails(), workers);
LOGGER.info("Stored workers {} for Job {}", workers, jobId);
// refresh Worker Registry state before enqueuing task to Scheduler
markStageAssignmentsChanged(true);

if (!workerRequests.isEmpty()) {
// queue to scheduler
LOGGER.info("[fdc-94::queueTasks] P1 - workersToSubmit: {}", workerRequests);
if (!workers.isEmpty()) {
// queue to scheduler
LOGGER.info("[fdc-94::queueTasks] P1 - workersToSubmit: {}", workers);

queueTasks(workerRequests, empty());
}
} catch (Exception e) {
LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
throw new RuntimeException("Exception saving worker for Job " + jobId, e);
queueTasks(workers, empty());
}
beg = en;
} catch (Exception e) {
LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
throw new RuntimeException("Exception saving worker for Job " + jobId, e);
}
}

private void queueTasks(final List<IMantisWorkerMetadata> workerRequests, final Optional<Long> readyAt) {
// TODO: understand if we ever need to queue 1 worker at the time
// TODO: understand if we want to simplify/enhance the batch request
LOGGER.info("[fdc-91] hello queieing tasks! Req: {}", workerRequests);

final List<ScheduleRequest> scheduleRequests = workerRequests
Expand Down Expand Up @@ -1708,8 +1695,8 @@ private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int stage

@Override
public void shutdown() {
unscheduleJob();
// if workers have not already completed
terminateJob();
if (!allWorkerCompleted()) {
// kill workers
terminateAllWorkersAsync();
Expand All @@ -1721,10 +1708,9 @@ public void shutdown() {
jobSchedulingInfoBehaviorSubject.onCompleted();
}

private void terminateJob() {
LOGGER.info("Terminating job {}", jobId);
// TODO: make this async
scheduler.terminateJob(jobId.getId());
private void unscheduleJob() {
LOGGER.info("Unscheduling job {}", jobId);
scheduler.unscheduleJob(jobId.getId());
}

private void terminateAllWorkersAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ public static boolean isRunningState(JobState state) {
}

/**
* Returns true if the job is initiated but not running.
* Returns true if the job is accepted.
*
* @param state
*
* @return
*/
public static boolean isInitiatedState(JobState state) {
public static boolean isAcceptedState(JobState state) {
switch (state) {
case Accepted:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ public Receive createReceive() {
.match(GetUnregisteredTaskExecutorsRequest.class, req -> sender().tell(getTaskExecutors(filterByAttrs(req).and(ExecutorStateManager.unregistered)), self()))
.match(GetActiveJobsRequest.class, this::getActiveJobs)
.match(GetTaskExecutorStatusRequest.class, this::getTaskExecutorStatus)
.match(GetClusterUsageRequest.class,
req -> sender().tell(this.executorStateManager.getClusterUsage(req), self()))
.match(GetClusterUsageRequest.class, this::onGetClusterUsage)
.match(GetClusterIdleInstancesRequest.class,
req -> sender().tell(onGetClusterIdleInstancesRequest(req), self()))
.match(GetAssignedTaskExecutorRequest.class, this::onAssignedTaskExecutorRequest)
Expand Down Expand Up @@ -736,6 +735,24 @@ private ResourceOverview getResourceOverview() {
return this.executorStateManager.getResourceOverview();
}

private void onGetClusterUsage(GetClusterUsageRequest req) {
log.info("[fdc-000] onGetClusterUsage {}", clusterID.getResourceID());
getContext().system().actorSelection("user/" + "scheduler-for-" + clusterID.getResourceID())
.resolveOne(Duration.ofSeconds(1))
.whenComplete((r, t) -> {
if (t != null) {
log.warn("[fdc-000] actor {} not found", "scheduler-for-" + clusterID.getResourceID(), t);
}
else {
// Message for pending requests
log.info("[fdc-000] found actor {}!", "scheduler-for-" + clusterID.getResourceID());
}
});

log.info("[fdc-000] sending request back!");
sender().tell(this.executorStateManager.getClusterUsage(req), self());
}

private void getTaskExecutorStatus(GetTaskExecutorStatusRequest req) {
TaskExecutorID taskExecutorID = req.getTaskExecutorID();
final TaskExecutorState state = this.executorStateManager.get(taskExecutorID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
}

@Override
public void terminateJob(String jobId) {
public void unscheduleJob(String jobId) {
// TODO:
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface MantisScheduler {

void scheduleWorkers(final BatchScheduleRequest scheduleRequest);

void terminateJob(final String jobId);
void unscheduleJob(final String jobId);

void markJobAsStarted(final String jobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void markJobAsStarted(String jobId) {
}

@Override
public void terminateJob(String jobId) {
public void unscheduleJob(String jobId) {
schedulerActor.tell(CancelBatchRequestEvent.of(jobId),null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
}

@Override
public void terminateJob(String jobId) {
public void unscheduleJob(String jobId) {
// TODO:
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
}

@Override
public void terminateJob(String jobId) {
public void unscheduleJob(String jobId) {
// TODO:
}

Expand Down

0 comments on commit 821dee2

Please sign in to comment.