From 72a4107c44fbdbc9f6f8771b2d87ffed1b615cfa Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Thu, 2 Jan 2025 19:37:22 -0500 Subject: [PATCH] :bug: Address review comments and fix metrics in priority queue This change addresses Stefans review comments from the original PR and fixes a bug in the metrics where we always included items that are not ready yet - This is incorrect, metrics are only implemented on the [basequeue][0] in upstream, meaning they are only emitted for items that are ready. The impact of this was for example an incorrect queue_depth metric. [0]: https://github.com/kubernetes/kubernetes/blob/b1cb471982b74c13c26dbcc0f4e1b5ae92ea47e6/staging/src/k8s.io/client-go/util/workqueue/queue.go#L157 --- examples/priorityqueue/main.go | 3 +- pkg/config/controller.go | 2 +- pkg/controller/controller.go | 5 +- pkg/controller/controller_test.go | 2 +- pkg/controller/priorityqueue/metrics.go | 1 + pkg/controller/priorityqueue/priorityqueue.go | 69 +++++++++++++------ .../priorityqueue/priorityqueue_test.go | 69 +++++++++++++++++-- pkg/handler/eventhandler.go | 4 +- pkg/handler/eventhandler_test.go | 4 +- 9 files changed, 123 insertions(+), 36 deletions(-) diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go index d6b25f6419..2b09432f22 100644 --- a/examples/priorityqueue/main.go +++ b/examples/priorityqueue/main.go @@ -23,6 +23,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/builder" kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/config" @@ -48,7 +49,7 @@ func run() error { // Setup a Manager mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{ - Controller: config.Controller{UsePriorityQueue: true}, + Controller: config.Controller{UsePriorityQueue: ptr.To(true)}, }) if err != nil { return fmt.Errorf("failed to set up controller-manager: %w", err) diff --git a/pkg/config/controller.go b/pkg/config/controller.go index b702f2838d..0b2aa0cb7b 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -58,5 +58,5 @@ type Controller struct { // priority queue. // // Note: This flag is disabled by default until a future version. It's currently in beta. - UsePriorityQueue bool + UsePriorityQueue *bool } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 47f8aecd1c..b7d7286033 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -24,6 +24,7 @@ import ( "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/internal/controller" @@ -190,7 +191,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.RateLimiter == nil { - if mgr.GetControllerOptions().UsePriorityQueue { + if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second) } else { options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() @@ -199,7 +200,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt if options.NewQueue == nil { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - if mgr.GetControllerOptions().UsePriorityQueue { + if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { o.RateLimiter = rateLimiter }) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 02fbf27dc2..1c5b11d709 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -441,7 +441,7 @@ var _ = Describe("controller.Controller", func() { It("should configure a priority queue if UsePriorityQueue is set", func() { m, err := manager.New(cfg, manager.Options{ - Controller: config.Controller{UsePriorityQueue: true}, + Controller: config.Controller{UsePriorityQueue: ptr.To(true)}, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index bfb31ffc1e..f3ac226eea 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -66,6 +66,7 @@ type defaultQueueMetrics[T comparable] struct { retries workqueue.CounterMetric } +// add is called for ready items only func (m *defaultQueueMetrics[T]) add(item T) { if m == nil { return diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 8f9adf2629..24a582bad3 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } pq := &priorityqueue[T]{ - items: map[T]*item[T]{}, - queue: btree.NewG(32, less[T]), - metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), + items: map[T]*item[T]{}, + queue: btree.NewG(32, less[T]), + becameReady: sets.Set[T]{}, + metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), // itemOrWaiterAdded indicates that an item or // waiter was added. It must be buffered, because // if we currently process items we can't tell @@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { type priorityqueue[T comparable] struct { // lock has to be acquired for any access any of items, queue, addedCounter - // or metrics. - lock sync.Mutex - items map[T]*item[T] - queue bTree[*item[T]] - metrics queueMetrics[T] + // or becameReady + lock sync.Mutex + items map[T]*item[T] + queue bTree[*item[T]] // addedCounter is a counter of elements added, we need it // because unixNano is not guaranteed to be unique. addedCounter uint64 + // becameReady holds items that are in the queue, were added + // with non-zero after and became ready. We need it to call the + // metrics add exactly once for them. + becameReady sets.Set[T] + metrics queueMetrics[T] + itemOrWaiterAdded chan struct{} rateLimiter workqueue.TypedRateLimiter[T] @@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } w.items[key] = item w.queue.ReplaceOrInsert(item) - w.metrics.add(key) + if item.readyAt == nil { + w.metrics.add(key) + } w.addedCounter++ continue } @@ -196,18 +204,21 @@ func (w *priorityqueue[T]) spin() { defer w.lockedLock.Unlock() w.queue.Ascend(func(item *item[T]) bool { - if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways - return false + if item.readyAt != nil { + if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 { + nextReady = w.tick(readyAt) + return false + } + if !w.becameReady.Has(item.key) { + w.metrics.add(item.key) + w.becameReady.Insert(item.key) + } } - // No next element we can process - if item.readyAt != nil && item.readyAt.After(w.now()) { - readyAt := item.readyAt.Sub(w.now()) - if readyAt <= 0 { // Toctou race with the above check - readyAt = 1 - } - nextReady = w.tick(readyAt) - return false + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. + return true } // Item is locked, we can not hand it out @@ -220,6 +231,7 @@ func (w *priorityqueue[T]) spin() { w.waiters.Add(-1) delete(w.items, item.key) w.queue.Delete(item) + w.becameReady.Delete(item.key) w.get <- *item return true @@ -279,22 +291,36 @@ func (w *priorityqueue[T]) ShutDown() { close(w.done) } +// ShutDownWithDrain just calls ShutDown, as the draining +// functionality is not used by controller-runtime. func (w *priorityqueue[T]) ShutDownWithDrain() { w.ShutDown() } +// Len returns the number of items that are ready to be +// picked up. It does not include items that are not yet +// ready. func (w *priorityqueue[T]) Len() int { w.lock.Lock() defer w.lock.Unlock() - return w.queue.Len() + var result int + w.queue.Ascend(func(item *item[T]) bool { + if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 { + result++ + return true + } + return false + }) + + return result } func less[T comparable](a, b *item[T]) bool { if a.readyAt == nil && b.readyAt != nil { return true } - if a.readyAt != nil && b.readyAt == nil { + if b.readyAt == nil && a.readyAt != nil { return false } if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) { @@ -329,5 +355,4 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) - Len() int } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 13bd5fc8d3..0a1266f5c2 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -283,6 +283,67 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(0)) Expect(metrics.adds["test"]).To(Equal(2)) }) + + It("doesn't include non-ready items in Len()", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: time.Minute}, "foo") + q.AddWithOpts(AddOpts{}, "baz") + q.AddWithOpts(AddOpts{After: time.Minute}, "bar") + q.AddWithOpts(AddOpts{}, "bal") + + Expect(q.Len()).To(Equal(2)) + Expect(metrics.depth).To(HaveLen(1)) + Expect(metrics.depth["test"]).To(Equal(2)) + }) + + It("items are included in Len() and the queueDepth metric once they are ready", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo") + q.AddWithOpts(AddOpts{}, "baz") + q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar") + q.AddWithOpts(AddOpts{}, "bal") + + Expect(q.Len()).To(Equal(2)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(2)) + metrics.mu.Unlock() + time.Sleep(time.Second) + Expect(q.Len()).To(Equal(4)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(4)) + metrics.mu.Unlock() + + // Drain queue + for range 4 { + item, _ := q.Get() + q.Done(item) + } + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() + + // Validate that doing it again still works to notice bugs with removing + // it from the queues becameReady tracking. + q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo") + q.AddWithOpts(AddOpts{}, "baz") + q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar") + q.AddWithOpts(AddOpts{}, "bal") + + Expect(q.Len()).To(Equal(2)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(2)) + metrics.mu.Unlock() + time.Sleep(time.Second) + Expect(q.Len()).To(Equal(4)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(4)) + metrics.mu.Unlock() + }) }) func BenchmarkAddGetDone(b *testing.B) { @@ -438,10 +499,6 @@ func TestFuzzPrioriorityQueue(t *testing.T) { } wg.Wait() - - if expected := len(inQueue); expected != q.Len() { - t.Errorf("Expected queue length to be %d, was %d", expected, q.Len()) - } } func newQueue() (PriorityQueue[string], *fakeMetricsProvider) { @@ -453,6 +510,8 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) { bTree: q.(*priorityqueue[string]).queue, } + // validate that tick always gets a positive value as it will just return + // nil otherwise, which results in blocking forever. upstreamTick := q.(*priorityqueue[string]).tick q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time { if d <= 0 { @@ -477,7 +536,7 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s } func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) { - // There is node codepath that deletes an item that doesn't exist + // There is no codepath that deletes an item that doesn't exist old, existed := b.bTree.Delete(item) if !existed { panic(fmt.Sprintf("Delete: item %v not found", item)) diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 9fd912f882..57107f20e9 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -194,8 +194,8 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) { } // isObjectUnchanged checks if the object in a create event is unchanged, for example because -// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check -// if the object is older than one minute. +// we got it in our initial listwatch. The heuristic it uses is to check if the object is older +// than one minute. func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) } diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 5679d9dffe..6e57c22c3b 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -776,7 +776,7 @@ var _ = Describe("Eventhandler", func() { }) Describe("WithLowPriorityWhenUnchanged", func() { - It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() { + It("should lower the priority of a create request for an object that was created more than one minute in the past", func() { actualOpts := priorityqueue.AddOpts{} var actualRequests []reconcile.Request wq := &fakePriorityQueue{ @@ -797,7 +797,7 @@ var _ = Describe("Eventhandler", func() { Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) - It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() { + It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() { actualOpts := priorityqueue.AddOpts{} var actualRequests []reconcile.Request wq := &fakePriorityQueue{