Skip to content

Commit

Permalink
Implement batch schedule request.
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Nov 7, 2023
1 parent 035b3e0 commit 6b6cb54
Show file tree
Hide file tree
Showing 22 changed files with 711 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.Value;
Expand Down Expand Up @@ -100,14 +101,13 @@ default CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors() {

/**
* Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there
* are no task executors.
* are not enough available task executors.
*
* @param machineDefinition machine definition that's requested for the worker
* @param workerId worker id of the task that's going to run on the node.
* @return task executor assigned for the particular task.
* @param allocationRequests set of machine definitions requested for 1 or more workers
* @return task executors assigned for the particular request
*/
CompletableFuture<TaskExecutorID> getTaskExecutorFor(
TaskExecutorAllocationRequest allocationRequest);
CompletableFuture<Map<TaskExecutorAllocationRequest, TaskExecutorID>> getTaskExecutorsFor(
Set<TaskExecutorAllocationRequest> allocationRequests);

/**
* Returns the Gateway instance to talk to the task executor. If unable to make connection with
Expand Down Expand Up @@ -188,6 +188,13 @@ CompletableFuture<PagedActiveJobOverview> getActiveJobOverview(
CompletableFuture<Map<TaskExecutorID, WorkerId>> getTaskExecutorWorkerMapping(
Map<String, String> attributes);

/**
* Signals that the given job id has been killed.
*
* @param jobId id of the job killed
* @return a future carrying an {@link Ack}; NOTE(review): presumably it completes once the
*     signal has been handled - confirm against the implementation
*/
CompletableFuture<Ack> unscheduleJob(String jobId);

class NoResourceAvailableException extends Exception {

public NoResourceAvailableException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,7 @@ void initialize(boolean isSubmit) throws Exception {
private void initializeRunningWorkers() {
// Scan for the list of all corrupted workers to be resubmitted.
List<JobWorker> workersToResubmit = markCorruptedWorkers();
List<IMantisWorkerMetadata> workersToSubmit = new ArrayList<>();

// publish a refresh before enqueuing tasks to the Scheduler, as there is a potential race between
// WorkerRegistryV2 getting updated and isWorkerValid being called from SchedulingService loop
Expand Down Expand Up @@ -1357,7 +1358,14 @@ private void initializeRunningWorkers() {

scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID());
} else if (wm.getState().equals(WorkerState.Accepted)) {
queueTask(wm);

// If the job is in accepted state, queue all its pending workers at once in a batch request.
// This matters when batch requests were still pending before a master failover.
if (JobState.isAcceptedState(mantisJobMetaData.getState())) {
workersToSubmit.add(wm);
} else {
queueTask(wm);
}
}
}

Expand All @@ -1367,6 +1375,10 @@ private void initializeRunningWorkers() {
}
}

if (JobState.isAcceptedState(mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) {
queueTasks(workersToSubmit, empty());
}

// publish another update after queuing tasks to Fenzo (in case some workers were marked Started
// due to the Fake heartbeat in above loop)
markStageAssignmentsChanged(true);
Expand Down Expand Up @@ -1475,40 +1487,37 @@ private void submitInitialWorkers() throws Exception {
mantisJobMetaData.getJobDefinition(),
System.currentTimeMillis());

int beg = 0;
while (true) {
if (beg >= workers.size()) {
break;
}
int en = beg + Math.min(workerWritesBatchSize, workers.size() - beg);
final List<IMantisWorkerMetadata> workerRequests = workers.subList(beg, en);
try {
jobStore.storeNewWorkers(jobMgr.getJobDetails(), workerRequests);
LOGGER.info("Stored workers {} for Job {}", workerRequests, jobId);
// refresh Worker Registry state before enqueuing task to Scheduler
markStageAssignmentsChanged(true);
try {
jobStore.storeNewWorkers(jobMgr.getJobDetails(), workers);
LOGGER.info("Stored workers {} for Job {}", workers, jobId);
// refresh Worker Registry state before enqueuing task to Scheduler
markStageAssignmentsChanged(true);

if (!workers.isEmpty()) {
// queue to scheduler
workerRequests.forEach(this::queueTask);
} catch (Exception e) {
LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
throw new RuntimeException("Exception saving worker for Job " + jobId, e);
queueTasks(workers, empty());
}
beg = en;
} catch (Exception e) {
LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
throw new RuntimeException("Exception saving worker for Job " + jobId, e);
}
}

private void queueTask(final IMantisWorkerMetadata workerRequest, final Optional<Long> readyAt) {
final ScheduleRequest schedulingRequest = createSchedulingRequest(workerRequest, readyAt);
LOGGER.info("Queueing up scheduling request {} ", schedulingRequest);
private void queueTasks(final List<IMantisWorkerMetadata> workerRequests, final Optional<Long> readyAt) {
final List<ScheduleRequest> scheduleRequests = workerRequests
.stream()
.map(wR -> createSchedulingRequest(wR, readyAt))
.collect(Collectors.toList());
LOGGER.info("Queueing up batch schedule request for {} workers", workerRequests.size());
try {
scheduler.scheduleWorker(schedulingRequest);
scheduler.scheduleWorkers(new BatchScheduleRequest(scheduleRequests));
} catch (Exception e) {
LOGGER.error("Exception queueing task", e);
LOGGER.error("Exception queueing tasks", e);
}
}

private void queueTask(final IMantisWorkerMetadata workerRequest) {
queueTask(workerRequest, empty());
queueTasks(Collections.singletonList(workerRequest), empty());
}

private ScheduleRequest createSchedulingRequest(
Expand Down Expand Up @@ -1672,6 +1681,7 @@ private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int stage

@Override
public void shutdown() {
scheduler.unscheduleJob(jobId.getId());
// if workers have not already completed
if (!allWorkerCompleted()) {
// kill workers
Expand Down Expand Up @@ -1805,6 +1815,9 @@ public void checkHeartBeats(Instant currentTime) {
Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt());
if (Duration.between(acceptedAt, currentTime).getSeconds() > stuckInSubmitToleranceSecs) {
// worker stuck in accepted
LOGGER.info("Job {}, Worker {} stuck in accepted state for {}", this.jobMgr.getJobId(),
workerMeta.getWorkerId(), Duration.between(acceptedAt, currentTime).getSeconds());

workersToResubmit.add(worker);
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
WARN,
Expand Down Expand Up @@ -2016,6 +2029,7 @@ else if (currentWorkerNum < eventWorkerNum) {
if (allWorkerStarted()) {
allWorkersStarted = true;
jobMgr.onAllWorkersStarted();
scheduler.unscheduleJob(jobId.getId());
markStageAssignmentsChanged(true);
} else if (allWorkerCompleted()) {
LOGGER.info("Job {} All workers completed1", jobId);
Expand Down Expand Up @@ -2169,7 +2183,7 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception {
// publish a refresh before enqueuing new Task to Scheduler
markStageAssignmentsChanged(true);
// queue the new worker for execution
queueTask(newWorker.getMetadata(), delayDuration);
queueTasks(Collections.singletonList(newWorker.getMetadata()), delayDuration);
LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
numWorkerResubmissions.increment();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,21 @@ public static boolean isRunningState(JobState state) {
return false;
}
}

/**
 * Tells whether the given job state is {@link JobState#Accepted}.
 *
 * <p>Null-safe: a {@code null} state is reported as not accepted instead of raising a
 * {@link NullPointerException}, which the previous single-case {@code switch} did.
 *
 * @param state job state to inspect; may be {@code null}
 * @return {@code true} only when {@code state} is {@code Accepted}, {@code false} otherwise
 */
public static boolean isAcceptedState(JobState state) {
    // Enum constants are singletons, so identity comparison is exact and tolerates null.
    return state == JobState.Accepted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package io.mantisrx.master.resourcecluster;

import io.mantisrx.master.resourcecluster.ResourceClusterActor.BestFit;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetClusterUsageRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
Expand All @@ -29,7 +30,6 @@
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;

/**
* A component to manage the states of {@link TaskExecutorState} for a given {@link ResourceClusterActor}.
Expand Down Expand Up @@ -79,11 +79,14 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
Predicate<Entry<TaskExecutorID, TaskExecutorState>> predicate);

/**
* Find a matched task executor best fitting the given assignment request.
* Finds set of matched task executors best fitting the given assignment requests.
*
* @param request Assignment request.
* @return Optional of matched task executor.
* @return Optional of matched task executors.
*/
Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecutorAssignmentRequest request);
Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request);

/**
 * Unschedules the job with the given id.
 *
 * <p>NOTE(review): presumably this discards whatever scheduling state is held for the job
 * (e.g. pending batch assignment requests) - confirm against the implementing class.
 *
 * @param jobId id of the job to unschedule
 */
void unscheduleJob(String jobId);

Set<Entry<TaskExecutorID, TaskExecutorState>> getActiveExecutorEntry();

Expand Down
Loading

0 comments on commit 6b6cb54

Please sign in to comment.