From 6b6cb541cd81ec78a53a544af542c699f294ca8a Mon Sep 17 00:00:00 2001 From: Francesco Di Chiara Date: Thu, 2 Nov 2023 14:26:52 -0700 Subject: [PATCH] Implement batch schedule request. --- .../resourcecluster/ResourceCluster.java | 19 +- .../master/jobcluster/job/JobActor.java | 64 +++--- .../master/jobcluster/job/JobState.java | 17 ++ .../resourcecluster/ExecutorStateManager.java | 13 +- .../ExecutorStateManagerImpl.java | 183 +++++++++++++++--- .../resourcecluster/ResourceClusterActor.java | 117 ++++++++--- .../ResourceClusterAkkaImpl.java | 21 +- .../ResourceClustersManagerActor.java | 7 +- .../server/master/SchedulingService.java | 10 +- .../scheduler/BatchScheduleRequest.java | 26 +++ .../master/scheduler/MantisScheduler.java | 4 +- .../ResourceClusterAwareScheduler.java | 12 +- .../ResourceClusterAwareSchedulerActor.java | 170 +++++++++++++++- .../api/akka/route/v1/JobsRouteTest.java | 1 - .../master/jobcluster/JobClusterTest.java | 6 +- .../jobcluster/job/JobClusterManagerTest.java | 19 +- .../jobcluster/job/JobScaleUpDownTests.java | 14 +- .../jobcluster/job/JobTestLifecycle.java | 15 +- .../jobcluster/job/JobTestMigrationTests.java | 11 +- .../ExecutorStateManagerTests.java | 83 ++++---- .../ResourceClusterActorTest.java | 41 ++-- .../master/scheduler/FakeMantisScheduler.java | 82 ++++---- 22 files changed, 711 insertions(+), 224 deletions(-) create mode 100644 mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/BatchScheduleRequest.java diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java index 18d68dcd4..16478a8b1 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; import lombok.Value; @@ -100,14 +101,13 @@ default CompletableFuture> getUnregisteredTaskExecutors() { /** * Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there - * are no task executors. + * are no enough available task executors. * - * @param machineDefinition machine definition that's requested for the worker - * @param workerId worker id of the task that's going to run on the node. - * @return task executor assigned for the particular task. + * @param allocationRequests set of machine definitions requested for 1 or more workers + * @return task executors assigned for the particular request */ - CompletableFuture getTaskExecutorFor( - TaskExecutorAllocationRequest allocationRequest); + CompletableFuture> getTaskExecutorsFor( + Set allocationRequests); /** * Returns the Gateway instance to talk to the task executor. If unable to make connection with @@ -188,6 +188,13 @@ CompletableFuture getActiveJobOverview( CompletableFuture> getTaskExecutorWorkerMapping( Map attributes); + /** + * Signals that the given job id has been killed. 
+ * + * @param jobId id of the job killed + */ + CompletableFuture unscheduleJob(String jobId); + class NoResourceAvailableException extends Exception { public NoResourceAvailableException(String message) { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index 7fcf6ca1f..b23e87d87 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1317,6 +1317,7 @@ void initialize(boolean isSubmit) throws Exception { private void initializeRunningWorkers() { // Scan for the list of all corrupted workers to be resubmitted. List workersToResubmit = markCorruptedWorkers(); + List workersToSubmit = new ArrayList<>(); // publish a refresh before enqueuing tasks to the Scheduler, as there is a potential race between // WorkerRegistryV2 getting updated and isWorkerValid being called from SchedulingService loop @@ -1357,7 +1358,14 @@ private void initializeRunningWorkers() { scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID()); } else if (wm.getState().equals(WorkerState.Accepted)) { - queueTask(wm); + + // If the job is in accepted state, queue all its pending workers at once in a batch request. + // This is important when before master failover there were pending batch requests + if (JobState.isAcceptedState(mantisJobMetaData.getState())) { + workersToSubmit.add(wm); + } else { + queueTask(wm); + } } } @@ -1367,6 +1375,10 @@ private void initializeRunningWorkers() { } } + if (JobState.isAcceptedState(mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) { + queueTasks(workersToSubmit, empty()); + } + // publish another update after queuing tasks to Fenzo (in case some workers were marked Started // due to the Fake heartbeat in above loop) markStageAssignmentsChanged(true); @@ -1475,40 +1487,37 @@ private void submitInitialWorkers() throws Exception { mantisJobMetaData.getJobDefinition(), System.currentTimeMillis()); - int beg = 0; - while (true) { - if (beg >= workers.size()) { - break; - } - int en = beg + Math.min(workerWritesBatchSize, workers.size() - beg); - final List workerRequests = workers.subList(beg, en); - try { - jobStore.storeNewWorkers(jobMgr.getJobDetails(), workerRequests); - LOGGER.info("Stored workers {} for Job {}", workerRequests, jobId); - // refresh Worker Registry state before enqueuing task to Scheduler - markStageAssignmentsChanged(true); + try { + jobStore.storeNewWorkers(jobMgr.getJobDetails(), workers); + LOGGER.info("Stored workers {} for Job {}", workers, jobId); + // refresh Worker Registry state before enqueuing task to Scheduler + markStageAssignmentsChanged(true); + + if (!workers.isEmpty()) { // queue to scheduler - workerRequests.forEach(this::queueTask); - } catch (Exception e) { - LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e); - throw new RuntimeException("Exception saving worker for Job " + jobId, e); + queueTasks(workers, empty()); } - beg = en; + } catch (Exception e) { + LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e); + throw new RuntimeException("Exception saving worker for Job " + jobId, e); } } - private void queueTask(final IMantisWorkerMetadata workerRequest, final Optional 
readyAt) { - final ScheduleRequest schedulingRequest = createSchedulingRequest(workerRequest, readyAt); - LOGGER.info("Queueing up scheduling request {} ", schedulingRequest); + private void queueTasks(final List workerRequests, final Optional readyAt) { + final List scheduleRequests = workerRequests + .stream() + .map(wR -> createSchedulingRequest(wR, readyAt)) + .collect(Collectors.toList()); + LOGGER.info("Queueing up batch schedule request for {} workers", workerRequests.size()); try { - scheduler.scheduleWorker(schedulingRequest); + scheduler.scheduleWorkers(new BatchScheduleRequest(scheduleRequests)); } catch (Exception e) { - LOGGER.error("Exception queueing task", e); + LOGGER.error("Exception queueing tasks", e); } } private void queueTask(final IMantisWorkerMetadata workerRequest) { - queueTask(workerRequest, empty()); + queueTasks(Collections.singletonList(workerRequest), empty()); } private ScheduleRequest createSchedulingRequest( @@ -1672,6 +1681,7 @@ private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int stage @Override public void shutdown() { + scheduler.unscheduleJob(jobId.getId()); // if workers have not already completed if (!allWorkerCompleted()) { // kill workers @@ -1805,6 +1815,9 @@ public void checkHeartBeats(Instant currentTime) { Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt()); if (Duration.between(acceptedAt, currentTime).getSeconds() > stuckInSubmitToleranceSecs) { // worker stuck in accepted + LOGGER.info("Job {}, Worker {} stuck in accepted state for {}", this.jobMgr.getJobId(), + workerMeta.getWorkerId(), Duration.between(acceptedAt, currentTime).getSeconds()); + workersToResubmit.add(worker); eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent( WARN, @@ -2016,6 +2029,7 @@ else if (currentWorkerNum < eventWorkerNum) { if (allWorkerStarted()) { allWorkersStarted = true; jobMgr.onAllWorkersStarted(); + scheduler.unscheduleJob(jobId.getId()); markStageAssignmentsChanged(true); } else if (allWorkerCompleted()) { LOGGER.info("Job {} All workers completed1", jobId); @@ -2169,7 +2183,7 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception { // publish a refresh before enqueuing new Task to Scheduler markStageAssignmentsChanged(true); // queue the new worker for execution - queueTask(newWorker.getMetadata(), delayDuration); + queueTasks(Collections.singletonList(newWorker.getMetadata()), delayDuration); LOGGER.info("Worker {} successfully queued for scheduling", newWorker); numWorkerResubmissions.increment(); } else { diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobState.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobState.java index e06a84d5f..f633694a0 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobState.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobState.java @@ -181,4 +181,21 @@ public static boolean isRunningState(JobState state) { return false; } } + + /** + * Returns true if the job is accepted. 
+ * + * @param state + * + * @return + */ + public static boolean isAcceptedState(JobState state) { + switch (state) { + case Accepted: + + return true; + default: + return false; + } + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java index 62f5dda34..ae3c0b05c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManager.java @@ -16,9 +16,10 @@ package io.mantisrx.master.resourcecluster; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.BestFit; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetClusterUsageRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse; import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview; @@ -29,7 +30,6 @@ import java.util.Set; import java.util.function.Predicate; import javax.annotation.Nullable; -import org.apache.commons.lang3.tuple.Pair; /** * A component to manage the states of {@link TaskExecutorState} for a given {@link ResourceClusterActor}. @@ -79,11 +79,14 @@ Optional> findFirst( Predicate> predicate); /** - * Find a matched task executor best fitting the given assignment request. + * Finds set of matched task executors best fitting the given assignment requests. + * * @param request Assignment request. - * @return Optional of matched task executor. + * @return Optional of matched task executors. 
*/ - Optional> findBestFit(TaskExecutorAssignmentRequest request); + Optional findBestFit(TaskExecutorBatchAssignmentRequest request); + + void unscheduleJob(String jobId); Set> getActiveExecutorEntry(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index e471c3611..929a27b20 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -17,20 +17,25 @@ package io.mantisrx.master.resourcecluster; import io.mantisrx.common.WorkerConstants; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.BestFit; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetClusterUsageRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse; import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse.GetClusterUsageResponseBuilder; import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse.UsageByGroupKey; +import io.mantisrx.runtime.MachineDefinition; import io.mantisrx.server.core.domain.WorkerId; import io.mantisrx.server.master.resourcecluster.ContainerSkuID; import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview; +import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest; import io.mantisrx.server.master.resourcecluster.TaskExecutorID; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; import io.mantisrx.shaded.com.google.common.cache.Cache; import io.mantisrx.shaded.com.google.common.cache.CacheBuilder; +import io.mantisrx.shaded.com.google.common.cache.RemovalListener; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -47,6 +52,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import lombok.Builder; +import lombok.RequiredArgsConstructor; +import lombok.ToString; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -54,6 +61,23 @@ @Slf4j public class ExecutorStateManagerImpl implements ExecutorStateManager { private final Map taskExecutorStateMap = new HashMap<>(); + Cache pendingJobRequests = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener((RemovalListener) notification -> { + log.info("Removing key {} from pending job requests due to reason {}", notification.getKey(), notification.getCause()); + }) + .build(); + + @RequiredArgsConstructor + @ToString + static class JobRequirements { + public final Map coresToWorkerCount; + + public int getTotalWorkers() { + return coresToWorkerCount.values().stream().mapToInt(Integer::intValue).sum(); + } + } /** * Cache the available executors ready to accept assignments. 
Note these executors' state are not strongly @@ -227,10 +251,56 @@ public Optional> findFirst( } @Override - public Optional> findBestFit(TaskExecutorAssignmentRequest request) { + public void unscheduleJob(String jobId) { + pendingJobRequests.invalidate(jobId); + } + + @Override + public Optional findBestFit(TaskExecutorBatchAssignmentRequest request) { + + if (request.getAllocationRequests().isEmpty()) { + log.warn("TaskExecutorBatchAssignmentRequest {} with empty allocation requests.", request); + return Optional.empty(); + } + + boolean noResourcesAvailable = false; + final Map> bestFit = new HashMap<>(); + final boolean isJobIdAlreadyPending = pendingJobRequests.getIfPresent(request.getJobId()) != null; + + for (Entry> entry : request.getGroupedByMachineDef().entrySet()) { + final MachineDefinition machineDefinition = entry.getKey(); + final List allocationRequests = entry.getValue(); + + Optional> taskExecutors = findTaskExecutorsFor(request, machineDefinition, allocationRequests, isJobIdAlreadyPending); + + // Mark noResourcesAvailable if we can't find enough TEs for a given machine def + if (!taskExecutors.isPresent() || noResourcesAvailable) { + noResourcesAvailable = true; + continue; + } + + // Map each TE to a given allocation request + int index = 0; + for (Entry taskToStateEntry : taskExecutors.get().entrySet()) { + bestFit.put(allocationRequests.get(index), Pair.of(taskToStateEntry.getKey(), taskToStateEntry.getValue())); + index++; + } + } + + if (noResourcesAvailable) { + log.warn("Not all machine def had enough workers available to fulfill the request {}", request); + return Optional.empty(); + } else { + // Return best fit only if there are enough available TEs for all machine def + return Optional.of(new BestFit(bestFit)); + } + + } + + private Optional> findBestFitFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, Integer numWorkers) { // only allow allocation in the lowest CPU cores matching group. SortedMap> targetMap = - this.executorByCores.tailMap(request.getAllocationRequest().getMachineDefinition().getCpuCores()); + this.executorByCores.tailMap(machineDefinition.getCpuCores()); if (targetMap.isEmpty()) { log.warn("Cannot find any executor for request: {}", request); @@ -239,12 +309,12 @@ public Optional> findBestFit(TaskExecuto Double targetCoreCount = targetMap.firstKey(); log.debug("Applying assignmentReq: {} to {} cores.", request, targetCoreCount); - Double requestedCoreCount = request.getAllocationRequest().getMachineDefinition().getCpuCores(); + Double requestedCoreCount = machineDefinition.getCpuCores(); if (Math.abs(targetCoreCount - requestedCoreCount) > 1E-10) { // this mismatch should not happen in production and indicates TE registration/spec problem. log.warn("Requested core count mismatched. 
requested: {}, found: {} for {}", requestedCoreCount, - targetCoreCount, - request); + targetCoreCount, + request); } if (this.executorByCores.get(targetCoreCount).isEmpty()) { @@ -252,23 +322,25 @@ public Optional> findBestFit(TaskExecuto return Optional.empty(); } - return this.executorByCores.get(targetCoreCount) - .descendingSet() - .stream() - .filter(teHolder -> { - if (!this.taskExecutorStateMap.containsKey(teHolder.getId())) { - return false; - } - - TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId()); - return st.isAvailable() && - st.getRegistration() != null && - st.getRegistration().getMachineDefinition().canFit( - request.getAllocationRequest().getMachineDefinition()); - }) - .findFirst() - .map(TaskExecutorHolder::getId) - .map(taskExecutorID -> Pair.of(taskExecutorID, this.taskExecutorStateMap.get(taskExecutorID))); + return Optional.of( + this.executorByCores.get(targetCoreCount) + .descendingSet() + .stream() + .filter(teHolder -> { + if (!this.taskExecutorStateMap.containsKey(teHolder.getId())) { + return false; + } + + TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId()); + return st.isAvailable() && + st.getRegistration() != null && + st.getRegistration().getMachineDefinition().canFit(machineDefinition); + }) + .limit(numWorkers) + .map(TaskExecutorHolder::getId) + .collect(Collectors.toMap( + taskExecutorID -> taskExecutorID, + this.taskExecutorStateMap::get))); } @Override @@ -278,10 +350,12 @@ public Set> getActiveExecutorEntry() { @Override public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) { - log.info("Computing cluster usage: {}", req.getClusterID()); - // default grouping is containerSkuID to usage + Map pendingCountByGroupKey = new HashMap<>(); Map> usageByGroupKey = new HashMap<>(); + // helper struct to verify job has been fully deployed so we can remove it from pending + Map> jobIdToMachineDef = new HashMap<>(); + taskExecutorStateMap.forEach((key, value) -> { if (value == null || value.getRegistration() == null) { @@ -317,12 +391,35 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) { } else { usageByGroupKey.put(groupKey, kvState); } + + if ((value.isAssigned() || value.isRunningTask()) && value.getWorkerId() != null) { + if (pendingJobRequests.getIfPresent(value.getWorkerId().getJobId()) != null) { + List workers = jobIdToMachineDef.getOrDefault(value.getWorkerId().getJobId(), new ArrayList<>()); + workers.add(value.getRegistration().getMachineDefinition()); + jobIdToMachineDef.put(value.getWorkerId().getJobId(), workers); + } + } + + if (!pendingCountByGroupKey.containsKey(groupKey)) { + pendingCountByGroupKey.put( + groupKey, + getPendingCountyByCores(value.getRegistration().getMachineDefinition().getCpuCores())); + } + }); + + // remove jobs from pending set which have all pending workers + jobIdToMachineDef.forEach((jobId, workers) -> { + final JobRequirements jobStats = pendingJobRequests.getIfPresent(jobId); + if (jobStats != null && jobStats.getTotalWorkers() <= workers.size()) { + log.info("Removing job {} from pending requests", jobId); + pendingJobRequests.invalidate(jobId); + } }); GetClusterUsageResponseBuilder resBuilder = GetClusterUsageResponse.builder().clusterID(req.getClusterID()); usageByGroupKey.forEach((key, value) -> resBuilder.usage(UsageByGroupKey.builder() .usageGroupKey(key) - .idleCount(value.getLeft()) + .idleCount(value.getLeft() - pendingCountByGroupKey.get(key)) .totalCount(value.getRight()) .build())); @@ -331,6 +428,40 @@ public 
GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) { return res; } + private int getPendingCountyByCores(Double cores) { + return pendingJobRequests + .asMap() + .values() + .stream() + .map(req -> req.coresToWorkerCount.getOrDefault(cores, 0)) + .reduce(Integer::sum) + .orElse(0); + } + + private Optional> findTaskExecutorsFor(TaskExecutorBatchAssignmentRequest request, MachineDefinition machineDefinition, List allocationRequests, boolean isJobIdAlreadyPending) { + // Finds best fit for N workers of the same machine def + final Optional> taskExecutors = findBestFitFor( + request, machineDefinition, allocationRequests.size()); + + // Verify that the number of task executors returned matches the asked + if (taskExecutors.isPresent() && taskExecutors.get().size() == allocationRequests.size()) { + return taskExecutors; + } else { + log.warn("Not enough available TEs found for machine def {} with core count: {}, request: {}", + machineDefinition, machineDefinition.getCpuCores(), request); + + // If there are not enough workers with the given spec then add the request the pending ones + if (!isJobIdAlreadyPending && request.getAllocationRequests().size() > 2) { + // Add jobId to pending requests only once + if (pendingJobRequests.getIfPresent(request.getJobId()) == null) { + log.info("Adding job {} to pending requests for {} machine {}", request.getJobId(), allocationRequests.size(), machineDefinition); + pendingJobRequests.put(request.getJobId(), new JobRequirements(request.getGroupedByCoresCount())); + } + } + return Optional.empty(); + } + } + /** * Holder class in {@link ExecutorStateManagerImpl} to wrap task executor ID with other metatdata needed during * scheduling e.g. generation. diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index 954e6ebb6..948b161fd 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -29,6 +29,7 @@ import io.mantisrx.master.resourcecluster.metrics.ResourceClusterActorMetrics; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest; import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesResponse; +import io.mantisrx.runtime.MachineDefinition; import io.mantisrx.server.core.CacheJobArtifactsRequest; import io.mantisrx.server.core.domain.ArtifactID; import io.mantisrx.server.core.domain.WorkerId; @@ -204,7 +205,8 @@ public Receive createReceive() { .match(TaskExecutorStatusChange.class, this::onTaskExecutorStatusChange) .match(TaskExecutorDisconnection.class, this::onTaskExecutorDisconnection) .match(HeartbeatTimeout.class, this::onTaskExecutorHeartbeatTimeout) - .match(TaskExecutorAssignmentRequest.class, this::onTaskExecutorAssignmentRequest) + .match(TaskExecutorBatchAssignmentRequest.class, this::onTaskExecutorBatchAssignmentRequest) + .match(UnscheduleJobRequest.class, this::onUnscheduleJobRequest) .match(ResourceOverviewRequest.class, this::onResourceOverviewRequest) .match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest) .match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest) @@ -582,44 +584,50 @@ private void 
onTaskExecutorStatusChange(TaskExecutorStatusChange statusChange) { } } - private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest request) { - Optional> matchedExecutor = - this.executorStateManager.findBestFit(request); - - if (matchedExecutor.isPresent()) { - log.info("matched executor {} for request {}", matchedExecutor.get().getKey(), request); + private void onUnscheduleJobRequest(UnscheduleJobRequest request) { + this.executorStateManager.unscheduleJob(request.jobId); + } - // trigger job artifact cache if needed - if(shouldCacheJobArtifacts(request)) { - self().tell(new AddNewJobArtifactsToCacheRequest(clusterID, Collections.singletonList(request.getAllocationRequest().getJobMetadata().getJobArtifact())), self()); - } + private void onTaskExecutorBatchAssignmentRequest(TaskExecutorBatchAssignmentRequest request) { + Optional matchedExecutors = this.executorStateManager.findBestFit(request); - matchedExecutor.get().getValue().onAssignment(request.getAllocationRequest().getWorkerId()); - // let's give some time for the assigned executor to be scheduled work. otherwise, the assigned executor - // will be returned back to the pool. - getTimers().startSingleTimer( - "Assignment-" + matchedExecutor.get().getKey().toString(), - new TaskExecutorAssignmentTimeout(matchedExecutor.get().getKey()), - assignmentTimeout); - sender().tell(matchedExecutor.get().getKey(), self()); + if (matchedExecutors.isPresent()) { + log.info("Matched all executors {} for request {}", matchedExecutors.get(), request); + matchedExecutors.get().getBestFit().forEach((allocationRequest, taskExecutorToState) -> assignTaskExecutor( + allocationRequest, taskExecutorToState.getLeft(), taskExecutorToState.getRight(), request)); + sender().tell(new TaskExecutorsAllocation(matchedExecutors.get().getRequestToTaskExecutorMap()), self()); } else { - metrics.incrementCounter( + request.allocationRequests.forEach(req -> metrics.incrementCounter( ResourceClusterActorMetrics.NO_RESOURCES_AVAILABLE, TagList.create(ImmutableMap.of( "resourceCluster", clusterID.getResourceID(), "workerId", - request.getAllocationRequest().getWorkerId().getId(), + req.getWorkerId().getId(), "jobCluster", - request.getAllocationRequest().getWorkerId().getJobCluster(), + req.getWorkerId().getJobCluster(), "cpuCores", - String.valueOf(request.getAllocationRequest().getMachineDefinition().getCpuCores())))); + String.valueOf(req.getMachineDefinition().getCpuCores()))))); sender().tell(new Status.Failure(new NoResourceAvailableException( String.format("No resource available for request %s: resource overview: %s", request, getResourceOverview()))), self()); } } + private void assignTaskExecutor(TaskExecutorAllocationRequest allocationRequest, TaskExecutorID taskExecutorID, TaskExecutorState taskExecutorState, TaskExecutorBatchAssignmentRequest request) { + if(shouldCacheJobArtifacts(allocationRequest)) { + self().tell(new AddNewJobArtifactsToCacheRequest(clusterID, Collections.singletonList(allocationRequest.getJobMetadata().getJobArtifact())), self()); + } + + taskExecutorState.onAssignment(allocationRequest.getWorkerId()); + // let's give some time for the assigned executor to be scheduled work. otherwise, the assigned executor + // will be returned back to the pool. 
+ getTimers().startSingleTimer( + "Assignment-" + taskExecutorID.toString(), + new TaskExecutorAssignmentTimeout(taskExecutorID), + assignmentTimeout); + } + private void onTaskExecutorAssignmentTimeout(TaskExecutorAssignmentTimeout request) { TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID()); if (state == null) { @@ -798,9 +806,9 @@ private void onCacheJobArtifactsOnTaskExecutorRequest(CacheJobArtifactsOnTaskExe * (this is to reduce the work in master) and if the job cluster is enabled (via config * for now) */ - private boolean shouldCacheJobArtifacts(TaskExecutorAssignmentRequest request) { - final WorkerId workerId = request.getAllocationRequest().getWorkerId(); - final boolean isFirstWorkerOfFirstStage = request.getAllocationRequest().getStageNum() == 1 && workerId.getWorkerIndex() == 0; + private boolean shouldCacheJobArtifacts(TaskExecutorAllocationRequest allocationRequest) { + final WorkerId workerId = allocationRequest.getWorkerId(); + final boolean isFirstWorkerOfFirstStage = allocationRequest.getStageNum() == 1 && workerId.getWorkerIndex() == 0; if (isFirstWorkerOfFirstStage) { final Set jobClusters = getJobClustersWithArtifactCachingEnabled(); return jobClusters.contains(workerId.getJobCluster()); @@ -820,11 +828,40 @@ private static class HeartbeatTimeout { } @Value - static class TaskExecutorAssignmentRequest { - TaskExecutorAllocationRequest allocationRequest; + static class UnscheduleJobRequest { + String jobId; ClusterID clusterID; } + @Value + static class TaskExecutorBatchAssignmentRequest { + Set allocationRequests; + ClusterID clusterID; + + public Map> getGroupedByMachineDef() { + return allocationRequests + .stream() + .collect(Collectors.groupingBy(TaskExecutorAllocationRequest::getMachineDefinition)); + } + + public Map getGroupedByCoresCount() { + return allocationRequests + .stream() + .collect(Collectors.groupingBy(TaskExecutorAllocationRequest::getMachineDefinition)) + .entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey().getCpuCores(), + e -> e.getValue().size(), + Integer::sum + )); + } + + public String getJobId() { + return allocationRequests.iterator().next().getWorkerId().getJobId(); + } + } + @Value static class TaskExecutorAssignmentTimeout { TaskExecutorID taskExecutorID; @@ -935,6 +972,11 @@ static class TaskExecutorsList { List taskExecutors; } + @Value + static class TaskExecutorsAllocation { + Map allocations; + } + @Value static class ArtifactList { List artifacts; @@ -986,6 +1028,25 @@ static class GetJobArtifactsToCacheRequest { ClusterID clusterID; } + + @Value + @Builder + static class BestFit { + Map> bestFit; + + public Map getRequestToTaskExecutorMap() { + return bestFit + .entrySet() + .stream() + .collect(Collectors.toMap( + Entry::getKey, + e -> e.getValue().getKey() + )); + } + } + + + /** * Represents the Availability of a given node in the resource cluster. 
* Can go from PENDING -> ASSIGNED(workerId) -> RUNNING(workerId) -> PENDING diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 997680b05..822af21b2 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -33,10 +33,12 @@ import io.mantisrx.master.resourcecluster.ResourceClusterActor.InitializeTaskExecutorRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsAllocation; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.UnscheduleJobRequest; import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest; import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest; import io.mantisrx.server.core.domain.ArtifactID; @@ -56,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -176,11 +179,21 @@ public CompletableFuture> getJobArtifactsToCache() { } @Override - public CompletableFuture getTaskExecutorFor(TaskExecutorAllocationRequest allocationRequest) { + public CompletableFuture> getTaskExecutorsFor(Set allocationRequests) { return Patterns - .ask(resourceClusterManagerActor, new TaskExecutorAssignmentRequest(allocationRequest, clusterID), askTimeout) - .thenApply(TaskExecutorID.class::cast) + .ask(resourceClusterManagerActor, new TaskExecutorBatchAssignmentRequest(allocationRequests, clusterID), askTimeout) + .thenApply(TaskExecutorsAllocation.class::cast) + .toCompletableFuture() + .thenApply(l -> l.getAllocations()); + } + + @Override + public CompletableFuture unscheduleJob(String jobId) { + return + Patterns + .ask(resourceClusterManagerActor, new UnscheduleJobRequest(jobId, clusterID), askTimeout) + .thenApply(Ack.class::cast) .toCompletableFuture(); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index 382a3f8a6..4adf6e74c 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -33,9 +33,10 @@ import 
io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest; import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.UnscheduleJobRequest; import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest; import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest; import io.mantisrx.server.master.config.MasterConfiguration; @@ -135,7 +136,9 @@ public Receive createReceive() { getRCActor(statusChange.getClusterID()).forward(statusChange, context())) .match(TaskExecutorDisconnection.class, disconnection -> getRCActor(disconnection.getClusterID()).forward(disconnection, context())) - .match(TaskExecutorAssignmentRequest.class, req -> + .match(TaskExecutorBatchAssignmentRequest.class, req -> + getRCActor(req.getClusterID()).forward(req, context())) + .match(UnscheduleJobRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) .match(ResourceOverviewRequest.class, req -> getRCActor(req.getClusterID()).forward(req, context())) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/SchedulingService.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/SchedulingService.java index 896938568..fc14e41a2 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/SchedulingService.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/SchedulingService.java @@ -43,6 +43,7 @@ import io.mantisrx.server.core.BaseService; import io.mantisrx.server.core.domain.WorkerId; import io.mantisrx.server.master.config.ConfigurationProvider; +import io.mantisrx.server.master.scheduler.BatchScheduleRequest; import io.mantisrx.server.master.scheduler.JobMessageRouter; import io.mantisrx.server.master.scheduler.LaunchTaskRequest; import io.mantisrx.server.master.scheduler.MantisScheduler; @@ -497,8 +498,13 @@ public void initializeRunningWorker(final ScheduleRequest request, String hostna } @Override - public void scheduleWorker(final ScheduleRequest scheduleRequest) { - taskQueue.queueTask(scheduleRequest); + public void scheduleWorkers(BatchScheduleRequest scheduleRequest) { + scheduleRequest.getScheduleRequests().forEach(taskQueue::queueTask); + } + + @Override + public void unscheduleJob(String jobId) { + // TODO: } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/BatchScheduleRequest.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/BatchScheduleRequest.java new file mode 100644 index 000000000..bdc4a36ae --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/BatchScheduleRequest.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Netflix, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.server.master.scheduler; + +import java.util.List; +import lombok.Value; + +@Value +public class BatchScheduleRequest { + + List scheduleRequests; +} diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/MantisScheduler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/MantisScheduler.java index 695f43a4a..5f7a7fcbf 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/MantisScheduler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/MantisScheduler.java @@ -30,7 +30,9 @@ public interface MantisScheduler { * * @param scheduleRequest worker to schedule */ - void scheduleWorker(final ScheduleRequest scheduleRequest); + void scheduleWorkers(final BatchScheduleRequest scheduleRequest); + + void unscheduleJob(final String jobId); /** * Mark the worker to be removed from the Scheduling queue. This is expected to be called for all tasks that were added to the Scheduler, whether or diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareScheduler.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareScheduler.java index c55a44e1b..831a2fda1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareScheduler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareScheduler.java @@ -21,9 +21,10 @@ import com.netflix.fenzo.VirtualMachineLease; import io.mantisrx.server.core.domain.WorkerId; import io.mantisrx.server.master.resourcecluster.TaskExecutorID; +import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.BatchScheduleRequestEvent; +import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.CancelBatchRequestEvent; import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.CancelRequestEvent; import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.InitializeRunningWorkerRequestEvent; -import io.mantisrx.server.master.scheduler.ResourceClusterAwareSchedulerActor.ScheduleRequestEvent; import io.mantisrx.shaded.com.google.common.base.Throwables; import java.util.List; import java.util.Optional; @@ -37,8 +38,13 @@ public class ResourceClusterAwareScheduler implements MantisScheduler { private final ActorRef schedulerActor; @Override - public void scheduleWorker(ScheduleRequest scheduleRequest) { - schedulerActor.tell(ScheduleRequestEvent.of(scheduleRequest), null); + public void scheduleWorkers(BatchScheduleRequest scheduleRequest) { + schedulerActor.tell(BatchScheduleRequestEvent.of(scheduleRequest), null); + } + + 
@Override + public void unscheduleJob(String jobId) { + schedulerActor.tell(CancelBatchRequestEvent.of(jobId),null); } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index 31b04cfc4..409193dca 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -40,13 +40,18 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collections; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.Value; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.util.ExceptionUtils; @Slf4j @@ -60,6 +65,7 @@ class ResourceClusterAwareSchedulerActor extends AbstractActorWithTimers { private final Duration intervalBetweenRetries; private final Timer schedulingLatency; private final Counter schedulingFailures; + private final Counter batchSchedulingFailures; private final Counter connectionFailures; public static Props props( @@ -93,12 +99,14 @@ public ResourceClusterAwareSchedulerActor( new Metrics.Builder() .id(metricsGroup, Tag.of("resourceCluster", resourceCluster.getName())) .addTimer("schedulingLatency") + .addCounter("batchSchedulingFailures") .addCounter("schedulingFailures") .addCounter("connectionFailures") .build(); metricsRegistry.registerAndGet(metrics); this.schedulingLatency = metrics.getTimer("schedulingLatency"); this.schedulingFailures = metrics.getCounter("schedulingFailures"); + this.batchSchedulingFailures = metrics.getCounter("batchSchedulingFailures"); this.connectionFailures = metrics.getCounter("connectionFailures"); } @@ -106,6 +114,13 @@ public ResourceClusterAwareSchedulerActor( @Override public Receive createReceive() { return ReceiveBuilder.create() + // batch schedule requests + .match(BatchScheduleRequestEvent.class, this::onBatchScheduleRequestEvent) + .match(AssignedBatchScheduleRequestEvent.class, this::onAssignedBatchScheduleRequestEvent) + .match(FailedToBatchScheduleRequestEvent.class, this::onFailedToBatchScheduleRequestEvent) + .match(CancelBatchRequestEvent.class, this::onCancelBatchRequestEvent) + + // single schedule request .match(ScheduleRequestEvent.class, this::onScheduleRequestEvent) .match(InitializeRunningWorkerRequestEvent.class, this::onInitializeRunningWorkerRequest) .match(CancelRequestEvent.class, this::onCancelRequestEvent) @@ -120,6 +135,58 @@ public Receive createReceive() { .build(); } + private void onBatchScheduleRequestEvent(BatchScheduleRequestEvent event) { + log.info("Received batch schedule request event: {}", event); + + // If the size of the batch request is 1 we'll handle it as the "old" schedule request + if (event.getRequest().getScheduleRequests().size() == 1) { + final ScheduleRequestEvent scheduleRequestEvent = ScheduleRequestEvent.of(event); + self().tell(scheduleRequestEvent, self()); + } else { + if (event.isRetry()) { + 
log.info("Retrying Batch Schedule Request {}, attempt {}", event.getRequest(), + event.getAttempt()); + } + + CompletableFuture assignedFuture = + resourceCluster + .getTaskExecutorsFor(event.getTaskExecutorAllocationRequests()) + .thenApply(event::onAssignment) + .exceptionally(event::onFailure); + + pipe(assignedFuture, getContext().getDispatcher()).to(self()); + } + } + + private void onAssignedBatchScheduleRequestEvent(AssignedBatchScheduleRequestEvent event) { + // Each and every allocation of a batch schedule request will be handled separately (ie. assignment, retry etc etc) + event + .getAllocations() + .forEach((key, value) -> { + final ScheduleRequest scheduleRequest = event.getScheduleRequestEvent().getAllocationRequestScheduleRequestMap().get(key); + final ScheduleRequestEvent scheduleRequestEvent = ScheduleRequestEvent.of(scheduleRequest, event.getScheduleRequestEvent().eventTime); + final AssignedScheduleRequestEvent assignedScheduleRequestEvent = new AssignedScheduleRequestEvent(scheduleRequestEvent, value); + + self().tell(assignedScheduleRequestEvent, self()); + }); + } + + private void onFailedToBatchScheduleRequestEvent(FailedToBatchScheduleRequestEvent event) { + batchSchedulingFailures.increment(); + if (event.getAttempt() >= this.maxScheduleRetries) { + log.error("Failed to submit the batch request {} because of ", event.getScheduleRequestEvent(), event.getThrowable()); + } else { + Duration timeout = Duration.ofMillis(intervalBetweenRetries.toMillis()); + log.error("Failed to submit the request {}; Retrying in {} because of ", + event.getScheduleRequestEvent(), timeout, event.getThrowable()); + + getTimers().startSingleTimer( + getBatchSchedulingQueueKeyFor(event.getScheduleRequestEvent().getJobId()), + event.onRetry(), + timeout); + } + } + private void onScheduleRequestEvent(ScheduleRequestEvent event) { if (event.isRetry()) { log.info("Retrying Schedule Request {}, attempt {}", event.getRequest(), @@ -128,10 +195,10 @@ private void onScheduleRequestEvent(ScheduleRequestEvent event) { CompletableFuture assignedFuture = resourceCluster - .getTaskExecutorFor( - TaskExecutorAllocationRequest.of( - event.getRequest().getWorkerId(), event.getRequest().getMachineDefinition(), event.getRequest().getJobMetadata(), event.getRequest().getStageNum())) - .thenApply(event::onAssignment) + .getTaskExecutorsFor( + Collections.singleton(TaskExecutorAllocationRequest.of( + event.getRequest().getWorkerId(), event.getRequest().getMachineDefinition(), event.getRequest().getJobMetadata(), event.getRequest().getStageNum()))) + .thenApply(allocation -> event.onAssignment(allocation.values().stream().findFirst().get())) .exceptionally(event::onFailure); pipe(assignedFuture, getContext().getDispatcher()).to(self()); @@ -311,9 +378,60 @@ private void onRetryCancelRequestEvent(RetryCancelRequestEvent event) { } } + private void onCancelBatchRequestEvent(CancelBatchRequestEvent event) { + resourceCluster.unscheduleJob(event.getJobId()); + getTimers().cancel(getBatchSchedulingQueueKeyFor(event.getJobId())); + } + private void onNoop(Noop event) { } + @Value + static class BatchScheduleRequestEvent { + BatchScheduleRequest request; + int attempt; + @Nullable + Throwable previousFailure; + Instant eventTime; + Map allocationRequestScheduleRequestMap; + + BatchScheduleRequestEvent(BatchScheduleRequest request, int attempt, Throwable previousFailure, Instant eventTime) { + this.request = request; + this.attempt = attempt; + this.previousFailure = previousFailure; + this.eventTime = eventTime; + 
this.allocationRequestScheduleRequestMap = request + .getScheduleRequests() + .stream() + .map(req -> Pair.of(req, TaskExecutorAllocationRequest.of(req.getWorkerId(), req.getMachineDefinition(), req.getJobMetadata(), req.getStageNum()))) + .collect(Collectors.toMap(Pair::getRight, Pair::getLeft)); + } + + boolean isRetry() { + return attempt > 1; + } + + static BatchScheduleRequestEvent of(BatchScheduleRequest request) { + return new BatchScheduleRequestEvent(request, 1, null, Clock.systemDefaultZone().instant()); + } + + AssignedBatchScheduleRequestEvent onAssignment(Map allocations) { + return new AssignedBatchScheduleRequestEvent(this, allocations); + } + + FailedToBatchScheduleRequestEvent onFailure(Throwable throwable) { + return new FailedToBatchScheduleRequestEvent(this, this.attempt, throwable); + } + + Set getTaskExecutorAllocationRequests() { + return allocationRequestScheduleRequestMap.keySet(); + } + + String getJobId() { + return request.getScheduleRequests().get(0).getJobMetadata().getJobId(); + } + } + @Value static class ScheduleRequestEvent { @@ -327,8 +445,12 @@ boolean isRetry() { return attempt > 1; } - static ScheduleRequestEvent of(ScheduleRequest request) { - return new ScheduleRequestEvent(request, 1, null, Clock.systemDefaultZone().instant()); + static ScheduleRequestEvent of(BatchScheduleRequestEvent request) { + return new ScheduleRequestEvent(request.getRequest().getScheduleRequests().get(0), 1, null, Clock.systemDefaultZone().instant()); + } + + static ScheduleRequestEvent of(ScheduleRequest request, Instant eventTime) { + return new ScheduleRequestEvent(request, 1, null, eventTime); } FailedToScheduleRequestEvent onFailure(Throwable throwable) { @@ -363,6 +485,29 @@ private ScheduleRequestEvent onRetry() { } } + @Value + private static class AssignedBatchScheduleRequestEvent { + + BatchScheduleRequestEvent scheduleRequestEvent; + Map allocations; + } + + @Value + private static class FailedToBatchScheduleRequestEvent { + + BatchScheduleRequestEvent scheduleRequestEvent; + int attempt; + Throwable throwable; + + private BatchScheduleRequestEvent onRetry() { + return new BatchScheduleRequestEvent( + scheduleRequestEvent.getRequest(), + attempt + 1, + this.throwable, + scheduleRequestEvent.getEventTime()); + } + } + @Value private static class AssignedScheduleRequestEvent { @@ -401,6 +546,15 @@ RetryCancelRequestEvent onFailure(Throwable throwable) { } } + @Value + static class CancelBatchRequestEvent { + String jobId; + + static CancelBatchRequestEvent of(String jobId) { + return new CancelBatchRequestEvent(jobId); + } + } + @Value private static class RetryCancelRequestEvent { @@ -421,4 +575,8 @@ private static class Noop { private String getSchedulingQueueKeyFor(WorkerId workerId) { return "Retry-Schedule-Request-For" + workerId.toString(); } + + private String getBatchSchedulingQueueKeyFor(String jobId) { + return "Retry-Batch-Schedule-Request-For" + jobId; + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobsRouteTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobsRouteTest.java index 14b0a4c60..d68067c5e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobsRouteTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/api/akka/route/v1/JobsRouteTest.java @@ -249,7 +249,6 @@ public static void teardown() { t.interrupt(); } - 
@Test public void testIt() throws InterruptedException { cleanupExistingJobs(); setupJobCluster(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterTest.java index 0d6419e8e..34748f448 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterTest.java @@ -2425,6 +2425,8 @@ public void testScaleStage() { jobClusterActor.tell(new SubmitJobRequest(clusterName, "user", jobDefn), probe.getRef()); SubmitJobResponse submitResponse = probe.expectMsgClass(SubmitJobResponse.class); + System.out.println("SUBMIT RESP: " + submitResponse); + JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobClusterActor, jobId, 0, new WorkerId(clusterName, jobId, 0, 1)); JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobClusterActor, jobId, 1, new WorkerId(clusterName, jobId, 0, 2)); @@ -2448,7 +2450,7 @@ public void testScaleStage() { verify(jobStoreMock, times(3)).updateJob(any()); // initial worker and scale up worker - verify(schedulerMock, times(3)).scheduleWorker(any()); + verify(schedulerMock, times(2)).scheduleWorkers(any()); @@ -2956,7 +2958,7 @@ public void testLostWorkerGetsReplaced() { } // 2 initial schedules and 1 replacement - verify(schedulerMock, timeout(1_000).times(2)).scheduleWorker(any()); + verify(schedulerMock, timeout(1_000).times(2)).scheduleWorkers(any()); // archive worker should get called once for the dead worker // verify(jobStoreMock, timeout(1_000).times(1)).archiveWorker(any()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerTest.java index f933825b9..4f90b1e56 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobClusterManagerTest.java @@ -496,6 +496,18 @@ public void testBootStrapJobClustersAndJobs() { 0, 1)); + jobClusterManagerActor.tell(new GetJobDetailsRequest( + "user", + JobId.fromId("testBootStrapJobClustersAndJobs3-1") + .get()), probe.getRef()); + GetJobDetailsResponse acceptedResponse = probe.expectMsgClass(Duration.of(10, ChronoUnit.MINUTES), + GetJobDetailsResponse.class); + + System.out.println("[fdc-92] acceptedResponse -> " + acceptedResponse); + // Ensure its Accepted + assertEquals(SUCCESS, acceptedResponse.responseCode); + assertEquals(JobState.Accepted, acceptedResponse.getJobMetadata().get().getState()); + try { Thread.sleep(500); } catch (InterruptedException e) { @@ -541,7 +553,7 @@ public void testBootStrapJobClustersAndJobs() { GetJobDetailsResponse resp2 = probe.expectMsgClass(GetJobDetailsResponse.class); // Ensure its launched - System.out.println("Resp2 -> " + resp2.message); + System.out.println("Resp2 -> " + resp2); assertEquals(SUCCESS, resp2.responseCode); assertEquals(JobState.Launched, resp2.getJobMetadata().get().getState()); @@ -565,6 +577,7 @@ public void testBootStrapJobClustersAndJobs() { resp2 = probe.expectMsgClass(Duration.of(10, ChronoUnit.MINUTES), 
GetJobDetailsResponse.class); + System.out.println("[fdc-92] resp -> " + resp2); // Ensure its Accepted assertEquals(SUCCESS, resp2.responseCode); assertEquals(JobState.Accepted, resp2.getJobMetadata().get().getState()); @@ -619,7 +632,7 @@ public void testBootStrapJobClustersAndJobs() { any()); // 2 worker schedule requests - verify(schedulerMock, timeout(100_000).times(4)).scheduleWorker(any()); + verify(schedulerMock, timeout(100_000).times(4)).scheduleWorkers(any()); try { Mockito.verify(jobStoreSpied).loadAllArchivedJobsAsync(); @@ -795,7 +808,7 @@ public void testBootstrapJobClusterAndJobsWithCorruptedWorkerPorts() }); // Two schedules: one for the initial success, one for a resubmit from corrupted worker ports. - verify(schedulerMock, times(2)).scheduleWorker(any()); + verify(schedulerMock, times(2)).scheduleWorkers(any()); // One unschedule from corrupted worker ID 1 (before the resubmit). verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(eq(workerId), any()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java index 16c6a48bd..39d768d35 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobScaleUpDownTests.java @@ -127,25 +127,25 @@ public void testJobScaleUp() throws Exception, InvalidJobException, io.mantisrx. ActorRef jobActor = JobTestHelper.submitSingleStageScalableJob(system,probe, clusterName, sInfo, schedulerMock, jobStoreMock, lifecycleEventPublisher); // send scale up request - jobActor.tell(new JobClusterManagerProto.ScaleStageRequest(clusterName+"-1", 1, 2, "", ""), probe.getRef()); + jobActor.tell(new JobClusterManagerProto.ScaleStageRequest(clusterName+"-1", 1, 3, "", ""), probe.getRef()); JobClusterManagerProto.ScaleStageResponse scaleResp = probe.expectMsgClass(JobClusterManagerProto.ScaleStageResponse.class); System.out.println("ScaleupResp " + scaleResp.message); assertEquals(SUCCESS, scaleResp.responseCode); - assertEquals(2,scaleResp.getActualNumWorkers()); + assertEquals(3,scaleResp.getActualNumWorkers()); verify(jobStoreMock, times(1)).storeNewJob(any()); // initial worker verify(jobStoreMock, times(1)).storeNewWorkers(any(),any()); //scale up worker - verify(jobStoreMock, times(1)).storeNewWorker(any()); + verify(jobStoreMock, times(2)).storeNewWorker(any()); verify(jobStoreMock, times(6)).updateWorker(any()); verify(jobStoreMock, times(3)).updateJob(any()); // initial worker + job master and scale up worker - verify(schedulerMock, times(3)).scheduleWorker(any()); + verify(schedulerMock, times(3)).scheduleWorkers(any()); } @@ -190,7 +190,7 @@ public void testJobScaleDown() throws Exception, InvalidJobException, io.mantisr verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(any(), any()); // 1 job master + 2 workers - verify(schedulerMock, times(3)).scheduleWorker(any()); + verify(schedulerMock, times(1)).scheduleWorkers(any()); } @@ -528,7 +528,7 @@ public void testJobScaleUpFailsIfNoScaleStrategy() throws Exception { verify(jobStoreMock, times(3)).updateJob(any()); // initial worker only - verify(schedulerMock, times(1)).scheduleWorker(any()); + verify(schedulerMock, times(1)).scheduleWorkers(any()); } @Test @@ -571,7 +571,7 @@ public void 
testJobScaleUpFailsIfMinEqualsMax() throws Exception { verify(jobStoreMock, times(3)).updateJob(any()); // initial worker only - verify(schedulerMock, times(1)).scheduleWorker(any()); + verify(schedulerMock, times(1)).scheduleWorkers(any()); } @Test public void stageScalingPolicyTest() { diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 9d7331806..6b2eb7f6a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -58,6 +58,7 @@ import io.mantisrx.server.master.persistence.IMantisPersistenceProvider; import io.mantisrx.server.master.persistence.KeyValueBasedPersistenceProvider; import io.mantisrx.server.master.persistence.MantisJobStore; +import io.mantisrx.server.master.scheduler.BatchScheduleRequest; import io.mantisrx.server.master.scheduler.MantisScheduler; import io.mantisrx.server.master.scheduler.ScheduleRequest; import io.mantisrx.server.master.store.FileBasedStore; @@ -65,6 +66,7 @@ import java.io.IOException; import java.net.URL; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -296,13 +298,14 @@ public void testJobSubmitPerpetual() { //verify(jobStoreMock, times(3)) - verify(schedulerMock,times(1)).scheduleWorker(any()); + verify(schedulerMock,times(1)).scheduleWorkers(any()); JobMetadata jobMetadata = new JobMetadata(jobId, new URL("http://myart" + ""),1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0); - ScheduleRequest expectedScheduleRequest = new ScheduleRequest(workerId, + ScheduleRequest scheduleRequest = new ScheduleRequest(workerId, 1,4, jobMetadata,MantisJobDurationType.Perpetual,machineDefinition,Lists.newArrayList(),Lists.newArrayList(),0,empty()); - verify(schedulerMock).scheduleWorker(expectedScheduleRequest); + BatchScheduleRequest expectedRequest = new BatchScheduleRequest(Collections.singletonList(scheduleRequest)); + verify(schedulerMock).scheduleWorkers(expectedRequest); //assertEquals(jobActor, probe.getLastSender()); @@ -721,7 +724,7 @@ public void testkill() throws Exception { JobTestHelper.sendWorkerTerminatedEvent(probe,jobActor,jId.getId(),new WorkerId(jId.getId(),0,1)); Thread.sleep(1000); verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(any(), any()); - verify(schedulerMock, times(1)).scheduleWorker(any()); + verify(schedulerMock, times(1)).scheduleWorkers(any()); verify(jobStoreMock, times(1)).storeNewJob(any()); verify(jobStoreMock, times(1)).storeNewWorkers(any(),any()); verify(jobStoreMock, times(2)).updateJob(any()); @@ -807,7 +810,7 @@ public void testHeartBeatEnforcement() { Thread.sleep(1000); // 2 original submissions and 2 resubmits because of HB timeouts - verify(schedulerMock, times(4)).scheduleWorker(any()); + verify(schedulerMock, times(3)).scheduleWorkers(any()); // 2 kills due to resubmits verify(schedulerMock, times(2)).unscheduleAndTerminateWorker(any(), any()); @@ -919,7 +922,7 @@ public void testLostWorkerGetsReplaced() { } // 2 initial schedules and 1 replacement - verify(schedulerMock, timeout(1_000).times(3)).scheduleWorker(any()); + verify(schedulerMock, timeout(1_000).times(2)).scheduleWorkers(any()); // archive worker should 
get called once for the dead worker // verify(jobStoreMock, timeout(1_000).times(1)).archiveWorker(any()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestMigrationTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestMigrationTests.java index b333f3ee2..d8e8b9754 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestMigrationTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestMigrationTests.java @@ -44,6 +44,7 @@ import io.mantisrx.server.master.domain.JobDefinition; import io.mantisrx.server.master.domain.JobId; import io.mantisrx.server.master.persistence.MantisJobStore; +import io.mantisrx.server.master.scheduler.BatchScheduleRequest; import io.mantisrx.server.master.scheduler.MantisScheduler; import io.mantisrx.server.master.scheduler.ScheduleRequest; import io.mantisrx.server.master.scheduler.WorkerOnDisabledVM; @@ -179,11 +180,13 @@ public DummyScheduler(CountDownLatch scheduleCDL, CountDownLatch unscheduleCDL) } @Override - public void scheduleWorker(ScheduleRequest scheduleRequest) { - // TODO Auto-generated method stub - System.out.println("----------------------> schedule Worker Called"); - schedL.countDown(); + public void scheduleWorkers(BatchScheduleRequest scheduleRequest) { + // TODO: + } + @Override + public void unscheduleJob(String jobId) { + // TODO: } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java index 30426aadf..05bc07ab8 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java @@ -28,7 +28,8 @@ import io.mantisrx.common.WorkerPorts; import io.mantisrx.common.util.DelegateClock; import io.mantisrx.master.resourcecluster.ExecutorStateManagerImpl.TaskExecutorHolder; -import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.BestFit; +import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest; import io.mantisrx.runtime.MachineDefinition; import io.mantisrx.server.core.TestingRpcService; import io.mantisrx.server.core.domain.WorkerId; @@ -45,11 +46,11 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.junit.Test; @@ -129,9 +130,9 @@ public void setup() { @Test public void testGetBestFit() { - Optional> bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0), + Optional bestFitO = + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)), CLUSTER_ID)); 
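Editor's note on the hunks above: the scheduler-facing test doubles now implement scheduleWorkers(BatchScheduleRequest) in place of scheduleWorker(ScheduleRequest), and ExecutorStateManager.findBestFit now takes a TaskExecutorBatchAssignmentRequest and yields an Optional<BestFit>. The body of the new BatchScheduleRequest class is not visible in this section; the sketch below is only a guess at its minimal shape, consistent with the JobTestLifecycle usage new BatchScheduleRequest(Collections.singletonList(scheduleRequest)). The requests field and the getRequests() accessor it implies are assumptions, not confirmed by the patch.

    // Hypothetical minimal shape for the new value type; field name is an assumption.
    import io.mantisrx.server.master.scheduler.ScheduleRequest;
    import java.util.List;
    import lombok.Value;

    @Value
    public class BatchScheduleRequest {
        List<ScheduleRequest> requests; // Lombok @Value generates getRequests() and an all-args constructor
    }

With a shape like this, a stub such as DummyScheduler in JobTestMigrationTests could keep its old latch semantics by counting schedL down once per entry in the batch rather than leaving the scheduleWorkers body as a TODO.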
assertFalse(bestFitO.isPresent()); @@ -153,26 +154,26 @@ public void testGetBestFit() { // test machine def 1 bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), CLUSTER_ID)); + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getLeft()); - assertEquals(state1, bestFitO.get().getRight()); + assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(state1, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0), CLUSTER_ID)); + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(TASK_EXECUTOR_ID_2, bestFitO.get().getLeft()); - assertEquals(state2, bestFitO.get().getRight()); + assertEquals(TASK_EXECUTOR_ID_2, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(state2, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); // disable e1 and should get nothing state1.onTaskExecutorStatusChange(new TaskExecutorStatusChange(TASK_EXECUTOR_ID_1, CLUSTER_ID, TaskExecutorReport.occupied(WORKER_ID))); bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), CLUSTER_ID)); + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertFalse(bestFitO.isPresent()); // enable e3 and disable e2 @@ -183,18 +184,18 @@ public void testGetBestFit() { TaskExecutorReport.occupied(WORKER_ID))); bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0), CLUSTER_ID)); + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(TASK_EXECUTOR_ID_3, bestFitO.get().getLeft()); - assertEquals(state3, bestFitO.get().getRight()); + assertEquals(TASK_EXECUTOR_ID_3, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(state3, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); // test mark as unavailable stateManager.tryMarkUnavailable(TASK_EXECUTOR_ID_3); bestFitO = - stateManager.findBestFit(new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0), CLUSTER_ID)); + stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)), CLUSTER_ID)); assertFalse(bestFitO.isPresent()); } @@ -232,10 +233,10 @@ public void testTaskExecutorHolderCreation() { @Test public void testGetBestFit_WithGenerationFromScaleGroup() { - Optional> bestFitO = + Optional bestFitO = 
stateManager.findBestFit( - new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0), + new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)), CLUSTER_ID)); assertFalse(bestFitO.isPresent()); @@ -267,13 +268,13 @@ public void testGetBestFit_WithGenerationFromScaleGroup() { // should get te1 with group2 bestFitO = stateManager.findBestFit( - new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), + new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getLeft()); - assertEquals(teState1, bestFitO.get().getRight()); + assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(teState1, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); // add new TE in group1 doesn't affect result. TaskExecutorState teState4 = registerNewTaskExecutor(TaskExecutorID.of("te4"), @@ -283,13 +284,13 @@ public void testGetBestFit_WithGenerationFromScaleGroup() { bestFitO = stateManager.findBestFit( - new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), + new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getLeft()); - assertEquals(teState1, bestFitO.get().getRight()); + assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(teState1, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); // remove te1 and add new te in both groups teState1.onTaskExecutorStatusChange( @@ -308,28 +309,28 @@ public void testGetBestFit_WithGenerationFromScaleGroup() { bestFitO = stateManager.findBestFit( - new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), + new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertEquals(te5Id, bestFitO.get().getLeft()); - assertEquals(teState5, bestFitO.get().getRight()); + assertEquals(te5Id, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + assertEquals(teState5, bestFitO.get().getBestFit().values().stream().findFirst().get().getRight()); // disable all group2 TEs and allow bestFit from group1 teState5.onTaskExecutorStatusChange( new TaskExecutorStatusChange(te5Id, CLUSTER_ID, TaskExecutorReport.occupied(WORKER_ID))); bestFitO = stateManager.findBestFit( - new TaskExecutorAssignmentRequest( - TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0), + new TaskExecutorBatchAssignmentRequest( + Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1, null, 0)), CLUSTER_ID)); assertTrue(bestFitO.isPresent()); - assertNotEquals(te5Id, bestFitO.get().getLeft()); - assertNotEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getLeft()); + assertNotEquals(te5Id, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); + 
assertNotEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getBestFit().values().stream().findFirst().get().getLeft()); assertEquals(SCALE_GROUP_1, - Objects.requireNonNull(bestFitO.get().getRight().getRegistration()) + Objects.requireNonNull(bestFitO.get().getBestFit().values().stream().findFirst().get().getRight().getRegistration()) .getAttributeByKey(WorkerConstants.AUTO_SCALE_GROUP_KEY).orElse("invalid")); assertNotNull(stateManager.get(TASK_EXECUTOR_ID_1)); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index 648e1ae72..c7e2ae620 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -66,10 +66,13 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.flink.util.ExceptionUtils; import org.junit.AfterClass; @@ -202,7 +205,7 @@ public void setupActor() { Duration.ofSeconds(1), CLUSTER_ID, mapper, - 100); + () -> 10000); } @Test @@ -232,9 +235,10 @@ public void testGetFreeTaskExecutors() throws Exception { TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).get()); + final Set requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); assertEquals( TASK_EXECUTOR_ID, resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).get()); @@ -328,9 +332,10 @@ public void testGetTaskExecutorsUsageAndList() throws Exception { assertEquals(ImmutableList.of(TASK_EXECUTOR_ID_3, TASK_EXECUTOR_ID_2), idleInstancesResponse.getInstanceIds()); assertEquals(CONTAINER_DEF_ID_2, idleInstancesResponse.getSkuId()); + Set requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)); assertEquals( TASK_EXECUTOR_ID_3, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); probe = new TestKit(actorSystem); resourceClusterActor.tell(new GetClusterUsageRequest( @@ -362,9 +367,10 @@ public void testGetTaskExecutorsUsageAndList() throws Exception { assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), idleInstancesResponse.getInstanceIds()); assertEquals(CONTAINER_DEF_ID_1, idleInstancesResponse.getSkuId()); + requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID_2, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); probe = new TestKit(actorSystem); resourceClusterActor.tell(new 
GetClusterUsageRequest( CLUSTER_ID, ResourceClusterScalerActor.groupKeyFromTaskExecutorDefinitionIdFunc), @@ -417,19 +423,24 @@ public void testAssignmentTimeout() throws Exception { TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).get()); + Set requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); assertEquals(ImmutableList.of(), resourceCluster.getAvailableTaskExecutors().get()); Thread.sleep(2000); assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), resourceCluster.getAvailableTaskExecutors().get()); + requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); } @Test public void testGetMultipleActiveJobs() throws ExecutionException, InterruptedException { + + final Set requests = new HashSet<>(); + final Set expectedTaskExecutorIds = new HashSet<>(); final int n = 10; List expectedJobIdList = new ArrayList<>(n); for (int i = 0; i < n * 2; i ++) { @@ -464,11 +475,12 @@ public void testGetMultipleActiveJobs() throws ExecutionException, InterruptedEx expectedJobIdList.add(String.format("late-sine-function-tutorial-%d", idx)); } - assertEquals( - taskExecutorID, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(workerId, MACHINE_DEFINITION, null, 0)) - .get()); + expectedTaskExecutorIds.add(taskExecutorID); + requests.add(TaskExecutorAllocationRequest.of(workerId, MACHINE_DEFINITION, null, 0)); } + assertEquals( + expectedTaskExecutorIds, + new HashSet<>(resourceCluster.getTaskExecutorsFor(requests).get().values())); TestKit probe = new TestKit(actorSystem); resourceClusterActor.tell(new GetActiveJobsRequest( @@ -577,9 +589,10 @@ public void testIfDisabledTaskExecutorsAreNotAvailableForScheduling() throws Exc resourceCluster.heartBeatFromTaskExecutor( new TaskExecutorHeartbeat(TASK_EXECUTOR_ID_2, CLUSTER_ID, TaskExecutorReport.available())).get()); resourceCluster.disableTaskExecutorsFor(ATTRIBUTES, Instant.now().plus(Duration.ofDays(1)), Optional.empty()).get(); + Set requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID_2, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).get()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); } @Test @@ -610,9 +623,11 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable { resourceCluster.heartBeatFromTaskExecutor( new TaskExecutorHeartbeat(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).join()); + + final Set requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)); assertEquals( TASK_EXECUTOR_ID, - resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION, null, 0)).join()); + resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get()); assertEquals(TASK_EXECUTOR_ID, 
resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).join()); assertEquals(Ack.getInstance(), resourceCluster.notifyTaskExecutorStatusChange( new TaskExecutorStatusChange(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.occupied(WORKER_ID))).join()); @@ -621,7 +636,7 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable { new TaskExecutorStatusChange(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).join()); try { - TaskExecutorID result = resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).join(); + resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).join(); } catch (Exception e) { throw ExceptionUtils.stripCompletionException(e); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/scheduler/FakeMantisScheduler.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/scheduler/FakeMantisScheduler.java index 2044b5bf7..1ea79e413 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/scheduler/FakeMantisScheduler.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/scheduler/FakeMantisScheduler.java @@ -19,18 +19,12 @@ import akka.actor.ActorRef; import com.netflix.fenzo.VirtualMachineCurrentState; import com.netflix.fenzo.VirtualMachineLease; -import io.mantisrx.common.WorkerPorts; -import io.mantisrx.master.jobcluster.job.worker.WorkerHeartbeat; -import io.mantisrx.master.jobcluster.job.worker.WorkerStatus; -import io.mantisrx.runtime.MantisJobState; -import io.mantisrx.server.core.Status; import io.mantisrx.server.core.domain.WorkerId; +import io.mantisrx.server.master.scheduler.BatchScheduleRequest; import io.mantisrx.server.master.scheduler.MantisScheduler; import io.mantisrx.server.master.scheduler.ScheduleRequest; import io.mantisrx.server.master.scheduler.WorkerEvent; -import io.mantisrx.server.master.scheduler.WorkerLaunched; import io.mantisrx.server.master.scheduler.WorkerResourceStatus; -import io.mantisrx.shaded.com.google.common.collect.Lists; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -43,39 +37,49 @@ public FakeMantisScheduler(final ActorRef jobClusterManagerActor) { this.jobClusterManagerActor = jobClusterManagerActor; } +// @Override +// public void scheduleWorker(final ScheduleRequest scheduleRequest) { +// // Worker Launched +// final WorkerEvent workerLaunched = new WorkerLaunched(scheduleRequest.getWorkerId(), +// scheduleRequest.getStageNum(), +// "host1", +// "vm1", +// scheduleRequest.getPreferredCluster(), Optional.empty(), new WorkerPorts(Lists.newArrayList(8000, 9000, 9010, 9020, 9030))); +// +// jobClusterManagerActor.tell(workerLaunched, ActorRef.noSender()); +// +// // fake Worker Start initiated event +// final WorkerEvent workerStartInit = new WorkerStatus(new Status( +// scheduleRequest.getWorkerId().getJobId(), +// scheduleRequest.getStageNum(), +// scheduleRequest.getWorkerId().getWorkerIndex(), +// scheduleRequest.getWorkerId().getWorkerNum(), +// Status.TYPE.INFO, +// "fake Start Initiated", +// MantisJobState.StartInitiated)); +// +// jobClusterManagerActor.tell(workerStartInit, ActorRef.noSender()); +// +// // fake Worker Heartbeat event +// final WorkerEvent workerHeartbeat = new WorkerHeartbeat(new Status( +// scheduleRequest.getWorkerId().getJobId(), +// scheduleRequest.getStageNum(), +// scheduleRequest.getWorkerId().getWorkerIndex(), +// scheduleRequest.getWorkerId().getWorkerNum(), +// Status.TYPE.HEARTBEAT, 
+// "fake heartbeat event", +// MantisJobState.Started)); +// jobClusterManagerActor.tell(workerHeartbeat, ActorRef.noSender()); +// } + + @Override + public void scheduleWorkers(BatchScheduleRequest scheduleRequest) { + // TODO: + } + @Override - public void scheduleWorker(final ScheduleRequest scheduleRequest) { - // Worker Launched - final WorkerEvent workerLaunched = new WorkerLaunched(scheduleRequest.getWorkerId(), - scheduleRequest.getStageNum(), - "host1", - "vm1", - scheduleRequest.getPreferredCluster(), Optional.empty(), new WorkerPorts(Lists.newArrayList(8000, 9000, 9010, 9020, 9030))); - - jobClusterManagerActor.tell(workerLaunched, ActorRef.noSender()); - - // fake Worker Start initiated event - final WorkerEvent workerStartInit = new WorkerStatus(new Status( - scheduleRequest.getWorkerId().getJobId(), - scheduleRequest.getStageNum(), - scheduleRequest.getWorkerId().getWorkerIndex(), - scheduleRequest.getWorkerId().getWorkerNum(), - Status.TYPE.INFO, - "fake Start Initiated", - MantisJobState.StartInitiated)); - - jobClusterManagerActor.tell(workerStartInit, ActorRef.noSender()); - - // fake Worker Heartbeat event - final WorkerEvent workerHeartbeat = new WorkerHeartbeat(new Status( - scheduleRequest.getWorkerId().getJobId(), - scheduleRequest.getStageNum(), - scheduleRequest.getWorkerId().getWorkerIndex(), - scheduleRequest.getWorkerId().getWorkerNum(), - Status.TYPE.HEARTBEAT, - "fake heartbeat event", - MantisJobState.Started)); - jobClusterManagerActor.tell(workerHeartbeat, ActorRef.noSender()); + public void unscheduleJob(String jobId) { + // TODO: } @Override