Skip to content

Commit

Permalink
Merge pull request #3087 from tareksha/typed_waitforsync
Browse files Browse the repository at this point in the history
[release-0.19] 🐛fix(controller): support WaitForSync in custom TypedSyncingSource
  • Loading branch information
k8s-ci-robot authored Jan 23, 2025
2 parents d4df90f + b2c2171 commit e022b91
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
c.LogConstructor(nil).Info("Starting Controller")

for _, watch := range c.startWatches {
syncingSource, ok := watch.(source.SyncingSource)
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
continue
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type TestRequest struct {
Key string
}

var _ = Describe("controller", func() {
var fakeReconcile *fakeReconciler
var ctrl *Controller[reconcile.Request]
Expand Down Expand Up @@ -323,6 +327,41 @@ var _ = Describe("controller", func() {
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
})

It("should check for correct TypedSyncingSource if custom types are used", func() {
queue := &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}
ctrl := &Controller[TestRequest]{
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
return queue
},
LogConstructor: func(*TestRequest) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
ctrl.CacheSyncTimeout = time.Second
src := &bisignallingSource[TestRequest]{
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
startDone: make(chan error, 1),
waitCall: make(chan struct{}),
waitDone: make(chan error, 1),
}
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
ctrl.Name = "foo"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startCh := make(chan error)
go func() {
defer GinkgoRecover()
startCh <- ctrl.Start(ctx)
}()
Eventually(src.startCall).Should(Receive(Equal(queue)))
src.startDone <- nil
Eventually(src.waitCall).Should(BeClosed())
src.waitDone <- nil
cancel()
Eventually(startCh).Should(Receive(Succeed()))
})
})

Describe("Processing queue items from a Controller", func() {
Expand Down Expand Up @@ -875,3 +914,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}

type bisignallingSource[T comparable] struct {
// receives the queue that is passed to Start
startCall chan workqueue.TypedRateLimitingInterface[T]
// passes an error to return from Start
startDone chan error
// closed when WaitForSync is called
waitCall chan struct{}
// passes an error to return from WaitForSync
waitDone chan error
}

var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)

func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
select {
case t.startCall <- q:
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-t.startDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
close(t.waitCall)
select {
case err := <-t.waitDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

0 comments on commit e022b91

Please sign in to comment.