Skip to content

Commit

Permalink
introduce generic synced map, use in generic_iterator (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
stippi2 authored Sep 11, 2024
1 parent 3636f6a commit aa2d92c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 24 deletions.
52 changes: 28 additions & 24 deletions replicate/common/generic-replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ type GenericReplicator struct {

// ReplicateToList is a set that caches the names of all secrets that have a
// "replicate-to" annotation.
ReplicateToList map[string]struct{}
ReplicateToList GenericMap[string, struct{}]

// ReplicateToMatchingList is a set that caches the names of all secrets
// that have a "replicate-to-matching" annotation.
ReplicateToMatchingList map[string]labels.Selector
ReplicateToMatchingList GenericMap[string, labels.Selector]
}

// NewReplicator creates a new generic replicator
// NewGenericReplicator creates a new generic replicator
func NewGenericReplicator(config ReplicatorConfig) *GenericReplicator {
repl := GenericReplicator{
ReplicatorConfig: config,
DependencyMap: make(map[string]map[string]interface{}),
ReplicateToList: make(map[string]struct{}),
ReplicateToMatchingList: make(map[string]labels.Selector),
ReplicateToList: GenericMap[string, struct{}]{},
ReplicateToMatchingList: GenericMap[string, labels.Selector]{},
}

store, controller := cache.NewInformer(
Expand Down Expand Up @@ -150,16 +150,16 @@ func (r *GenericReplicator) Run() {
// annotations into newly created namespaces.
func (r *GenericReplicator) NamespaceAdded(ns *v1.Namespace) {
logger := log.WithField("kind", r.Kind).WithField("target", ns.Name)
for sourceKey := range r.ReplicateToList {
r.ReplicateToList.Range(func(sourceKey string, _ struct{}) bool {
logger := logger.WithField("resource", sourceKey)
obj, exists, err := r.Store.GetByKey(sourceKey)

if err != nil {
log.WithError(err).Error("error fetching object from store")
continue
return true
} else if !exists {
log.Warn("object not found in store")
continue
return true
}

objectMeta := MustGetObject(obj)
Expand All @@ -175,29 +175,32 @@ func (r *GenericReplicator) NamespaceAdded(ns *v1.Namespace) {
}

}
}

return true
})

namespaceLabels := labels.Set(ns.Labels)
for sourceKey, selector := range r.ReplicateToMatchingList {
r.ReplicateToMatchingList.Range(func(sourceKey string, selector labels.Selector) bool {
logger := logger.WithField("resource", sourceKey)

obj, exists, err := r.Store.GetByKey(sourceKey)
if err != nil {
log.WithError(err).Error("error fetching object from store")
continue
return true
} else if !exists {
log.Warn("object not found in store")
continue
return true
}

if !selector.Matches(namespaceLabels) {
continue
return true
}

if _, err := r.replicateResourceToNamespaces(obj, []v1.Namespace{*ns}); err != nil {
logger.WithError(err).Error("error while replicating object to namespace")
}
}
return true
})
}

// NamespaceUpdated checks if namespace's labels changed and deletes any 'replicate-to-matching' resources
Expand All @@ -217,21 +220,22 @@ func (r *GenericReplicator) NamespaceUpdated(nsOld *v1.Namespace, nsNew *v1.Name
var oldLabelSet labels.Set
oldLabelSet = nsOld.Labels
// check 'replicate-to-matching' resources against new labels
for sourceKey, selector := range r.ReplicateToMatchingList {
r.ReplicateToMatchingList.Range(func(sourceKey string, selector labels.Selector) bool {
if selector.Matches(oldLabelSet) && !selector.Matches(newLabelSet) {
obj, exists, err := r.Store.GetByKey(sourceKey)
if err != nil {
log.WithError(err).Error("error fetching object from store")
continue
return true
} else if !exists {
log.Warn("object not found in store")
continue
return true
}
// delete resource from the updated namespace
logger.Infof("removed %s %s from %s", r.Kind, sourceKey, nsNew.Name)
r.DeleteResourceInNamespaces(obj, &v1.NamespaceList{Items: []v1.Namespace{*nsNew}})
}
}
return true
})

// replicate resources to updated ns
logger.Infof("labels of namespace %s changed, attempting to replicate %ss", nsNew.Name, r.Kind)
Expand Down Expand Up @@ -267,7 +271,7 @@ func (r *GenericReplicator) ResourceAdded(obj interface{}) {

// Match resources with "replicate-to" annotation
if namespacePatterns, ok := annotations[ReplicateTo]; ok {
r.ReplicateToList[sourceKey] = struct{}{}
r.ReplicateToList.Store(sourceKey, struct{}{})

namespacesFromStore := namespaceWatcher.NamespaceStore.List()
namespaces := make([]v1.Namespace, len(namespacesFromStore))
Expand All @@ -278,26 +282,26 @@ func (r *GenericReplicator) ResourceAdded(obj interface{}) {
logger.WithError(err).Errorf("could not replicate object to other namespaces")
}
} else {
delete(r.ReplicateToList, sourceKey)
r.ReplicateToList.Delete(sourceKey)
}

// Match resources with "replicate-to-matching" annotations
if namespaceSelectorString, ok := annotations[ReplicateToMatching]; ok {
namespaceSelector, err := labels.Parse(namespaceSelectorString)
if err != nil {
delete(r.ReplicateToMatchingList, sourceKey)
r.ReplicateToMatchingList.Delete(sourceKey)
logger.WithError(err).Error("failed to parse label selector")

return
}

r.ReplicateToMatchingList[sourceKey] = namespaceSelector
r.ReplicateToMatchingList.Store(sourceKey, namespaceSelector)

if err := r.replicateResourceToMatchingNamespacesByLabel(ctx, obj, namespaceSelector); err != nil {
logger.WithError(err).Error("error while replicating by label selector")
}
} else {
delete(r.ReplicateToMatchingList, sourceKey)
r.ReplicateToMatchingList.Delete(sourceKey)
}
}

Expand Down Expand Up @@ -458,7 +462,7 @@ func (r *GenericReplicator) ResourceDeleted(source interface{}) {
r.ResourceDeletedReplicateTo(source)
r.ResourceDeletedReplicateFrom(source)

delete(r.ReplicateToList, sourceKey)
r.ReplicateToList.Delete(sourceKey)

}

Expand Down
44 changes: 44 additions & 0 deletions replicate/common/generic_sync_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"sync"
)

// GenericMap is a generic sync.Map that can store any type of key and value.
type GenericMap[K comparable, V any] struct {
m sync.Map
}

func (gm *GenericMap[K, V]) Store(key K, value V) {
gm.m.Store(key, value)
}

func (gm *GenericMap[K, V]) Load(key K) (value V, ok bool) {
rawValue, ok := gm.m.Load(key)
if ok {
value = rawValue.(V)
}
return value, ok
}

func (gm *GenericMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
rawActual, loaded := gm.m.LoadOrStore(key, value)
if loaded {
actual = rawActual.(V)
} else {
actual = value
}
return actual, loaded
}

func (gm *GenericMap[K, V]) Delete(key K) {
gm.m.Delete(key)
}

func (gm *GenericMap[K, V]) Range(f func(key K, value V) bool) {
gm.m.Range(func(rawKey, rawValue any) bool {
key := rawKey.(K)
value := rawValue.(V)
return f(key, value)
})
}

0 comments on commit aa2d92c

Please sign in to comment.