Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: drift detection and takeover implementation (1/ contd) #1010

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions pkg/controllers/workapplier/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/resource"
)

Expand All @@ -35,7 +36,7 @@ func init() {
_ = clientgoscheme.AddToScheme(builtInScheme)
}

// applyInDryRunMode
// applyInDryRunMode dry-runs an apply op.
func (r *Reconciler) applyInDryRunMode(
ctx context.Context,
gvr *schema.GroupVersionResource,
Expand Down Expand Up @@ -74,8 +75,8 @@ func (r *Reconciler) apply(

// Compute the hash of the manifest object.
//
// Originally the manifest hash is kept only if three-way merge patch (client side apply esque
// strategy) is used; with the new drift detection and takeover capabilities, the manifest hash
// Originally the manifest hash is kept only if three-way merge patch (client side apply)
// is used; with the new drift detection and takeover capabilities, the manifest hash
// will always be kept regardless of the apply strategy in use, as it is needed for
// drift detection purposes.
//
Expand Down Expand Up @@ -141,12 +142,12 @@ func (r *Reconciler) apply(
switch {
case applyStrategy.Type == fleetv1beta1.ApplyStrategyTypeClientSideApply && isLastAppliedAnnotationSet:
// The apply strategy dictates that three-way merge patch
// (client-side apply esque patch) should be used, and the last applied annotation
// (client-side apply) should be used, and the last applied annotation
// has been set.
return r.threeWayMergePatch(ctx, gvr, manifestObjCopy, inMemberClusterObj, isOptimisticLockEnabled, false)
case applyStrategy.Type == fleetv1beta1.ApplyStrategyTypeClientSideApply:
// The apply strategy dictates that three-way merge patch
// (client-side apply esque patch) should be used, but the last applied annotation
// (client-side apply) should be used, but the last applied annotation
// cannot be set. Fleet will fall back to server-side apply.
return r.serverSideApply(
ctx,
Expand All @@ -161,8 +162,11 @@ func (r *Reconciler) apply(
applyStrategy.ServerSideApplyConfig.ForceConflicts, isOptimisticLockEnabled, false,
)
default:
// An unexpected apply strategy has been set.
return nil, fmt.Errorf("unexpected apply strategy %s is found", applyStrategy.Type)
// An unexpected apply strategy has been set. Normally this will never run as the built-in
// validation would block invalid values.
wrappedErr := fmt.Errorf("unexpected apply strategy %s is found", applyStrategy.Type)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
Comment on lines +167 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we don't return the unexpected behavior error here, while we do return the API server errors in other places?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it that unexpected errors are "unexpected", so we don't want to record them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ryan! It's still logged; the thing about this one is that the returned error will be formatted into a condition message that the user would be able to see, and the "unexpected behavior cannot be handled by the controller" part might read a bit weird there; do we still want to expose it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make the changes if that is OK with you.

}
}

Expand All @@ -177,7 +181,8 @@ func (r *Reconciler) createManifestObject(
}
createdObj, err := r.spokeDynamicClient.Resource(*gvr).Namespace(manifestObject.GetNamespace()).Create(ctx, manifestObject, createOpts)
if err != nil {
return nil, fmt.Errorf("failed to create manifest object: %w", err)
wrappedErr := controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to create manifest object: %w", wrappedErr)
}
return createdObj, nil
}
Expand Down Expand Up @@ -212,7 +217,9 @@ func (r *Reconciler) threeWayMergePatch(
data, err := patch.Data(manifestObj)
if err != nil {
// Fleet uses raw patch; this branch should never run.
return nil, fmt.Errorf("failed to get patch data: %w", err)
wrappedErr := fmt.Errorf("failed to get patch data: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}

// Use three-way merge (similar to kubectl client side apply) to patch the object in the
Expand All @@ -233,7 +240,8 @@ func (r *Reconciler) threeWayMergePatch(
Resource(*gvr).Namespace(manifestObj.GetNamespace()).
Patch(ctx, manifestObj.GetName(), patch.Type(), data, patchOpts)
if err != nil {
return nil, fmt.Errorf("failed to patch the manifest object: %w", err)
wrappedErr := controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to patch the manifest object: %w", wrappedErr)
}
return patchedObj, nil
}
Expand Down Expand Up @@ -274,7 +282,8 @@ func (r *Reconciler) serverSideApply(
Resource(*gvr).Namespace(manifestObj.GetNamespace()).
Apply(ctx, manifestObj.GetName(), manifestObj, applyOpts)
if err != nil {
return nil, fmt.Errorf("failed to apply the manifest object: %w", err)
wrappedErr := controller.NewAPIServerError(false, err)
return nil, fmt.Errorf("failed to apply the manifest object: %w", wrappedErr)
}
return appliedObj, nil
}
Expand Down Expand Up @@ -314,7 +323,9 @@ func buildThreeWayMergePatch(manifestObj, liveObj *unstructured.Unstructured) (c
patchData, err = jsonmergepatch.CreateThreeWayJSONMergePatch(
lastAppliedObjJSONBytes, manifestObjJSONBytes, liveObjJSONBytes, preconditions...)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create three-way JSON merge patch: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
case err != nil:
return nil, err
Expand All @@ -323,11 +334,15 @@ func buildThreeWayMergePatch(manifestObj, liveObj *unstructured.Unstructured) (c
patchType = types.StrategicMergePatchType
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create patch meta from struct (strategic merge patch): %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
patchData, err = strategicpatch.CreateThreeWayMergePatch(lastAppliedObjJSONBytes, manifestObjJSONBytes, liveObjJSONBytes, lookupPatchMeta, true)
if err != nil {
return nil, err
wrappedErr := fmt.Errorf("failed to create three-way strategic merge patch: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return nil, wrappedErr
}
}
return client.RawPatch(patchType, patchData), nil
Expand All @@ -346,7 +361,9 @@ func setFleetLastAppliedAnnotation(manifestObj *unstructured.Unstructured) (bool

lastAppliedManifestJSONBytes, err := manifestObj.MarshalJSON()
if err != nil {
return false, fmt.Errorf("failed to marshal the manifest object into JSON: %w", err)
wrappedErr := fmt.Errorf("failed to marshal the manifest object into JSON: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return false, wrappedErr
}
annotations[fleetv1beta1.LastAppliedConfigAnnotation] = string(lastAppliedManifestJSONBytes)
isLastAppliedAnnotationSet := true
Expand Down Expand Up @@ -388,7 +405,9 @@ func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
cleanedManifestObj := discardFieldsIrrelevantInComparisonFrom(manifestObj)
manifestObjHash, err := resource.HashOf(cleanedManifestObj.Object)
if err != nil {
return err
wrappedErr := fmt.Errorf("failed to compute the hash of the manifest object: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

annotations := manifestObj.GetAnnotations()
Expand All @@ -404,13 +423,13 @@ func setManifestHashAnnotation(manifestObj *unstructured.Unstructured) error {
// on a manifest to be applied.
func setOwnerRef(obj *unstructured.Unstructured, expectedAppliedWorkOwnerRef *metav1.OwnerReference) {
	// expectedAppliedWorkOwnerRef is dereferenced below, so callers must pass a non-nil pointer.
	//
	// Typically owner references is a system-managed field, and at this moment Fleet will
	// clear owner references (if any) set in the manifest object. However, for consistency
	// reasons, here Fleet will still assume that there might be some owner references set
	// in the manifest object.
	//
	// TO-DO (chenyu1): evaluate if user-set owner references should be kept.
	ownerRefs := obj.GetOwnerReferences()

	// Appending to a nil slice is well-defined in Go, so no explicit initialization is
	// needed when the object carries no owner references yet.
	ownerRefs = append(ownerRefs, *expectedAppliedWorkOwnerRef)
	obj.SetOwnerReferences(ownerRefs)
}
Expand All @@ -431,7 +450,9 @@ func validateOwnerReferences(
// perform sanitization on the manifest object before applying it, which removes all owner
// references.
if len(manifestObjOwnerRefs) > 0 && !applyStrategy.AllowCoOwnership {
return fmt.Errorf("manifest is set to have multiple owner references but co-ownership is disallowed")
wrappedErr := fmt.Errorf("manifest is set to have owner references but co-ownership is disallowed")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

// Do a sanity check to verify that no AppliedWork object is directly added as an owner
Expand All @@ -440,7 +461,9 @@ func validateOwnerReferences(
// references.
for _, ownerRef := range manifestObjOwnerRefs {
if ownerRef.APIVersion == fleetv1beta1.GroupVersion.String() && ownerRef.Kind == fleetv1beta1.AppliedWorkKind {
return fmt.Errorf("an AppliedWork object is unexpectedly added as an owner in the manifest object")
wrappedErr := fmt.Errorf("an AppliedWork object is unexpectedly added as an owner in the manifest object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}
}

Expand All @@ -452,7 +475,9 @@ func validateOwnerReferences(

// If the live object is co-owned but co-ownership is no longer allowed, the validation fails.
if len(inMemberClusterObjOwnerRefs) > 1 && !applyStrategy.AllowCoOwnership {
return fmt.Errorf("object is co-owned by multiple objects but co-ownership has been disallowed")
wrappedErr := fmt.Errorf("object is co-owned by multiple objects but co-ownership has been disallowed")
_ = controller.NewUserError(wrappedErr)
return wrappedErr
}

// Note that at this point of execution, one of the owner references is guaranteed to be the
Expand All @@ -465,15 +490,19 @@ func validateOwnerReferences(
}
}
if !found {
return fmt.Errorf("object is not owned by the expected AppliedWork object")
wrappedErr := fmt.Errorf("object is not owned by the expected AppliedWork object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

// If the object is already owned by another AppliedWork object, the validation fails.
//
// Normally this branch will never get executed as Fleet would refuse to take over an object
// that has been owned by another AppliedWork object.
if isPlacedByFleetInDuplicate(inMemberClusterObjOwnerRefs, expectedAppliedWorkOwnerRef) {
return fmt.Errorf("object is already owned by another AppliedWork object")
wrappedErr := fmt.Errorf("object is already owned by another AppliedWork object")
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return wrappedErr
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/workapplier/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestValidateOwnerReferences(t *testing.T) {
AllowCoOwnership: false,
},
wantErred: true,
wantErrMsgSubStr: "manifest is set to have multiple owner references but co-ownership is disallowed",
wantErrMsgSubStr: "manifest is set to have owner references but co-ownership is disallowed",
},
{
name: "unexpected AppliedWork owner ref",
Expand Down
20 changes: 15 additions & 5 deletions pkg/controllers/workapplier/availability_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func trackDeploymentAvailability(inMemberClusterObj *unstructured.Unstructured)
var deploy appv1.Deployment
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &deploy); err != nil {
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a deployment: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a deployment: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the deployment is available.
Expand All @@ -119,7 +121,9 @@ func trackStatefulSetAvailability(inMemberClusterObj *unstructured.Unstructured)
var statefulSet appv1.StatefulSet
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &statefulSet); err != nil {
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a stateful set: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a stateful set: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the stateful set is available.
Expand All @@ -145,8 +149,10 @@ func trackStatefulSetAvailability(inMemberClusterObj *unstructured.Unstructured)
func trackDaemonSetAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var daemonSet appv1.DaemonSet
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &daemonSet); err != nil {
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a daemon set: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
// Normally this branch should never run.
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a daemon set: %w", err))
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// Check if the daemonSet is available.
Expand All @@ -168,7 +174,9 @@ func trackDaemonSetAvailability(inMemberClusterObj *unstructured.Unstructured) (
func trackServiceAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var svc corev1.Service
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &svc); err != nil {
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a service: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a service: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}
switch svc.Spec.Type {
case "":
Expand Down Expand Up @@ -205,7 +213,9 @@ func trackServiceAvailability(inMemberClusterObj *unstructured.Unstructured) (Ma
func trackCRDAvailability(inMemberClusterObj *unstructured.Unstructured) (ManifestProcessingAvailabilityResultType, error) {
var crd apiextensionsv1.CustomResourceDefinition
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(inMemberClusterObj.Object, &crd); err != nil {
return ManifestProcessingAvailabilityResultTypeFailed, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert the unstructured object to a custom resource definition: %w", err))
wrappedErr := fmt.Errorf("failed to convert the unstructured object to a custom resource definition: %w", err)
_ = controller.NewUnexpectedBehaviorError(wrappedErr)
return ManifestProcessingAvailabilityResultTypeFailed, wrappedErr
}

// If both conditions are True, the CRD has become available.
Expand Down
17 changes: 13 additions & 4 deletions pkg/controllers/workapplier/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,17 @@ func (r *Reconciler) garbageCollectAppliedWork(ctx context.Context, work *fleetv
klog.V(2).InfoS("The appliedWork is already deleted", "appliedWork", work.Name)
case err != nil:
klog.ErrorS(err, "Failed to delete the appliedWork", "appliedWork", work.Name)
return ctrl.Result{}, err
return ctrl.Result{}, controller.NewAPIServerError(false, err)
default:
klog.InfoS("Successfully deleted the appliedWork", "appliedWork", work.Name)
}
controllerutil.RemoveFinalizer(work, fleetv1beta1.WorkFinalizer)
return ctrl.Result{}, r.hubClient.Update(ctx, work, &client.UpdateOptions{})

if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to remove the finalizer from the work", "work", klog.KObj(work))
return ctrl.Result{}, controller.NewAPIServerError(false, err)
}
return ctrl.Result{}, nil
}

// ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exists on the cluster.
Expand Down Expand Up @@ -377,12 +382,16 @@ func (r *Reconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.W
}
if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) {
klog.ErrorS(err, "AppliedWork create failed", "appliedWork", workRef.Name)
return nil, err
return nil, controller.NewAPIServerError(false, err)
}
if !hasFinalizer {
klog.InfoS("Add the finalizer to the work", "work", workRef)
work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer)
return appliedWork, r.hubClient.Update(ctx, work, &client.UpdateOptions{})

if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "Failed to add the finalizer to the work", "work", workRef)
return nil, controller.NewAPIServerError(false, err)
}
}
klog.InfoS("Recreated the appliedWork resource", "appliedWork", workRef.Name)
return appliedWork, nil
Expand Down
Loading
Loading