refactor(scheduler): fix flakey test and refactor to make more testable
Fixes #654
philwinder committed Dec 18, 2024
1 parent 4af1345 commit 889612b
Showing 2 changed files with 79 additions and 64 deletions.
124 changes: 65 additions & 59 deletions api/pkg/scheduler/scheduler.go
@@ -51,7 +51,7 @@ var _ Scheduler = &scheduler{}
// NewScheduler creates a new scheduler with a workload allocator.
// This also starts a goroutine to process the queue in the background.
func NewScheduler(ctx context.Context, cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
-scheduler := newSchedulerWithoutQueue(cfg, onSchedulingErr)
+scheduler := newSchedulerWithoutGoroutines(cfg, onSchedulingErr)

// Start a goroutine to process the buffered queue
go func() {
@@ -60,13 +60,13 @@ func NewScheduler(ctx context.Context, cfg *config.ServerConfig, onSchedulingErr

// Start a goroutine that will periodically check for dead runners and reschedule their work
go func() {
-scheduler.rescheduleDeadWork(ctx)
+scheduler.checkForDeadRunners(ctx)
}()

return scheduler
}

-func newSchedulerWithoutQueue(cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
+func newSchedulerWithoutGoroutines(cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
modelTTL := cfg.Providers.Helix.ModelTTL
if modelTTL == 0 {
modelTTL = 10 * time.Second
@@ -405,76 +405,82 @@ func (s *scheduler) processQueue(ctx context.Context) {
case <-ctx.Done():
return
default:
-s.queueMtx.Lock()
-// Store jobs that weren't able to be scheduled to re-add to the queue later
-// This is important because there many be workloads that persistently fail to schedule
-// and we don't want to block workloads that can be scheduled from further down the queue
-unscheduledQueue := make([]*Workload, 0)
-
-// Schedule any requests that are currently in the queue.
-for _, work := range s.queue {
-err := s.Schedule(work)
-if err != nil {
-retry, err := ErrorHandlingStrategy(err, work)
-
-// If we can retry, break out of the loop and try again later
-if retry {
-unscheduledQueue = append(unscheduledQueue, work)
-continue
-}
-
-// If we can't retry, write an error to the request and continue so it takes it off
-// the queue
-s.onSchedulingErr(work, err)
-}
-}
-// Clear processed queue
-s.queue = unscheduledQueue
-s.queueMtx.Unlock()
-
+s.processQueueOnce()
// Sleep for a while to allow others to access the queue
time.Sleep(10 * time.Millisecond)
}
}
}

-func (s *scheduler) rescheduleDeadWork(ctx context.Context) {
+func (s *scheduler) processQueueOnce() {
+s.queueMtx.Lock()
+defer s.queueMtx.Unlock()
+
+// Store jobs that weren't able to be scheduled, to re-add to the queue later.
+// This is important because there may be workloads that persistently fail to schedule,
+// and we don't want them to block workloads further down the queue that can be scheduled.
+unscheduledQueue := make([]*Workload, 0)
+
+// Schedule any requests that are currently in the queue.
+for _, work := range s.queue {
+err := s.Schedule(work)
+if err != nil {
+retry, err := ErrorHandlingStrategy(err, work)
+
+// If we can retry, keep the workload and try again on a later pass
+if retry {
+unscheduledQueue = append(unscheduledQueue, work)
+continue
+}
+
+// If we can't retry, report the error to the request and continue, so that it
+// is taken off the queue
+s.onSchedulingErr(work, err)
+}
+}
+// Keep only the workloads that still need scheduling
+s.queue = unscheduledQueue
+}

+func (s *scheduler) checkForDeadRunners(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
-deadRunnerIDs := s.cluster.DeadRunnerIDs()
-for _, id := range deadRunnerIDs {
-deadSlots := s.allocator.DeadSlots([]string{id})
-for _, dead := range deadSlots {
-// Get work associated with the dead slot.
-work, ok := s.workStore.Load(dead.ID)
-if !ok {
-continue // No work to reschedule
-}
-
-// Attempt to reschedule the work.
-log.Trace().
-Str("runner_id", id).
-Str("slot_id", dead.ID.String()).
-Msg("rescheduling work for dead slot")
-err := s.Enqueue(work)
-if err != nil {
-log.Error().
-Err(err).
-Str("runner_id", id).
-Str("slot_id", dead.ID.String()).
-Msg("failed to reschedule work for dead slot")
-continue
-}
-// Delete the workload from the store now it has been rescheduled
-s.workStore.Delete(dead.ID)
-}
-}
+s.checkForDeadRunnersOnce()

// Sleep for a while to allow others to access the scheduler
time.Sleep(10 * time.Millisecond)
}
}
}

+func (s *scheduler) checkForDeadRunnersOnce() {
+deadRunnerIDs := s.cluster.DeadRunnerIDs()
+for _, id := range deadRunnerIDs {
+deadSlots := s.allocator.DeadSlots([]string{id})
+for _, dead := range deadSlots {
+// Load and delete that work from the store
+work, ok := s.workStore.LoadAndDelete(dead.ID)
+if !ok {
+continue // No work to reschedule
+}
+
+// Attempt to reschedule the work.
+log.Trace().
+Str("runner_id", id).
+Str("slot_id", dead.ID.String()).
+Msg("rescheduling work for dead slot")
+err := s.Enqueue(work)
+if err != nil {
+log.Error().
+Err(err).
+Str("runner_id", id).
+Str("slot_id", dead.ID.String()).
+Msg("failed to reschedule work for dead slot")
+continue
+}
+}
+}
+}
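The refactor applies one pattern in both places: the goroutine keeps only the for/select/sleep scaffolding, and each pass is delegated to a synchronous ...Once method that a test can call directly. A minimal standalone sketch of that shape (the worker type, tickOnce, and run names are illustrative, not the Helix ones):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // worker owns a queue. The background loop holds all the timing;
    // the real work happens in the synchronous tickOnce, which a test
    // can call directly and then assert on state, with no sleeps.
    type worker struct {
        queue []string
    }

    func (w *worker) run(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                w.tickOnce()
                time.Sleep(10 * time.Millisecond)
            }
        }
    }

    // tickOnce performs one deterministic pass over the queue.
    func (w *worker) tickOnce() {
        for _, item := range w.queue {
            fmt.Println("processing", item)
        }
        w.queue = nil
    }

    func main() {
        w := &worker{queue: []string{"a", "b"}}
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()
        w.run(ctx) // production path; a test calls w.tickOnce() instead
    }

A related change above: the dead-slot path previously called workStore.Load and only later workStore.Delete, while the new checkForDeadRunnersOnce uses LoadAndDelete. On a sync.Map-style store that claims the entry in one atomic step, so two overlapping passes cannot both pick up and reschedule the same slot's work.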
19 changes: 14 additions & 5 deletions api/pkg/scheduler/scheduler_test.go
@@ -488,7 +488,7 @@ func TestScheduler_ProcessQueue(t *testing.T) {

// Manually start a scheduler so that the goroutine doesn't start
config, _ := config.LoadServerConfig()
-scheduler := newSchedulerWithoutQueue(&config, errorFunc)
+scheduler := newSchedulerWithoutGoroutines(&config, errorFunc)

// Without a runner, adding to the queue and processing should result in an error on the work
err := enqueueTestLLMWorkload(scheduler, "request-1", model.Model_Ollama_Llama3_8b)
@@ -531,12 +531,14 @@
assert.Len(t, scheduler.queue, 1)
}

+// We had a bug where if a runner changed its name, the new scheduler code did not run the
+// dead runner cleanup code. This test ensures that that bug is fixed.
func TestScheduler_ChangingRunnerName(t *testing.T) {
-// We had a bug where if a runner changed its name, the new scheduler code did not run the
-// dead runner cleanup code. This test ensures that that bug is fixed.
+// Manually start a scheduler so that the goroutines don't start
config, _ := config.LoadServerConfig()
-config.Providers.Helix.RunnerTTL = 50 * time.Millisecond // Needs to be long enough to wait for the async runner cleanup goroutine to run
-scheduler := NewScheduler(context.Background(), &config, nil)
+// Has to be long enough to allow initial scheduling, but short enough to allow the runner to die
+config.Providers.Helix.RunnerTTL = 10 * time.Millisecond
+scheduler := newSchedulerWithoutGoroutines(&config, func(*Workload, error) {})

// Add a runner
m, _ := model.GetModel(model.Model_Ollama_Llama3_8b)
@@ -549,15 +549,22 @@ func TestScheduler_ChangingRunnerName(t *testing.T) {
err := enqueueTestLLMWorkload(scheduler, "request-1", model.Model_Ollama_Llama3_8b)
assert.NoError(t, err)

+// Manually process the queue
+scheduler.processQueueOnce()
+
// Allow the runner to die
WaitFor(t, func() bool {
+// Manually check that the runner is dead
+scheduler.checkForDeadRunnersOnce()
+
data := scheduler.DashboardSlotsData()
return len(data) == 0
}, 2*time.Second)
data := scheduler.DashboardSlotsData()
assert.Len(t, data, 0)

// Make sure that the work has gone from the old slot
time.Sleep(10 * time.Millisecond)
_, ok := scheduler.find("request-1")
assert.False(t, ok)
}
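The WaitFor helper the test leans on is not part of this diff. A plausible shape for it, assuming it simply polls the condition until it returns true or the timeout expires (an assumption, not the repository's actual implementation):

    package scheduler

    import (
        "testing"
        "time"
    )

    // WaitFor polls cond until it returns true or timeout elapses,
    // failing the test otherwise. Assumed implementation; the real
    // helper lives elsewhere in the scheduler package.
    func WaitFor(t *testing.T, cond func() bool, timeout time.Duration) {
        t.Helper()
        deadline := time.Now().Add(timeout)
        for time.Now().Before(deadline) {
            if cond() {
                return
            }
            time.Sleep(time.Millisecond)
        }
        t.Fatalf("condition not met within %s", timeout)
    }

Even with polling, the rewritten test is deterministic where it matters: processQueueOnce and checkForDeadRunnersOnce run synchronously on the test goroutine, so the only thing being waited on is the runner's 10ms TTL expiring.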
