Skip to content

Commit

Permalink
Merge pull request #392 from barney-s/watch-timeout
Browse files Browse the repository at this point in the history
Add the ability to set watch timeouts.
  • Loading branch information
k8s-ci-robot authored Jun 14, 2024
2 parents a2c2fc4 + 4a54ca1 commit 53ff542
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions pkg/patterns/declarative/pkg/watch/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -35,8 +36,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// WatchDelay is the time between a Watch being dropped and attempting to resume it
const WatchDelay = 30 * time.Second
var (
// WatchActivityTimeout sets a timeout for a Watch activity under normal operation
WatchActivityTimeout = 300 * time.Second
// WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path
// We expect the author to set this to a lower value in environments where it makes sense.
// func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... }
WatchActivityFirstTimeout = 300 * time.Second
)

const (
// WatchDelay is the time between a Watch being dropped and attempting to resume it
WatchDelay = 30 * time.Second
)

// NewDynamicWatch constructs a watcher for unstructured objects.
// Deprecated: avoid using directly; will move to internal in future.
Expand Down Expand Up @@ -138,13 +150,46 @@ type clientObject struct {
//
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) {
var sawActivity atomic.Bool

log := log.FromContext(ctx)

options := w.FilterOptions
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
options.AllowWatchBookmarks = true

events, err := w.resource.Watch(context.TODO(), options)
activityTimeout := WatchActivityTimeout
if watchStarted != nil {
activityTimeout = WatchActivityFirstTimeout
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Check for events periodically
ticker := time.NewTicker(activityTimeout)
defer ticker.Stop()
sawActivity.Store(false)

go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !sawActivity.Load() {
log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch")
cancel()
return
}
sawActivity.Store(false)
}
}
}()

events, err := w.resource.Watch(ctx, options)
// If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context
// We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return
sawActivity.Store(true)

if watchStarted != nil {
watchStarted.Done()
}
Expand All @@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met
defer events.Stop()

for clientEvent := range events.ResultChan() {
sawActivity.Store(true)
switch clientEvent.Type {
case watch.Bookmark:
// not an object change, we ignore it
Expand Down

0 comments on commit 53ff542

Please sign in to comment.