Skip to content

Commit

Permalink
Handle job kill
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Nov 2, 2023
1 parent f6ad8cf commit 8a8d7d2
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ default CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors() {
CompletableFuture<Map<TaskExecutorAllocationRequest, TaskExecutorID>> getTaskExecutorsFor(
Set<TaskExecutorAllocationRequest> allocationRequests);

CompletableFuture<Ack> unscheduleJob(String jobId);

/**
* Returns the Gateway instance to talk to the task executor. If unable to make connection with
* the task executor, then a ConnectionFailedException is thrown wrapped inside the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,7 @@ public void shutdown() {
private void unscheduleJob() {
LOGGER.info("Unscheduling job {}", jobId);
scheduler.unscheduleJob(jobId.getId());

}

private void terminateAllWorkersAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
*/
Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request);

void unscheduleJob(String jobId);

Set<Entry<TaskExecutorID, TaskExecutorState>> getActiveExecutorEntry();

Predicate<Entry<TaskExecutorID, TaskExecutorState>> isRegistered =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -59,7 +60,16 @@
public class ExecutorStateManagerImpl implements ExecutorStateManager {
private final Map<TaskExecutorID, TaskExecutorState> taskExecutorStateMap = new HashMap<>();
private final Map<Double, Integer> pendingWorkersByCoreCount = new HashMap<>();
private final Map<String, Long> pendingJobRequests = new HashMap<>();
private final Map<String, JobStats> pendingJobRequests = new HashMap<>();

@RequiredArgsConstructor
static class JobStats {
public final Map<Double, Integer> workers;

public int getTotalWorkers() {
return workers.values().stream().mapToInt(Integer::intValue).sum();
}
}

/**
* Cache the available executors ready to accept assignments. Note these executors' state are not strongly
Expand Down Expand Up @@ -232,6 +242,18 @@ public Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
.findFirst();
}

@Override
public void unscheduleJob(String jobId) {
final JobStats jobStats = pendingJobRequests.remove(jobId);
if (jobStats != null) {
jobStats.workers.forEach((cores, count) -> {
if (pendingWorkersByCoreCount.containsKey(cores)) {
pendingWorkersByCoreCount.put(cores, Math.max(pendingWorkersByCoreCount.get(cores) - count, 0));
}
});
}
}

@Override
public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request) {

Expand Down Expand Up @@ -395,8 +417,8 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {

// remove jobs from pending set which have all pending workers
jobIdToMachineDef.forEach((jobId, workers) -> {
log.info("[fdc-999] job id {} - workers {} vs pending {}", jobId, workers.size(), pendingJobRequests.getOrDefault(jobId, -1L));
if (pendingJobRequests.getOrDefault(jobId, -1L) <= workers.size()) {
log.info("[fdc-999] job id {} - workers {} vs pending {}", jobId, workers.size(), pendingJobRequests.getOrDefault(jobId, null));
if (pendingJobRequests.containsKey(jobId) && pendingJobRequests.get(jobId).getTotalWorkers() <= workers.size()) {
log.info("Removing job {} from pending requests", jobId);
pendingJobRequests.remove(jobId);
workers.forEach(machineDefinition -> {
Expand Down Expand Up @@ -445,13 +467,7 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(Ta
// Add jobId to pending requests only once
if (!pendingJobRequests.containsKey(request.getJobId())) {
log.info("Adding job {} to pending requests for {} machine {}", request.getJobId(), allocationRequests.size(), machineDefinition);
long totalJobSize = request
.getGroupedByMachineDef()
.values()
.stream()
.mapToLong(List::size)
.sum();
pendingJobRequests.put(request.getJobId(), totalJobSize);
pendingJobRequests.put(request.getJobId(), new JobStats(request.getGroupedByCoresCount()));
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public Receive createReceive() {
.match(TaskExecutorDisconnection.class, this::onTaskExecutorDisconnection)
.match(HeartbeatTimeout.class, this::onTaskExecutorHeartbeatTimeout)
.match(TaskExecutorBatchAssignmentRequest.class, this::onTaskExecutorBatchAssignmentRequest)
.match(UnscheduleJobRequest.class, this::onUnscheduleJobRequest)
.match(ResourceOverviewRequest.class, this::onResourceOverviewRequest)
.match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest)
.match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest)
Expand Down Expand Up @@ -583,6 +584,10 @@ private void onTaskExecutorStatusChange(TaskExecutorStatusChange statusChange) {
}
}

private void onUnscheduleJobRequest(UnscheduleJobRequest request) {
this.executorStateManager.unscheduleJob(request.jobId);
}

private void onTaskExecutorBatchAssignmentRequest(TaskExecutorBatchAssignmentRequest request) {
Optional<BestFit> matchedExecutors = this.executorStateManager.findBestFit(request);

Expand Down Expand Up @@ -822,6 +827,12 @@ private static class HeartbeatTimeout {
Instant lastActivity;
}

@Value
static class UnscheduleJobRequest {
String jobId;
ClusterID clusterID;
}

@Value
static class TaskExecutorBatchAssignmentRequest {
Set<TaskExecutorAllocationRequest> allocationRequests;
Expand All @@ -834,6 +845,19 @@ public Map<MachineDefinition, List<TaskExecutorAllocationRequest>> getGroupedByM
.collect(Collectors.groupingBy(TaskExecutorAllocationRequest::getMachineDefinition));
}

public Map<Double, Integer> getGroupedByCoresCount() {
// TODO: group by new class which wraps MachineDefinition + metadata.
return allocationRequests
.stream()
.collect(Collectors.groupingBy(TaskExecutorAllocationRequest::getMachineDefinition))
.entrySet()
.stream()
.collect(Collectors.toMap(
e -> e.getKey().getCpuCores(),
e -> e.getValue().size()
));
}

public String getJobId() {
return allocationRequests.iterator().next().getWorkerId().getJobId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsAllocation;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.UnscheduleJobRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest;
import io.mantisrx.server.core.domain.ArtifactID;
Expand Down Expand Up @@ -186,6 +187,15 @@ public CompletableFuture<Map<TaskExecutorAllocationRequest, TaskExecutorID>> get
.thenApply(l -> l.getAllocations());
}

@Override
public CompletableFuture<Ack> unscheduleJob(String jobId) {
return
Patterns
.ask(resourceClusterManagerActor, new UnscheduleJobRequest(jobId, clusterID), askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture();
}

@Override
public CompletableFuture<TaskExecutorID> getTaskExecutorAssignedFor(WorkerId workerId) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.UnscheduleJobRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest;
import io.mantisrx.server.master.config.MasterConfiguration;
Expand Down Expand Up @@ -137,6 +138,8 @@ public Receive createReceive() {
getRCActor(disconnection.getClusterID()).forward(disconnection, context()))
.match(TaskExecutorBatchAssignmentRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(UnscheduleJobRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(ResourceOverviewRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(TaskExecutorInfoRequest.class, req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public Receive createReceive() {
.match(AssignedBatchScheduleRequestEvent.class, this::onAssignedBatchScheduleRequestEvent)
.match(FailedToBatchScheduleRequestEvent.class, this::onFailedToBatchScheduleRequestEvent)
.match(CancelBatchRequestEvent.class, this::onCancelBatchRequestEvent)
// .match(RetryCancelBatchRequestEvent.class, this::onRetryBatchCancelRequestEvent)

// single schedule request
.match(ScheduleRequestEvent.class, this::onScheduleRequestEvent)
Expand Down Expand Up @@ -375,13 +374,11 @@ private void onRetryCancelRequestEvent(RetryCancelRequestEvent event) {
}

private void onCancelBatchRequestEvent(CancelBatchRequestEvent event) {
try {
log.info("CancelBatchRequestEvent {}", event);
getTimers().cancel(getBatchSchedulingQueueKeyFor(event.getJobId()));
} catch (Exception e) {
log.warn("[fdc-91] Failed to cancel batch request {}", event.getJobId(), e);
// TODO: retry...
}
log.info("CancelBatchRequestEvent {}", event);
getTimers().cancel(getBatchSchedulingQueueKeyFor(event.getJobId()));

// TODO: do we need to handle response?
resourceCluster.unscheduleJob(event.getJobId());
}

private void onNoop(Noop event) {
Expand Down Expand Up @@ -554,10 +551,6 @@ static class CancelBatchRequestEvent {
static CancelBatchRequestEvent of(String jobId) {
return new CancelBatchRequestEvent(jobId);
}

// RetryCancelRequestEvent onFailure(Throwable throwable) {
// return new RetryCancelRequestEvent(this, throwable);
// }
}

@Value
Expand Down

0 comments on commit 8a8d7d2

Please sign in to comment.