diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 24a582bad3..a2e80d3065 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -203,6 +203,9 @@ func (w *priorityqueue[T]) spin() { w.lockedLock.Lock() defer w.lockedLock.Unlock() + // manipulating the tree from within Ascend might lead to panics, so + // track what we want to delete and do it after we are done ascending. + var toDelete []*item[T] w.queue.Ascend(func(item *item[T]) bool { if item.readyAt != nil { if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 { @@ -230,12 +233,16 @@ func (w *priorityqueue[T]) spin() { w.locked.Insert(item.key) w.waiters.Add(-1) delete(w.items, item.key) - w.queue.Delete(item) + toDelete = append(toDelete, item) w.becameReady.Delete(item.key) w.get <- *item return true }) + + for _, item := range toDelete { + w.queue.Delete(item) + } }() } } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 0a1266f5c2..0e201a3986 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -2,6 +2,7 @@ package priorityqueue import ( "fmt" + "math/rand/v2" "sync" "testing" "time" @@ -344,6 +345,40 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(4)) metrics.mu.Unlock() }) + + It("returns many items", func() { + // This test ensures the queue is able to drain a large queue without panic'ing. + // In a previous version of the code we were calling queue.Delete within q.Ascend + // which led to a panic in queue.Ascend > iterate: + // "panic: runtime error: index out of range [0] with length 0" + q, _ := newQueue() + defer q.ShutDown() + + for range 20 { + for i := range 1000 { + rn := rand.N(100) //nolint:gosec // We don't need cryptographically secure entropy here + if rn < 10 { + q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i)) + } else { + q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i)) + } + } + + wg := sync.WaitGroup{} + for range 100 { // The panic only occurred relatively frequently with a high number of go routines. + wg.Add(1) + go func() { + defer wg.Done() + for range 10 { + obj, _, _ := q.GetWithPriority() + q.Done(obj) + } + }() + } + + wg.Wait() + } + }) }) func BenchmarkAddGetDone(b *testing.B) {