
Commit

refactor: drop the markJobAsStarted round-trip; track pending job requests in ExecutorStateManager instead
fdc-ntflx committed Oct 28, 2023
1 parent 821dee2 commit 1e653b7
Showing 13 changed files with 59 additions and 150 deletions.
@@ -114,9 +114,6 @@ CompletableFuture<TaskExecutorID> getTaskExecutorFor(
     CompletableFuture<List<Pair<TaskExecutorAllocationRequest, TaskExecutorID>>> getTaskExecutorsFor(
         Set<TaskExecutorAllocationRequest> allocationRequest);
 
-    CompletableFuture<Ack> markJobAsStarted(String jobId);
-
-
     /**
      * Returns the Gateway instance to talk to the task executor. If unable to make connection with
      * the task executor, then a ConnectionFailedException is thrown wrapped inside the future.
@@ -2054,7 +2054,7 @@ else if (currentWorkerNum < eventWorkerNum) {
             if (allWorkerStarted()) {
                 allWorkersStarted = true;
                 jobMgr.onAllWorkersStarted();
-                scheduler.markJobAsStarted(jobId.getId());
+                unscheduleJob();
                 markStageAssignmentsChanged(true);
             } else if (allWorkerCompleted()) {
                 LOGGER.info("Job {} All workers completed1", jobId);
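
With the round-trip gone, the job actor reuses the existing per-job cancel path once every worker is running. A minimal sketch of the assumed helper behind the new call site (its body is not part of this diff):

// Assumed shape of the JobActor helper; only the call site appears in
// this commit. Per the scheduler files below, MantisScheduler#unscheduleJob
// sends a CancelBatchRequestEvent, which clears the job's queued
// batch-scheduling retries; that is the same timer cancellation the removed
// MarkJobAsStarted handler used to perform.
private void unscheduleJob() {
    scheduler.unscheduleJob(jobId.getId());
}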
@@ -88,8 +88,6 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
      */
     Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecutorAssignmentRequest request);
 
-    void markJobAsStartedRequest(String jobId);
-
     Optional<Map<TaskExecutorAllocationRequest, Pair<TaskExecutorID, TaskExecutorState>>> findBestFit(TaskExecutorBatchAssignmentRequest request);
 
     Set<Entry<TaskExecutorID, TaskExecutorState>> getActiveExecutorEntry();
@@ -37,7 +37,6 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,9 +59,8 @@
 @Slf4j
 public class ExecutorStateManagerImpl implements ExecutorStateManager {
     private final Map<TaskExecutorID, TaskExecutorState> taskExecutorStateMap = new HashMap<>();
-    // TODO: do we need to make this thread safe?
     private final Map<Double, Integer> unavailabilityByMachineDef = new HashMap<>();
-    private final Set<String> jobIds = new HashSet<>();
+    private final Map<String, Long> pendingJobRequests = new HashMap<>();
 
     /**
      * Cache the available executors ready to accept assignments. Note these executors' state are not strongly
@@ -235,23 +233,6 @@ public Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
             .findFirst();
     }
 
-    @Override
-    public void markJobAsStartedRequest(String jobId) {
-        log.info("[fdc-111] markJobAsStartedRequest {}", jobId);
-        taskExecutorStateMap
-            .values()
-            .stream()
-            .filter(s -> s.getRegistration() != null && s.getWorkerId() != null && s.getWorkerId().getJobId().equals(jobId))
-            .forEach(s -> {
-                MachineDefinition m = s.getRegistration().getMachineDefinition();
-                if (unavailabilityByMachineDef.containsKey(m.getCpuCores())) {
-                    log.info("[fdc-111] markJobAsStartedRequest {} - totMissing: {}", jobId, unavailabilityByMachineDef.get(m.getCpuCores()) - 1);
-                    unavailabilityByMachineDef.put(m.getCpuCores(), unavailabilityByMachineDef.get(m.getCpuCores()) - 1);
-                }
-            });
-        jobIds.remove(jobId);
-    }
-
     @Override
     public Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecutorAssignmentRequest request) {
         // only allow allocation in the lowest CPU cores matching group.
@@ -312,11 +293,16 @@ public Optional<Map<TaskExecutorAllocationRequest, Pair<TaskExecutorID, TaskExecutorState>>> findBestFit(TaskExecutorBatchAssignmentRequest request) {
             machineDefWithNoResourcesAvailable.add(Pair.of(allocationRequests.getKey(), allocationRequests.getValue().size()));
 
             final String jobId = allocationRequests.getValue().get(0).getJobMetadata().getJobId();
-            if (allocationRequests.getValue().size() > 2 && !jobIds.contains(jobId)) {
+            if (allocationRequests.getValue().size() > 2 && !pendingJobRequests.containsKey(jobId)) {
                 log.info("[fdc-100] storing missing {} machines {} for job {}",
                     allocationRequests.getValue().size(), allocationRequests.getKey(), jobId);
                 // TODO: think a way to get rid of machineDefWithNoResourcesAvailable
-                jobIds.add(jobId);
+
+                long totalJobSize = request.getPerMachineDefRequests().values().stream()
+                    .mapToLong(List::size)
+                    .sum();
+                pendingJobRequests.put(jobId, totalJobSize);
+
                 if (!unavailabilityByMachineDef.containsKey(allocationRequests.getKey().getCpuCores())) {
                     unavailabilityByMachineDef.put(allocationRequests.getKey().getCpuCores(), 0);
                 }
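
The guard above fires only the first time a large job comes up short, and it now records the job's full requested worker count rather than a bare job id. A standalone sketch of that size computation (the generic types stand in for Mantis' MachineDefinition and TaskExecutorAllocationRequest, which are not spelled out here):

import java.util.List;
import java.util.Map;

final class JobSizeSketch {
    // A job's total size is the sum of the allocation-request list sizes
    // across every machine definition in the batch request.
    static long totalJobSize(Map<?, ? extends List<?>> perMachineDefRequests) {
        return perMachineDefRequests.values().stream()
            .mapToLong(List::size)
            .sum();
    }
}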
@@ -397,6 +383,9 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
         log.info("Computing cluster usage: {}", req.getClusterID());
         log.info("[fdc-100] getClusterUsage: {}", unavailabilityByMachineDef);
 
+        Map<String, Long> jobIdToWorkerCount = new HashMap<>();
+        pendingJobRequests.keySet().forEach(jobId -> jobIdToWorkerCount.put(jobId, 0L));
+
         // default grouping is containerSkuID to usage
         Map<String, Integer> unavailableByGroupKey = new HashMap<>();
         Map<String, Pair<Integer, Integer>> usageByGroupKey = new HashMap<>();
@@ -436,6 +425,12 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
                 usageByGroupKey.put(groupKey, kvState);
             }
 
+            if (value.isAssigned() && value.getWorkerId() != null) {
+                if (pendingJobRequests.containsKey(value.getWorkerId().getJobId())) {
+                    jobIdToWorkerCount.put(value.getWorkerId().getJobId(), jobIdToWorkerCount.get(value.getWorkerId().getJobId()) + 1);
+                }
+            }
+
             if (!unavailableByGroupKey.containsKey(groupKey)) {
                 // TODO: machine doesn't have to match exactly right?
                 log.info("[fdc-99] groupKey {} - unavailabilityByMachineDef {} - mdef {} - sol {}", groupKey, unavailabilityByMachineDef, value.getRegistration().getMachineDefinition(), unavailabilityByMachineDef.get(value.getRegistration().getMachineDefinition().getCpuCores()));
@@ -445,6 +440,13 @@
             }
         });
 
+        // remove jobs from the pending set once all of their requested workers are assigned
+        jobIdToWorkerCount.forEach((key, value) -> {
+            if (pendingJobRequests.getOrDefault(key, -1L).equals(value)) {
+                pendingJobRequests.remove(key);
+            }
+        });
+
         log.info("[fdc-100] unavailableByGroupKey: {}", unavailableByGroupKey);
 
         GetClusterUsageResponseBuilder resBuilder = GetClusterUsageResponse.builder().clusterID(req.getClusterID());
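
These two additions make pendingJobRequests self-expiring: each usage computation counts the assigned workers of every pending job, then drops entries whose count has reached the total recorded at request time, which is what allows the explicit markJobAsStarted round-trip to be deleted. A condensed lifecycle sketch (the class and method names are stand-ins; single-threaded access is assumed, matching the surrounding actor code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PendingJobTracker {
    private final Map<String, Long> pendingJobRequests = new HashMap<>();

    // On a batch-assignment shortfall: record the job once, with the total
    // worker count requested across all machine definitions.
    void recordShortfall(String jobId, Map<?, ? extends List<?>> perMachineDefRequests) {
        pendingJobRequests.computeIfAbsent(jobId, id ->
            perMachineDefRequests.values().stream().mapToLong(List::size).sum());
    }

    // During each usage computation: given the number of workers already
    // assigned per pending job, drop the jobs that reached their total.
    void expireStarted(Map<String, Long> assignedWorkersByJob) {
        assignedWorkersByJob.forEach((jobId, assigned) -> {
            if (pendingJobRequests.getOrDefault(jobId, -1L).equals(assigned)) {
                pendingJobRequests.remove(jobId);
            }
        });
    }
}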
@@ -217,7 +217,6 @@ public Receive createReceive() {
             .match(CacheJobArtifactsOnTaskExecutorRequest.class, this::onCacheJobArtifactsOnTaskExecutorRequest)
             .match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest)
             .match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest)
-            .match(MarkJobAsStartedRequest.class, this::onMarkJobAsStartedRequest)
             .match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self()))
             .build();
     }
@@ -311,10 +310,6 @@ private void getActiveJobs(GetActiveJobsRequest req) {
         sender().tell(res, self());
     }
 
-    private void onMarkJobAsStartedRequest(MarkJobAsStartedRequest req) {
-        this.executorStateManager.markJobAsStartedRequest(req.jobId);
-    }
-
     private void onTaskExecutorInfoRequest(TaskExecutorInfoRequest request) {
         if (request.getTaskExecutorID() != null) {
             TaskExecutorState state =
@@ -909,12 +904,6 @@ static class TaskExecutorAssignmentTimeout {
         TaskExecutorID taskExecutorID;
     }
 
-    @Value
-    static class MarkJobAsStartedRequest {
-        String jobId;
-        ClusterID clusterID;
-    }
-
     @Value
     static class ExpireDisableTaskExecutorsRequest {
         DisableTaskExecutorsRequest request;
@@ -32,7 +32,6 @@
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorWorkerMappingRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.InitializeTaskExecutorRequest;
-import io.mantisrx.master.resourcecluster.ResourceClusterActor.MarkJobAsStartedRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
@@ -206,16 +205,6 @@ public CompletableFuture<List<Pair<TaskExecutorAllocationRequest, TaskExecutorID>>> getTaskExecutorsFor(
             .thenApply(l -> l.getAssignments());
     }
 
-
-    @Override
-    public CompletableFuture<Ack> markJobAsStarted(String jobId) {
-        return Patterns
-            .ask(resourceClusterManagerActor, new MarkJobAsStartedRequest(jobId, clusterID), askTimeout)
-            .thenApply(Ack.class::cast)
-            .toCompletableFuture();
-    }
-
-
     @Override
     public CompletableFuture<TaskExecutorID> getTaskExecutorAssignedFor(WorkerId workerId) {
         return
@@ -31,7 +31,6 @@
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest;
-import io.mantisrx.master.resourcecluster.ResourceClusterActor.MarkJobAsStartedRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
 import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
@@ -145,8 +144,6 @@ public Receive createReceive() {
                 getRCActor(req.getClusterID()).forward(req, context()))
             .match(TaskExecutorInfoRequest.class, req ->
                 getRCActor(req.getClusterID()).forward(req, context()))
-            .match(MarkJobAsStartedRequest.class, req ->
-                getRCActor(req.getClusterID()).forward(req, context()))
             .match(TaskExecutorGatewayRequest.class, req ->
                 getRCActor(req.getClusterID()).forward(req, context()))
             .match(DisableTaskExecutorsRequest.class, req ->
@@ -497,11 +497,6 @@ public void initializeRunningWorker(final ScheduleRequest request, String hostname) {
         taskSchedulingService.initializeRunningTask(request, hostname);
     }
 
-    @Override
-    public void scheduleWorker(final ScheduleRequest scheduleRequest) {
-        taskQueue.queueTask(scheduleRequest);
-    }
-
     @Override
     public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
         scheduleRequest.getScheduleRequests().forEach(taskQueue::queueTask);
@@ -512,11 +507,6 @@ public void unscheduleJob(String jobId) {
         // TODO:
     }
 
-    @Override
-    public void markJobAsStarted(String jobId) {
-        // TODO:
-    }
-
     @Override
     public void unscheduleWorker(final WorkerId workerId, final Optional<String> hostname) {
         taskSchedulingService.removeTask(workerId.getId(), DEFAULT_Q_ATTRIBUTES, hostname.orElse(null));
@@ -30,14 +30,10 @@ public interface MantisScheduler {
      *
      * @param scheduleRequest worker to schedule
      */
-    void scheduleWorker(final ScheduleRequest scheduleRequest);
-
     void scheduleWorkers(final BatchScheduleRequest scheduleRequest);
 
     void unscheduleJob(final String jobId);
 
-    void markJobAsStarted(final String jobId);
-
     /**
      * Mark the worker to be removed from the Scheduling queue. This is expected to be called for all tasks that were added to the Scheduler, whether or
      * not the worker is already running. If the worker is running, the <code>hostname</code> parameter must be set, otherwise,
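
The interface now exposes batch scheduling only: scheduleWorker (one queue insertion per worker) and markJobAsStarted are gone, leaving scheduleWorkers plus unscheduleJob as the per-job lifecycle. A hypothetical call-site sketch; the BatchScheduleRequest constructor shape is an assumption, only scheduleWorkers and unscheduleJob appear in this diff:

import java.util.List;

// Hypothetical call site. BatchScheduleRequest's constructor is assumed;
// the diff only shows that it carries a collection of ScheduleRequests
// (see scheduleRequest.getScheduleRequests() in the queue-based
// implementation above).
final class SubmitJobSketch {
    static void submitJob(MantisScheduler scheduler, List<ScheduleRequest> workers) {
        // One batch per job, so the scheduler can allocate and retry the
        // job as a single unit instead of queueing workers one by one.
        scheduler.scheduleWorkers(new BatchScheduleRequest(workers));
    }
}

Once every worker is running, the unscheduleJob() call in the all-workers-started path above cancels whatever is still queued or retrying for the job.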
@@ -25,8 +25,6 @@
 import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.CancelBatchRequestEvent;
 import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.CancelRequestEvent;
 import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.InitializeRunningWorkerRequestEvent;
-import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.MarkJobAsStartedRequestEvent;
-import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.ScheduleRequestEvent;
 import io.mantisrx.shaded.com.google.common.base.Throwables;
 import java.util.List;
 import java.util.Optional;
@@ -39,22 +37,11 @@ public class ResourceClusterAwareScheduler implements MantisScheduler {
 
     private final ActorRef schedulerActor;
 
-    @Override
-    public void scheduleWorker(ScheduleRequest scheduleRequest) {
-        // TODO: delete it.
-        schedulerActor.tell(ScheduleRequestEvent.of(scheduleRequest), null);
-    }
-
     @Override
     public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
         schedulerActor.tell(BatchScheduleRequestEvent.of(scheduleRequest), null);
     }
 
-    @Override
-    public void markJobAsStarted(String jobId) {
-        schedulerActor.tell(MarkJobAsStartedRequestEvent.of(jobId), null);
-    }
-
     @Override
     public void unscheduleJob(String jobId) {
         schedulerActor.tell(CancelBatchRequestEvent.of(jobId), null);
@@ -116,7 +116,6 @@ public Receive createReceive() {
             .match(AssignedBatchScheduleRequestEvent.class, this::onAssignedBatchScheduleRequestEvent)
             .match(FailedToBatchScheduleRequestEvent.class, this::onFailedToBatchScheduleRequestEvent)
             .match(CancelBatchRequestEvent.class, this::onCancelBatchRequestEvent)
-            .match(MarkJobAsStartedRequestEvent.class, this::onMarkJobAsStartedRequestEvent)
             // .match(RetryCancelBatchRequestEvent.class, this::onRetryBatchCancelRequestEvent)
 
             // single schedule request
@@ -413,16 +412,6 @@ private void onCancelBatchRequestEvent(CancelBatchRequestEvent event) {
         }
     }
 
-    private void onMarkJobAsStartedRequestEvent(MarkJobAsStartedRequestEvent event) {
-        try {
-            log.info("MarkJobAsStartedRequestEvent {}", event.getJobId());
-            getTimers().cancel(getBatchSchedulingQueueKeyFor(event.getJobId()));
-            resourceCluster.markJobAsStarted(event.jobId);
-        } catch (Exception e) {
-            log.warn("[fdc-91] Failed to MarkJobAsStartedRequestEvent batch request {}", event.getJobId(), e);
-        }
-    }
-
     private void onNoop(Noop event) {
     }
@@ -179,14 +179,6 @@ public DummyScheduler(CountDownLatch scheduleCDL, CountDownLatch unscheduleCDL) {
         unschedL = unscheduleCDL;
     }
 
-    @Override
-    public void scheduleWorker(ScheduleRequest scheduleRequest) {
-        // TODO Auto-generated method stub
-        System.out.println("----------------------> schedule Worker Called");
-        schedL.countDown();
-
-    }
-
     @Override
     public void scheduleWorkers(BatchScheduleRequest scheduleRequest) {
         // TODO:
@@ -197,11 +189,6 @@ public void unscheduleJob(String jobId) {
         // TODO:
     }
 
-    @Override
-    public void markJobAsStarted(String jobId) {
-        // TODO:
-    }
-
     @Override
     public void unscheduleWorker(WorkerId workerId, Optional<String> hostname) {
         // TODO Auto-generated method stub