
Commit

fix TE double booking
Andyz26 committed Dec 17, 2024
1 parent 735435f commit eedfbd6
Showing 8 changed files with 106 additions and 9 deletions.
@@ -39,6 +39,8 @@
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.mantisrx.shaded.com.google.common.cache.RemovalListener;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
@@ -111,6 +113,8 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con

private final Map<String, String> schedulingAttributes;

private final Duration schedulerLeaseExpirationDuration;

private final Cache<TaskExecutorID, TaskExecutorState> archivedState = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(24, TimeUnit.HOURS)
@@ -121,11 +125,16 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con
ExecutorStateManagerImpl(Map<String, String> schedulingAttributes) {
this.schedulingAttributes = schedulingAttributes;
this.fitnessCalculator = new CpuWeightedFitnessCalculator();
this.schedulerLeaseExpirationDuration = Duration.ofMillis(100);
}

ExecutorStateManagerImpl(Map<String, String> schedulingAttributes, FitnessCalculator fitnessCalculator) {
ExecutorStateManagerImpl(
Map<String, String> schedulingAttributes,
FitnessCalculator fitnessCalculator,
Duration schedulerLeaseExpirationDuration) {
this.schedulingAttributes = schedulingAttributes;
this.fitnessCalculator = fitnessCalculator;
this.schedulerLeaseExpirationDuration = schedulerLeaseExpirationDuration;
}

@Override
@@ -357,10 +366,22 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExec
}
TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId());
return st.isAvailable() &&
// When a TE is returned from here for scheduling, its state remains available until the
// scheduler sends another message to update (lock) that state. When a large number of
// requests are in flight at the same time for the same SKU, the gap between this point and
// the locking message can be wide enough for another schedule request to slip in and be
// handed the same set of TEs. To avoid this, a lease on each TE state temporarily blocks
// the TE from being handed out again. Since the lock is only needed between actor
// messages, the lease duration can be short.
st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) > 0 &&
st.getRegistration() != null;
})
.limit(numWorkers)
.map(TaskExecutorHolder::getId)
.map(teHolder -> {
TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId());
st.updateLastSchedulerLeased();
return teHolder.getId();
})
.collect(Collectors.toMap(
taskExecutorID -> taskExecutorID,
this.taskExecutorStateMap::get)));
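
The gist of this change is a short-lived lease on each task executor's state: a TE is only considered schedulable if its previous lease has expired, and handing it out refreshes the lease, so two scheduling requests processed back-to-back on the same SKU cannot be given the same TEs. A minimal, self-contained sketch of that gate, using hypothetical names (LeasedExecutorState, tryLease) rather than the actual Mantis types:

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Illustrative lease gate; names are hypothetical, not the Mantis classes above.
final class LeasedExecutorState {
    private final Clock clock;
    private Instant lastSchedulerLeased = Instant.MIN; // MIN means "never leased"
    private boolean available = true;

    LeasedExecutorState(Clock clock) {
        this.clock = clock;
    }

    // A TE is schedulable only if it is available and its previous lease has expired.
    boolean tryLease(Duration leaseExpiration) {
        Duration sinceLastLease = Duration.between(lastSchedulerLeased, clock.instant());
        if (available && sinceLastLease.compareTo(leaseExpiration) > 0) {
            lastSchedulerLeased = clock.instant(); // refresh the lease when handing out the TE
            return true;
        }
        return false;
    }
}

With the default 100 ms lease, a second request arriving before the first assignment message is processed sees the lease still in force and receives no TE, which is what the new test further down asserts before advancing its fixed clock.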
@@ -120,6 +120,7 @@ public SupervisorStrategy supervisorStrategy() {
private final Duration heartbeatTimeout;
private final Duration assignmentTimeout;
private final Duration disabledTaskExecutorsCheckInterval;
private final Duration schedulerLeaseExpirationDuration;

private final ExecutorStateManager executorStateManager;
private final Clock clock;
@@ -139,16 +140,47 @@ public SupervisorStrategy supervisorStrategy() {

private final boolean isJobArtifactCachingEnabled;

static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled, boolean isJobArtifactCachingEnabled, Map<String, String> schedulingAttributes, FitnessCalculator fitnessCalculator) {
return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled, isJobArtifactCachingEnabled, schedulingAttributes, fitnessCalculator)
.withMailbox("akka.actor.metered-mailbox");
static Props props(
final ClusterID clusterID,
final Duration heartbeatTimeout,
Duration assignmentTimeout,
Duration disabledTaskExecutorsCheckInterval,
Duration schedulerLeaseExpirationDuration,
Clock clock,
RpcService rpcService,
MantisJobStore mantisJobStore,
JobMessageRouter jobMessageRouter,
int maxJobArtifactsToCache,
String jobClustersWithArtifactCachingEnabled,
boolean isJobArtifactCachingEnabled,
Map<String, String> schedulingAttributes,
FitnessCalculator fitnessCalculator
) {
return Props.create(
ResourceClusterActor.class,
clusterID,
heartbeatTimeout,
assignmentTimeout,
disabledTaskExecutorsCheckInterval,
schedulerLeaseExpirationDuration,
clock,
rpcService,
mantisJobStore,
jobMessageRouter,
maxJobArtifactsToCache,
jobClustersWithArtifactCachingEnabled,
isJobArtifactCachingEnabled,
schedulingAttributes,
fitnessCalculator
).withMailbox("akka.actor.metered-mailbox");
}

ResourceClusterActor(
ClusterID clusterID,
Duration heartbeatTimeout,
Duration assignmentTimeout,
Duration disabledTaskExecutorsCheckInterval,
Duration schedulerLeaseExpirationDuration,
Clock clock,
RpcService rpcService,
MantisJobStore mantisJobStore,
@@ -162,6 +194,7 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D
this.heartbeatTimeout = heartbeatTimeout;
this.assignmentTimeout = assignmentTimeout;
this.disabledTaskExecutorsCheckInterval = disabledTaskExecutorsCheckInterval;
this.schedulerLeaseExpirationDuration = schedulerLeaseExpirationDuration;
this.isJobArtifactCachingEnabled = isJobArtifactCachingEnabled;

this.clock = clock;
@@ -173,7 +206,8 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D
this.maxJobArtifactsToCache = maxJobArtifactsToCache;
this.jobClustersWithArtifactCachingEnabled = jobClustersWithArtifactCachingEnabled;

this.executorStateManager = new ExecutorStateManagerImpl(schedulingAttributes, fitnessCalculator);
this.executorStateManager = new ExecutorStateManagerImpl(
schedulingAttributes, fitnessCalculator, this.schedulerLeaseExpirationDuration);

this.metrics = new ResourceClusterActorMetrics();
}
@@ -523,10 +557,12 @@ private void onDisableTaskExecutorsRequestExpiry(ExpireDisableTaskExecutorsReque
if (request.getRequest().getTaskExecutorID().isPresent()) {
final TaskExecutorState state = this.executorStateManager.get(
request.getRequest().getTaskExecutorID().get());
state.onNodeEnabled();
if (state != null) {
state.onNodeEnabled();
}
}
} catch (Exception e) {
log.error("Failed to delete expired {}", request.getRequest());
log.error("Failed to delete expired {}", request.getRequest(), e);
}
}

@@ -174,6 +174,7 @@ private ActorRef createResourceClusterActorFor(ClusterID clusterID) {
Duration.ofMillis(masterConfiguration.getHeartbeatIntervalInMs()),
Duration.ofMillis(masterConfiguration.getAssignmentIntervalInMs()),
Duration.ofMillis(masterConfiguration.getAssignmentIntervalInMs()),
Duration.ofMillis(masterConfiguration.getSchedulerLeaseExpirationDurationInMs()),
clock,
rpcService,
mantisJobStore,
@@ -32,6 +32,7 @@
import io.mantisrx.server.master.scheduler.WorkerOnDisabledVM;
import io.mantisrx.server.worker.TaskExecutorGateway;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -62,6 +63,9 @@ enum RegistrationState {

// last interaction initiated by the task executor
private Instant lastActivity;

// last interaction time when this instance was leased by the scheduler in findBestFit.
private Instant lastSchedulerLeased;
private final Clock clock;
private final RpcService rpcService;
private final JobMessageRouter jobMessageRouter;
@@ -78,6 +82,7 @@ static TaskExecutorState of(Clock clock, RpcService rpcService, JobMessageRouter
null,
false,
clock.instant(),
Instant.MIN,
clock,
rpcService,
jobMessageRouter,
@@ -278,6 +283,15 @@ Instant getLastActivity() {
return this.lastActivity;
}

Duration getLastSchedulerLeasedDuration() {
return Duration.between(this.lastSchedulerLeased, this.clock.instant());
}

void updateLastSchedulerLeased() {
this.lastSchedulerLeased = this.clock.instant();
}

TaskExecutorRegistration getRegistration() {
return this.registration;
}
@@ -329,6 +329,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("60000") // 1 minute
int getAssignmentIntervalInMs();

@Config("mantis.agent.assignment.scheduler.lease.ms")
@Default("100")
int getSchedulerLeaseExpirationDurationInMs();

@Config("mantis.job.costsCalculator.class")
@Default("io.mantisrx.master.jobcluster.job.NoopCostsCalculator")
CostsCalculator getJobCostsCalculator();
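
The lease window is exposed as a config knob with a 100 ms default. If the master is configured through a properties file (an assumption here; the exact wiring depends on the deployment), it can be tuned via the key introduced above:

# hypothetical master.properties snippet; 100 ms is the annotated default
mantis.agent.assignment.scheduler.lease.ms=250

A longer lease gives more protection against overlapping schedule requests, at the cost of keeping TEs out of the candidate pool slightly longer between actor messages.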
@@ -291,6 +291,7 @@ public void testGetBestFit_WithGenerationFromScaleGroup() {
ATTRIBUTES_WITH_SCALE_GROUP_1,
stateManager);

this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault()));
bestFitO =
stateManager.findBestFit(
new TaskExecutorBatchAssignmentRequest(
@@ -364,12 +365,14 @@ private TaskExecutorState registerNewTaskExecutor(TaskExecutorID id, MachineDefi
}

@Test
public void testGetBestFit_WithDifferentResourcesSameSku() {
public void testGetBestFit_WithDifferentResourcesSameSku() throws InterruptedException {
registerNewTaskExecutor(TASK_EXECUTOR_ID_1,
MACHINE_DEFINITION_2,
ATTRIBUTES_WITH_SCALE_GROUP_2,
stateManager);

this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault()));

// should get te1 with group2
Optional<BestFit> bestFitO =
stateManager.findBestFit(
@@ -386,6 +389,19 @@ private TaskExecutorState registerNewTaskExecutor(TaskExecutorID id, MachineDefi
ATTRIBUTES_WITH_SCALE_GROUP_1,
stateManager);

// do not move the clock; should still get nothing because the scheduler lease is still valid
bestFitO =
stateManager.findBestFit(
new TaskExecutorBatchAssignmentRequest(
new HashSet<>(Arrays.asList(
TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION_2), null, 0),
TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION_1), null, 1))),
CLUSTER_ID));

assertFalse(bestFitO.isPresent());

this.actual.set(Clock.fixed(Instant.ofEpochSecond(3), ZoneId.systemDefault()));

bestFitO =
stateManager.findBestFit(
new TaskExecutorBatchAssignmentRequest(
@@ -605,6 +621,7 @@ public void testGetBestFit_WithSameCoresDifferentMemory() {
stateManager.tryMarkAvailable(TaskExecutorID.of("te3"));

// matching found for 2 cores, 14GB since the best-fit TE shape is now 2 cores, 14GB and there are 2 TEs available
this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault()));
bestFit =
stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest(
new HashSet<>(Arrays.asList(
@@ -72,6 +72,7 @@ public class ResourceClusterActorClusterUsageAkkaTest {
private static final ClusterID CLUSTER_ID = ClusterID.of("clusterId");
private static final Duration heartbeatTimeout = Duration.ofSeconds(10);
private static final Duration checkForDisabledExecutorsInterval = Duration.ofSeconds(10);
private static final Duration schedulerLeaseExpirationDuration = Duration.ofMillis(100);
private static final Duration assignmentTimeout = Duration.ofSeconds(1);
private static final String HOST_NAME = "hostname";
private static final WorkerPorts WORKER_PORTS = new WorkerPorts(1, 2, 3, 4, 5);
@@ -166,6 +167,7 @@ public void setupActor() throws Exception {
heartbeatTimeout,
assignmentTimeout,
checkForDisabledExecutorsInterval,
schedulerLeaseExpirationDuration,
Clock.systemDefaultZone(),
rpcService,
mantisJobStore,
@@ -102,6 +102,7 @@ public class ResourceClusterActorTest {
private static final ClusterID CLUSTER_ID = ClusterID.of("clusterId");
private static final Duration heartbeatTimeout = Duration.ofSeconds(10);
private static final Duration checkForDisabledExecutorsInterval = Duration.ofSeconds(10);
private static final Duration schedulerLeaseExpirationDuration = Duration.ofMillis(100);
private static final Duration assignmentTimeout = Duration.ofSeconds(1);
private static final String HOST_NAME = "hostname";

@@ -201,6 +202,7 @@ public void setupActor() {
heartbeatTimeout,
assignmentTimeout,
checkForDisabledExecutorsInterval,
schedulerLeaseExpirationDuration,
Clock.systemDefaultZone(),
rpcService,
mantisJobStore,
