Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
Track resources with a label selector (#367)
Browse files Browse the repository at this point in the history
Update tracker package to the latest from knative.dev/pkg/tracker, and
then heavily modified to remove dependence on Knative packages and align
with controller-runtime.

The Tracker can now track resources by name or by selector. The
Everything selector along with an empty namespace can be used to
implicitly track all resource of a given type.

The Tracker#TrackReference method can specify either a name or selector
used to match resource. The Tracker#TrackObject method is a convenience
method to convert a client.Object into a tracker.Reference.

Similar to the existing Config#TrackAndGet method, there is a new
Config#TrackAndList method that follows the contract for client.List
while also tracking resources matching the query.

Direct users of the Tracker will need to adapt to the new interface,
however, indirect uses (like reconciler tests) are preserved.

Signed-off-by: Scott Andrews <[email protected]>
  • Loading branch information
scothis authored May 12, 2023
1 parent e547d52 commit 615186e
Show file tree
Hide file tree
Showing 13 changed files with 1,014 additions and 233 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,9 @@ func StashExampleSubReconciler(c reconcilers.Config) reconcilers.SubReconciler {

The [`Tracker`](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/tracker#Tracker) provides a means for one resource to watch another resource for mutations, triggering the reconciliation of the resource defining the reference.

It's common to work with a resource that is also tracked. The [Config.TrackAndGet](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndGet) method uses the same signature as client.Get, but additionally tracks the resource.
Resources can either be tracked by name or with a label selector using [`TrackReference`](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/tracker#Tracker.TrackReference).

It's common to work with a resource that is also tracked. The [Config.TrackAndGet](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndGet) method uses the same signature as client.Get, but additionally tracks the resource. Likewise, the [Config.TrackAndList](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndList) method uses the same signature as client.List, but additionally tracks resources matching the query.

In the [Setup](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#SyncReconciler) method, a watch is created that will notify the handler every time a resource of that kind is mutated. The [EnqueueTracked](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#EnqueueTracked) helper returns a list of resources that are tracking the given resource, those resources are enqueued for the reconciler.

Expand Down
21 changes: 9 additions & 12 deletions reconcilers/enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,26 @@ package reconcilers
import (
"context"

"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/vmware-labs/reconciler-runtime/tracker"
)

func EnqueueTracked(ctx context.Context, by client.Object) handler.EventHandler {
func EnqueueTracked(ctx context.Context) handler.EventHandler {
c := RetrieveConfigOrDie(ctx)
log := logr.FromContextOrDiscard(ctx)

return handler.EnqueueRequestsFromMapFunc(
func(a client.Object) []Request {
func(obj client.Object) []Request {
var requests []Request

gvks, _, err := c.Scheme().ObjectKinds(by)
items, err := c.Tracker.GetObservers(obj)
if err != nil {
panic(err)
log.Error(err, "unable to get tracked requests")
return nil
}

key := tracker.NewKey(
gvks[0],
types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()},
)
for _, item := range c.Tracker.Lookup(ctx, key) {
for _, item := range items {
requests = append(requests, Request{NamespacedName: item})
}

Expand Down
61 changes: 50 additions & 11 deletions reconcilers/reconcilers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -21,11 +22,13 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -71,21 +74,56 @@ func (c Config) WithCluster(cluster cluster.Cluster) Config {
// TrackAndGet tracks the resources for changes and returns the current value. The track is
// registered even when the resource does not exists so that its creation can be tracked.
//
// Equivalent to calling both `c.Tracker.Track(...)` and `c.Client.Get(...)`
// Equivalent to calling both `c.Tracker.TrackObject(...)` and `c.Client.Get(...)`
func (c Config) TrackAndGet(ctx context.Context, key types.NamespacedName, obj client.Object, opts ...client.GetOption) error {
c.Tracker.Track(
ctx,
tracker.NewKey(gvk(obj, c.Scheme()), key),
RetrieveRequest(ctx).NamespacedName,
)
// create synthetic resource to track from known type and request
req := RetrieveRequest(ctx)
resource := RetrieveResourceType(ctx).DeepCopyObject().(client.Object)
resource.SetNamespace(req.Namespace)
resource.SetName(req.Name)
ref := obj.DeepCopyObject().(client.Object)
ref.SetNamespace(key.Namespace)
ref.SetName(key.Name)
c.Tracker.TrackObject(ref, resource)

return c.Get(ctx, key, obj, opts...)
}

// TrackAndList tracks the resources for changes and returns the current value.
//
// Equivalent to calling both `c.Tracker.TrackReference(...)` and `c.Client.List(...)`
func (c Config) TrackAndList(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
// create synthetic resource to track from known type and request
req := RetrieveRequest(ctx)
resource := RetrieveResourceType(ctx).DeepCopyObject().(client.Object)
resource.SetNamespace(req.Namespace)
resource.SetName(req.Name)

or, err := reference.GetReference(c.Scheme(), list)
if err != nil {
return err
}
gvk := schema.FromAPIVersionAndKind(or.APIVersion, or.Kind)
listOpts := (&client.ListOptions{}).ApplyOptions(opts)
if listOpts.LabelSelector == nil {
listOpts.LabelSelector = labels.Everything()
}
ref := tracker.Reference{
APIGroup: gvk.Group,
Kind: strings.TrimSuffix(gvk.Kind, "List"),
Namespace: listOpts.Namespace,
Selector: listOpts.LabelSelector,
}
c.Tracker.TrackReference(ref, resource)

return c.List(ctx, list, opts...)
}

// NewConfig creates a Config for a specific API type. Typically passed into a
// reconciler.
func NewConfig(mgr ctrl.Manager, apiType client.Object, syncPeriod time.Duration) Config {
return Config{
Tracker: tracker.New(2 * syncPeriod),
Tracker: tracker.New(mgr.GetScheme(), 2*syncPeriod),
}.WithCluster(mgr)
}

Expand Down Expand Up @@ -601,7 +639,7 @@ func (r *AggregateReconciler[T]) Reconcile(ctx context.Context, req Request) (Re
Client: c.Client,
APIReader: c.APIReader,
Recorder: c.Recorder,
Tracker: tracker.New(0),
Tracker: tracker.New(c.Scheme(), 0),
})
desired, err := r.desiredResource(ctx, resource)
if err != nil {
Expand Down Expand Up @@ -1069,7 +1107,7 @@ func (r *ChildReconciler[T, CT, CLT]) SetupWithManager(ctx context.Context, mgr
}

if r.SkipOwnerReference {
bldr.Watches(&source.Kind{Type: r.ChildType}, EnqueueTracked(ctx, r.ChildType))
bldr.Watches(&source.Kind{Type: r.ChildType}, EnqueueTracked(ctx))
} else {
bldr.Owns(r.ChildType)
}
Expand Down Expand Up @@ -1680,7 +1718,8 @@ func (r *ResourceManager[T]) Manage(ctx context.Context, resource client.Object,
if r.TrackDesired {
// normally tracks should occur before API operations, but when creating a resource with a
// generated name, we need to know the actual resource name.
if err := c.Tracker.TrackChild(ctx, resource, desired, c.Scheme()); err != nil {

if err := c.Tracker.TrackObject(desired, resource); err != nil {
return nilT, err
}
}
Expand Down Expand Up @@ -1715,7 +1754,7 @@ func (r *ResourceManager[T]) Manage(ctx context.Context, resource client.Object,
}
log.Info("updating resource", "diff", cmp.Diff(r.sanitize(actual), r.sanitize(current)))
if r.TrackDesired {
if err := c.Tracker.TrackChild(ctx, resource, current, c.Scheme()); err != nil {
if err := c.Tracker.TrackObject(current, resource); err != nil {
return nilT, err
}
}
Expand Down
148 changes: 147 additions & 1 deletion reconcilers/reconcilers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -122,6 +123,151 @@ func TestConfig_TrackAndGet(t *testing.T) {
})
}

func TestConfig_TrackAndList(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource"
testSelector, _ := labels.Parse("app=test-app")

scheme := runtime.NewScheme()
_ = resources.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)

resource := dies.TestResourceBlank.
MetadataDie(func(d *diemetav1.ObjectMetaDie) {
d.Namespace(testNamespace)
d.Name(testName)
})

configMap := diecorev1.ConfigMapBlank.
MetadataDie(func(d *diemetav1.ObjectMetaDie) {
d.Namespace("track-namespace")
d.Name("track-name")
d.AddLabel("app", "test-app")
}).
AddData("greeting", "hello")

rts := rtesting.SubReconcilerTests[*resources.TestResource]{
"track and list": {
Resource: resource.DieReleasePtr(),
GivenObjects: []client.Object{
configMap,
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Selector: labels.Everything(),
},
},
},
},
"track and list constrained": {
Resource: resource.DieReleasePtr(),
GivenObjects: []client.Object{
configMap,
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{
client.InNamespace("track-namespace"),
client.MatchingLabels(map[string]string{"app": "test-app"}),
},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Namespace: "track-namespace",
Selector: testSelector,
},
},
},
},
"track with errored list": {
Resource: resource.DieReleasePtr(),
ShouldErr: true,
WithReactors: []rtesting.ReactionFunc{
rtesting.InduceFailure("list", "ConfigMapList"),
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Selector: labels.Everything(),
},
},
},
},
}

// run with typed objects
t.Run("typed", func(t *testing.T) {
rts.Run(t, scheme, func(t *testing.T, rtc *rtesting.SubReconcilerTestCase[*resources.TestResource], c reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
return &reconcilers.SyncReconciler[*resources.TestResource]{
Sync: func(ctx context.Context, resource *resources.TestResource) error {
c := reconcilers.RetrieveConfigOrDie(ctx)

cms := &corev1.ConfigMapList{}
listOpts := rtc.Metadata["listOpts"].([]client.ListOption)
err := c.TrackAndList(ctx, cms, listOpts...)
if err != nil {
return err
}

if expected, actual := "hello", cms.Items[0].Data["greeting"]; expected != actual {
// should never get here
panic(fmt.Errorf("expected configmap to have greeting %q, found %q", expected, actual))
}
return nil
},
}
})
})

// run with unstructured objects
t.Run("unstructured", func(t *testing.T) {
rts.Run(t, scheme, func(t *testing.T, rtc *rtesting.SubReconcilerTestCase[*resources.TestResource], c reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
return &reconcilers.SyncReconciler[*resources.TestResource]{
Sync: func(ctx context.Context, resource *resources.TestResource) error {
c := reconcilers.RetrieveConfigOrDie(ctx)

cms := &unstructured.UnstructuredList{}
cms.SetAPIVersion("v1")
cms.SetKind("ConfigMapList")
listOpts := rtc.Metadata["listOpts"].([]client.ListOption)
err := c.TrackAndList(ctx, cms, listOpts...)
if err != nil {
return err
}

if expected, actual := "hello", cms.UnstructuredContent()["items"].([]interface{})[0].(map[string]interface{})["data"].(map[string]interface{})["greeting"].(string); expected != actual {
// should never get here
panic(fmt.Errorf("expected configmap to have greeting %q, found %q", expected, actual))
}
return nil
},
}
})
})
}

func TestResourceReconciler_NoStatus(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource-no-status"
Expand Down Expand Up @@ -3807,7 +3953,7 @@ func TestWithConfig(t *testing.T) {
Metadata: map[string]interface{}{
"SubReconciler": func(t *testing.T, oc reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
c := reconcilers.Config{
Tracker: tracker.New(0),
Tracker: tracker.New(oc.Scheme(), 0),
}

return &reconcilers.WithConfig[*resources.TestResource]{
Expand Down
8 changes: 7 additions & 1 deletion reconcilers/reconcilers_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/vmware-labs/reconciler-runtime/internal/resources"
"github.com/vmware-labs/reconciler-runtime/tracker"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -545,8 +547,12 @@ func TestCastResource_validate(t *testing.T) {
}

func TestWithConfig_validate(t *testing.T) {
scheme := runtime.NewScheme()
_ = resources.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)

config := Config{
Tracker: tracker.New(0),
Tracker: tracker.New(scheme, 0),
}

tests := []struct {
Expand Down
10 changes: 6 additions & 4 deletions testing/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *ExpectConfig) init() {
events: []Event{},
scheme: c.Scheme,
}
c.tracker = createTracker(c.GivenTracks)
c.tracker = createTracker(c.GivenTracks, c.Scheme)
c.observedErrors = []string{}
})
}
Expand Down Expand Up @@ -330,18 +330,20 @@ func (c *ExpectConfig) AssertTrackerExpectations(t *testing.T) {

actualTracks := c.tracker.getTrackRequests()
for i, exp := range c.ExpectTracks {
exp.normalize()

if i >= len(actualTracks) {
c.errorf(t, "Missing tracking request for config %q: %s", c.Name, exp)
c.errorf(t, "Missing tracking request for config %q: %v", c.Name, exp)
continue
}

if diff := cmp.Diff(exp, actualTracks[i]); diff != "" {
if diff := cmp.Diff(exp, actualTracks[i], NormalizeLabelSelector); diff != "" {
c.errorf(t, "Unexpected tracking request for config %q (-expected, +actual): %s", c.Name, diff)
}
}
if actual, exp := len(actualTracks), len(c.ExpectTracks); actual > exp {
for _, extra := range actualTracks[exp:] {
c.errorf(t, "Extra tracking request for config %q: %s", c.Name, extra)
c.errorf(t, "Extra tracking request for config %q: %v", c.Name, extra)
}
}
}
Expand Down
Loading

0 comments on commit 615186e

Please sign in to comment.