From 889612b1861f97842345c737634698db9b8a2b4d Mon Sep 17 00:00:00 2001
From: Phil Winder
Date: Wed, 18 Dec 2024 10:15:04 +0000
Subject: [PATCH] refactor(scheduler): fix flakey test and refactor to make more testable

Fixes #654
---
 api/pkg/scheduler/scheduler.go      | 124 +++++++++++++++-------------
 api/pkg/scheduler/scheduler_test.go |  19 +++--
 2 files changed, 79 insertions(+), 64 deletions(-)

diff --git a/api/pkg/scheduler/scheduler.go b/api/pkg/scheduler/scheduler.go
index bc3630ca..5fcbd9e3 100644
--- a/api/pkg/scheduler/scheduler.go
+++ b/api/pkg/scheduler/scheduler.go
@@ -51,7 +51,7 @@ var _ Scheduler = &scheduler{}
 // NewScheduler creates a new scheduler with a workload allocator.
 // This also starts a goroutine to process the queue in the background.
 func NewScheduler(ctx context.Context, cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
-	scheduler := newSchedulerWithoutQueue(cfg, onSchedulingErr)
+	scheduler := newSchedulerWithoutGoroutines(cfg, onSchedulingErr)
 
 	// Start a goroutine to process the buffered queue
 	go func() {
@@ -60,13 +60,13 @@ func NewScheduler(ctx context.Context, cfg *config.ServerConfig, onSchedulingErr
 
 	// Start a goroutine that will periodically check for dead runners and reschedule their work
 	go func() {
-		scheduler.rescheduleDeadWork(ctx)
+		scheduler.checkForDeadRunners(ctx)
 	}()
 
 	return scheduler
 }
 
-func newSchedulerWithoutQueue(cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
+func newSchedulerWithoutGoroutines(cfg *config.ServerConfig, onSchedulingErr func(work *Workload, err error)) *scheduler {
 	modelTTL := cfg.Providers.Helix.ModelTTL
 	if modelTTL == 0 {
 		modelTTL = 10 * time.Second
@@ -405,76 +405,82 @@ func (s *scheduler) processQueue(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		default:
-			s.queueMtx.Lock()
-			// Store jobs that weren't able to be scheduled to re-add to the queue later
-			// This is important because there many be workloads that persistently fail to schedule
-			// and we don't want to block workloads that can be scheduled from further down the queue
-			unscheduledQueue := make([]*Workload, 0)
-
-			// Schedule any requests that are currently in the queue.
-			for _, work := range s.queue {
-				err := s.Schedule(work)
-				if err != nil {
-					retry, err := ErrorHandlingStrategy(err, work)
-
-					// If we can retry, break out of the loop and try again later
-					if retry {
-						unscheduledQueue = append(unscheduledQueue, work)
-						continue
-					}
-
-					// If we can't retry, write an error to the request and continue so it takes it off
-					// the queue
-					s.onSchedulingErr(work, err)
-				}
-			}
-			// Clear processed queue
-			s.queue = unscheduledQueue
-			s.queueMtx.Unlock()
-
+			s.processQueueOnce()
 			// Sleep for a while to allow others to access the queue
 			time.Sleep(10 * time.Millisecond)
 		}
 	}
 }
 
-func (s *scheduler) rescheduleDeadWork(ctx context.Context) {
+func (s *scheduler) processQueueOnce() {
+	s.queueMtx.Lock()
+	defer s.queueMtx.Unlock()
+
+	// Store jobs that weren't able to be scheduled to re-add to the queue later
+	// This is important because there may be workloads that persistently fail to schedule
+	// and we don't want to block workloads that can be scheduled from further down the queue
+	unscheduledQueue := make([]*Workload, 0)
+
+	// Schedule any requests that are currently in the queue.
+	for _, work := range s.queue {
+		err := s.Schedule(work)
+		if err != nil {
+			retry, err := ErrorHandlingStrategy(err, work)
+
+			// If we can retry, keep the work in the queue and try again on the next pass
+			if retry {
+				unscheduledQueue = append(unscheduledQueue, work)
+				continue
+			}
+
+			// If we can't retry, write an error to the request and continue so it takes it off
+			// the queue
+			s.onSchedulingErr(work, err)
+		}
+	}
+	// Keep only the work that could not be scheduled
+	s.queue = unscheduledQueue
+}
+
+func (s *scheduler) checkForDeadRunners(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		default:
-			deadRunnerIDs := s.cluster.DeadRunnerIDs()
-			for _, id := range deadRunnerIDs {
-				deadSlots := s.allocator.DeadSlots([]string{id})
-				for _, dead := range deadSlots {
-					// Get work associated with the dead slot.
-					work, ok := s.workStore.Load(dead.ID)
-					if !ok {
-						continue // No work to reschedule
-					}
-
-					// Attempt to reschedule the work.
-					log.Trace().
-						Str("runner_id", id).
-						Str("slot_id", dead.ID.String()).
-						Msg("rescheduling work for dead slot")
-					err := s.Enqueue(work)
-					if err != nil {
-						log.Error().
-							Err(err).
-							Str("runner_id", id).
-							Str("slot_id", dead.ID.String()).
-							Msg("failed to reschedule work for dead slot")
-						continue
-					}
-					// Delete the workload from the store now it has been rescheduled
-					s.workStore.Delete(dead.ID)
-				}
-			}
+			s.checkForDeadRunnersOnce()
 			// Sleep for a while to allow others to access the scheduler
 			time.Sleep(10 * time.Millisecond)
 		}
 	}
 }
+
+func (s *scheduler) checkForDeadRunnersOnce() {
+	deadRunnerIDs := s.cluster.DeadRunnerIDs()
+	for _, id := range deadRunnerIDs {
+		deadSlots := s.allocator.DeadSlots([]string{id})
+		for _, dead := range deadSlots {
+			// Load and delete that work from the store
+			work, ok := s.workStore.LoadAndDelete(dead.ID)
+			if !ok {
+				continue // No work to reschedule
+			}
+
+			// Attempt to reschedule the work.
+			log.Trace().
+				Str("runner_id", id).
+				Str("slot_id", dead.ID.String()).
+				Msg("rescheduling work for dead slot")
+			err := s.Enqueue(work)
+			if err != nil {
+				log.Error().
+					Err(err).
+					Str("runner_id", id).
+					Str("slot_id", dead.ID.String()).
+					Msg("failed to reschedule work for dead slot")
+				continue
+			}
+		}
+	}
+}
diff --git a/api/pkg/scheduler/scheduler_test.go b/api/pkg/scheduler/scheduler_test.go
index 4bc50e84..62ba3754 100644
--- a/api/pkg/scheduler/scheduler_test.go
+++ b/api/pkg/scheduler/scheduler_test.go
@@ -488,7 +488,7 @@ func TestScheduler_ProcessQueue(t *testing.T) {
 
 	// Manually start a scheduler so that the goroutine doesn't start
 	config, _ := config.LoadServerConfig()
-	scheduler := newSchedulerWithoutQueue(&config, errorFunc)
+	scheduler := newSchedulerWithoutGoroutines(&config, errorFunc)
 
 	// Without a runner, adding to the queue and processing should result in an error on the work
 	err := enqueueTestLLMWorkload(scheduler, "request-1", model.Model_Ollama_Llama3_8b)
@@ -531,12 +531,14 @@ func TestScheduler_ProcessQueue(t *testing.T) {
 	assert.Len(t, scheduler.queue, 1)
 }
 
+// We had a bug where if a runner changed its name, the new scheduler code did not run the
+// dead runner cleanup code. This test ensures that that bug is fixed.
 func TestScheduler_ChangingRunnerName(t *testing.T) {
-	// We had a bug where if a runner changed its name, the new scheduler code did not run the
-	// dead runner cleanup code. This test ensures that that bug is fixed.
+	// Manually start a scheduler so that the goroutines don't start
 	config, _ := config.LoadServerConfig()
-	config.Providers.Helix.RunnerTTL = 50 * time.Millisecond // Needs to be long enough to wait for the async runner cleanup goroutine to run
-	scheduler := NewScheduler(context.Background(), &config, nil)
+	// Has to be long enough to allow initial scheduling, but short enough to allow the runner to die
+	config.Providers.Helix.RunnerTTL = 10 * time.Millisecond
+	scheduler := newSchedulerWithoutGoroutines(&config, func(*Workload, error) {})
 
 	// Add a runner
 	m, _ := model.GetModel(model.Model_Ollama_Llama3_8b)
@@ -549,8 +551,14 @@ func TestScheduler_ChangingRunnerName(t *testing.T) {
 	err := enqueueTestLLMWorkload(scheduler, "request-1", model.Model_Ollama_Llama3_8b)
 	assert.NoError(t, err)
 
+	// Manually process the queue
+	scheduler.processQueueOnce()
+
 	// Allow the runner to die
 	WaitFor(t, func() bool {
+		// Manually run the dead runner check
+		scheduler.checkForDeadRunnersOnce()
+
 		data := scheduler.DashboardSlotsData()
 		return len(data) == 0
 	}, 2*time.Second)
@@ -558,6 +566,7 @@ func TestScheduler_ChangingRunnerName(t *testing.T) {
 	assert.Len(t, data, 0)
 
 	// Make sure that the work has gone from the old slot
+	time.Sleep(10 * time.Millisecond)
 	_, ok := scheduler.find("request-1")
 	assert.False(t, ok)
 }