Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Initial impl of az watcher #3091

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/csi/service/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/config"
cnsvolume "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere"
cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
csitypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/types"
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
)

const (
Expand All @@ -47,6 +50,7 @@ const (
)

var ErrAvailabilityZoneCRNotRegistered = errors.New("AvailabilityZone custom resource not registered")
var ErrGroupVersionResourceNotRegistered = errors.New("GroupVersionResource custom resource not registered")

// GetVCenter returns VirtualCenter object from specified Manager object.
// Before returning VirtualCenter object, vcenter connection is established if
Expand Down Expand Up @@ -470,3 +474,45 @@ func GetValidatedCNSVolumeInfoPatch(ctx context.Context,
}
return patch, nil
}

// StartInformer creates a dynamic informer for the given group version
// resource and registers the supplied add/update/delete callbacks on it.
// The informer runs on a background goroutine whose lifetime is tied to ctx:
// cancelling ctx stops the watch. Returns ErrGroupVersionResourceNotRegistered
// when the GVR is not served by the cluster.
func StartInformer(ctx context.Context, cfg restclient.Config, gvr schema.GroupVersionResource,
	addFunc func(ob interface{}), updateFunc func(old interface{}, new interface{}),
	deleteFunc func(ob interface{})) (*cache.SharedIndexInformer, error) {
	log := logger.GetLogger(ctx)
	client, err := dynamic.NewForConfig(&cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create client using config. Err: %+v", err)
	}

	// Probe the API server to verify the resource is registered before
	// committing to an informer on it.
	_, err = client.Resource(gvr).List(ctx, metav1.ListOptions{})
	if apiMeta.IsNoMatchError(err) {
		log.Error("Requested CR is not registered on the cluster", gvr)
		return nil, ErrGroupVersionResourceNotRegistered
	}
	if err != nil {
		// Any other List failure (connectivity, auth, ...) must not be
		// silently ignored: surface it to the caller.
		return nil, fmt.Errorf("failed to list resource %v. Err: %+v", gvr, err)
	}

	// At this point, we are sure the resource is registered. Create an informer.
	dynInformer, err := k8s.GetDynamicInformer(ctx, gvr.Group,
		gvr.Version, gvr.Resource, metav1.NamespaceAll, &cfg, true)
	if err != nil {
		log.Errorf("failed to create dynamic informer for CR. Error: %+v",
			err)
		return nil, err
	}
	informer := dynInformer.Informer()
	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    addFunc,
		UpdateFunc: updateFunc,
		DeleteFunc: deleteFunc,
	})
	if err != nil {
		return nil, logger.LogNewErrorf(log,
			"failed to add event handler on informer for CR. Error: %v", err)
	}

	go func() {
		log.Info("Informer to watch on CR starting..")
		// Stop the informer when ctx is cancelled. The previous
		// make(chan struct{}) stop channel was unreachable by anyone,
		// so the watch goroutine could never be shut down.
		informer.Run(ctx.Done())
	}()
	return &informer, nil
}
42 changes: 42 additions & 0 deletions pkg/syncer/storagepool/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package storagepool

import (
"context"
"os"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/config"

storagepoolconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/storagepool/config"
Expand All @@ -41,6 +45,8 @@ type Service struct {
scWatchCntlr *StorageClassWatch
migrationCntlr *migrationController
clusterIDs []string

restart chan struct{}
}

var (
Expand Down Expand Up @@ -153,6 +159,7 @@ func InitStoragePoolService(ctx context.Context,
storagePoolService.scWatchCntlr = scWatchCntlr
storagePoolService.migrationCntlr = migrationController
storagePoolService.clusterIDs = clusterIDs
storagePoolService.restart = make(chan struct{})

// Create the default Service.
defaultStoragePoolServiceLock.Lock()
Expand All @@ -162,6 +169,14 @@ func InitStoragePoolService(ctx context.Context,
startPropertyCollectorListener(ctx)

log.Infof("Done initializing Storage Pool Service")
go func() {
<-defaultStoragePoolService.restart
err := InitStoragePoolService(ctx, configInfo, coInitParams)
if err != nil { // TODO: how to handle this?
log.Errorf("Failed starting Storage Pool Service. Err: %+v", err)
os.Exit(1)
}
}()
return nil
}

Expand Down Expand Up @@ -221,3 +236,30 @@ func ResetVC(ctx context.Context, vc *cnsvsphere.VirtualCenter) {
log.Info("Successfully reset VC connection in StoragePool service")
}
}

// startAvailabilityZoneWatcher registers an informer on the AvailabilityZone
// CR and signals the StoragePool service to restart whenever an instance is
// created or deleted, so the service picks up the changed zone topology.
// Returns an error (including ErrGroupVersionResourceNotRegistered) if the
// informer cannot be started.
// TODO: this could cause multiple watcher registrations. So, we should just update the map and send a signal
func startAvailabilityZoneWatcher(ctx context.Context, cfg restclient.Config) error {
	// restartSPS is installed as both the add and the delete handler: either
	// event changes the zone set and requires a service restart.
	var restartSPS = func(obj interface{}) {
		_, log := logger.GetNewContextWithLogger()
		// Retrieve name of CR instance (used only for logging).
		azName, found, err := unstructured.NestedString(obj.(*unstructured.Unstructured).Object, "metadata", "name")
		if !found || err != nil {
			log.Errorf("failed to get `name` from AvailabilityZone instance: %+v, Error: %+v", obj, err)
			return
		}

		log.Infof("restarting StoragePool service as AvailabilityZone %s is either created or deleted", azName)
		// The lock guards the read of defaultStoragePoolService. NOTE(review):
		// the restart receiver re-acquires this same lock after the unbuffered
		// send completes; the deferred Unlock below runs first so this does
		// not deadlock, but keep that ordering in mind when editing.
		defaultStoragePoolServiceLock.Lock()
		defer defaultStoragePoolServiceLock.Unlock()
		defaultStoragePoolService.restart <- struct{}{}
	}

	gvr := schema.GroupVersionResource{
		Group: "topology.tanzu.vmware.com", Version: "v1alpha1", Resource: "availabilityzones",
	}
	// The informer handle is not needed here; any startup error is simply
	// propagated to the caller.
	_, err := common.StartInformer(ctx, cfg, gvr, restartSPS, nil, restartSPS)
	return err
}