Skip to content

Commit

Permalink
Use cores instead
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Oct 25, 2023
1 parent 9b173ab commit 6144e49
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
public class ExecutorStateManagerImpl implements ExecutorStateManager {
private final Map<TaskExecutorID, TaskExecutorState> taskExecutorStateMap = new HashMap<>();
// TODO: do we need to make this thread safe?
private final Map<MachineDefinition, Integer> unavailabilityByMachineDef = new HashMap<>();
private final Map<Double, Integer> unavailabilityByMachineDef = new HashMap<>();

/**
* Cache the available executors ready to accept assignments. Note these executors' state are not strongly
Expand Down Expand Up @@ -241,8 +241,8 @@ public void markJobAsStartedRequest(String jobId) {
.filter(s -> s.getRegistration() != null && s.getWorkerId() != null && s.getWorkerId().getJobId().equals(jobId))
.forEach(s -> {
MachineDefinition m = s.getRegistration().getMachineDefinition();
if (unavailabilityByMachineDef.containsKey(m)) {
unavailabilityByMachineDef.put(m, unavailabilityByMachineDef.get(m) -1);
if (unavailabilityByMachineDef.containsKey(m.getCpuCores())) {
unavailabilityByMachineDef.put(m.getCpuCores(), unavailabilityByMachineDef.get(m.getCpuCores()) - 1);
}
});
}
Expand Down Expand Up @@ -306,7 +306,7 @@ public Optional<Map<TaskExecutorAllocationRequest, Pair<TaskExecutorID, TaskExec
} else {
machineDefWithNoResourcesAvailable.add(Pair.of(allocationRequests.getKey(), allocationRequests.getValue().size()));
// TODO: think a way to get rid of machineDefWithNoResourcesAvailable
unavailabilityByMachineDef.put(allocationRequests.getKey(), allocationRequests.getValue().size());
unavailabilityByMachineDef.put(allocationRequests.getKey().getCpuCores(), allocationRequests.getValue().size());
log.warn("Not enough available TEs found for machine def {} with core count: {}, request: {}",
allocationRequests.getKey(), allocationRequests.getKey().getCpuCores(), request);
}
Expand Down Expand Up @@ -418,8 +418,9 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
}

if (!unavailableByGroupKey.containsKey(groupKey)) {
log.info("[fdc-99] groupKey {} - unavailabilityByMachineDef {} - mdef {} - sol {}", groupKey, unavailabilityByMachineDef, value.getRegistration().getMachineDefinition(), unavailabilityByMachineDef.get(value.getRegistration().getMachineDefinition()));
int unavailability = unavailabilityByMachineDef.getOrDefault(value.getRegistration().getMachineDefinition(), 0);
// TODO: machine doesn't have to match exactly right?
log.info("[fdc-99] groupKey {} - unavailabilityByMachineDef {} - mdef {} - sol {}", groupKey, unavailabilityByMachineDef, value.getRegistration().getMachineDefinition(), unavailabilityByMachineDef.get(value.getRegistration().getMachineDefinition().getCpuCores()));
int unavailability = unavailabilityByMachineDef.getOrDefault(value.getRegistration().getMachineDefinition().getCpuCores(), 0);
log.info("[fdc-99] unavailability for sku {} - {}", groupKey, unavailability);
unavailableByGroupKey.put(groupKey, unavailability);
}
Expand Down

0 comments on commit 6144e49

Please sign in to comment.