Skip to content

Commit

Permalink
Merge branch 'main' into tallaxes/karpenter-provider-azure
Browse files Browse the repository at this point in the history
  • Loading branch information
tallaxes authored Jan 23, 2024
2 parents ec969a7 + 25a02b8 commit c402094
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/Azure/azure-kusto-go v0.14.0
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible
github.com/Azure/azure-sdk-for-go-extensions v0.1.5
github.com/Azure/azure-sdk-for-go-extensions v0.1.6
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/Azure/azure-kusto-go v0.14.0 h1:5XVmjh5kVgsm2scpsWisJ6Q1ZgWHJcIOPCZC1
github.com/Azure/azure-kusto-go v0.14.0/go.mod h1:wSmXIsQwBVPHDNsSQsX98nuc12VyvxoNHQa2q9t1Ce0=
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU=
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go-extensions v0.1.5 h1:iGNv4we/Mst2hPcaZOynmCJJGNsPyN4YNV1YTzaGCb8=
github.com/Azure/azure-sdk-for-go-extensions v0.1.5/go.mod h1:27StPiXJp6Xzkq2AQL7gPK7VC0hgmCnUKlco1dO1jaM=
github.com/Azure/azure-sdk-for-go-extensions v0.1.6 h1:EXGvDcj54u98XfaI/Cy65Ds6vNsIJeGKYf0eNLB1y4Q=
github.com/Azure/azure-sdk-for-go-extensions v0.1.6/go.mod h1:27StPiXJp6Xzkq2AQL7gPK7VC0hgmCnUKlco1dO1jaM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1/go.mod h1:RKUqNu35KJYcVG/fqTRqmuXJZYNhYkBrnC/hX7yGbTA=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 h1:LNHhpdK7hzUcx/k1LIcuh5k7k1LGIWLQfCjaneSj7Fc=
Expand Down
24 changes: 19 additions & 5 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ import (
"fmt"
"time"

"github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/patrickmn/go-cache"
"knative.dev/pkg/logging"
)

var (
spotKey = key("", "", v1beta1.CapacityTypeSpot)
)

// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
// attempting to launch the capacity. These offerings are ignored as long as they are in the cache on
// GetInstanceTypes responses
Expand All @@ -40,18 +45,27 @@ func NewUnavailableOfferingsWithCache(c *cache.Cache) *UnavailableOfferings {
}

func NewUnavailableOfferings() *UnavailableOfferings {
c := cache.New(UnavailableOfferingsTTL, DefaultCleanupInterval)
return &UnavailableOfferings{
cache: c,
cache: cache.New(UnavailableOfferingsTTL, DefaultCleanupInterval),
}
}

// IsUnavailable returns true if the offering appears in the cache
func (u *UnavailableOfferings) IsUnavailable(instanceType, zone, capacityType string) bool {
_, found := u.cache.Get(u.key(instanceType, zone, capacityType))
if capacityType == v1beta1.CapacityTypeSpot {
if _, found := u.cache.Get(spotKey); found {
return true
}
}
_, found := u.cache.Get(key(instanceType, zone, capacityType))
return found
}

// MarkSpotUnavailable communicates recently observed temporary capacity shortages for spot
func (u *UnavailableOfferings) MarkSpotUnavailableWithTTL(ctx context.Context, ttl time.Duration) {
u.MarkUnavailableWithTTL(ctx, "SpotUnavailable", "", "", v1beta1.CapacityTypeSpot, UnavailableOfferingsTTL)
}

// MarkUnavailableWithTTL allows us to mark an offering unavailable with a custom TTL
func (u *UnavailableOfferings) MarkUnavailableWithTTL(ctx context.Context, unavailableReason, instanceType, zone, capacityType string, ttl time.Duration) {
// even if the key is already in the cache, we still need to call Set to extend the cached entry's TTL
Expand All @@ -61,7 +75,7 @@ func (u *UnavailableOfferings) MarkUnavailableWithTTL(ctx context.Context, unava
"zone", zone,
"capacity-type", capacityType,
"ttl", ttl).Debugf("removing offering from offerings")
u.cache.Set(u.key(instanceType, zone, capacityType), struct{}{}, ttl)
u.cache.Set(key(instanceType, zone, capacityType), struct{}{}, ttl)
}

// MarkUnavailable communicates recently observed temporary capacity shortages in the provided offerings
Expand All @@ -74,6 +88,6 @@ func (u *UnavailableOfferings) Flush() {
}

// key returns the cache key for all offerings in the cache
func (u *UnavailableOfferings) key(instanceType string, zone string, capacityType string) string {
func key(instanceType string, zone string, capacityType string) string {
return fmt.Sprintf("%s:%s:%s", capacityType, instanceType, zone)
}
6 changes: 1 addition & 5 deletions pkg/cache/unavailableofferings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@ func TestUnavailableOfferings(t *testing.T) {
}

func TestUnavailableOfferings_KeyGeneration(t *testing.T) {
c := cache.New(time.Second, time.Second)
u := NewUnavailableOfferingsWithCache(c)

// test that the key is generated correctly
expectedKey := "spot:NV16as_v4:westus"
key := u.key("NV16as_v4", "westus", "spot")
key := key("NV16as_v4", "westus", "spot")
if key != expectedKey {
t.Errorf("Expected key to be %s, but got %s", expectedKey, key)
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,7 @@ func (c *CloudProvider) IsDrifted(ctx context.Context, nodeClaim *corev1beta1.No
if imageVersionDrifted != "" {
return imageVersionDrifted, nil
}
imageDrifted, err := c.isImageDrifted(ctx, nodeClaim, nodePool, nodeClass)
if err != nil {
return "", err
}
return imageDrifted, nil
return "", nil
}

// Name returns the CloudProvider implementation name.
Expand Down
19 changes: 0 additions & 19 deletions pkg/cloudprovider/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/Azure/karpenter-provider-azure/pkg/apis/v1alpha2"
"github.com/Azure/karpenter-provider-azure/pkg/providers/imagefamily"
"github.com/Azure/karpenter-provider-azure/pkg/utils"
"github.com/samber/lo"

v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -125,21 +124,3 @@ func (c *CloudProvider) isImageVersionDrifted(
}
return "", nil
}

// TODO: remove nolint on unparam. Added for now in order to pass "make verify"
// nolint: unparam
func (c *CloudProvider) isImageDrifted(
ctx context.Context, nodeClaim *corev1beta1.NodeClaim, nodePool *corev1beta1.NodePool, _ *v1alpha2.AKSNodeClass) (cloudprovider.DriftReason, error) {
instanceTypes, err := c.GetInstanceTypes(ctx, nodePool)
if err != nil {
return "", fmt.Errorf("getting instanceTypes, %w", err)
}
_, found := lo.Find(instanceTypes, func(instType *cloudprovider.InstanceType) bool {
return instType.Name == nodeClaim.Labels[v1.LabelInstanceTypeStable]
})
if !found {
return "", fmt.Errorf(`finding node instance type "%s"`, nodeClaim.Labels[v1.LabelInstanceTypeStable])
}

return "", nil
}
7 changes: 0 additions & 7 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,6 @@ var _ = Describe("CloudProvider", func() {
Expect(err).ToNot(HaveOccurred())
Expect(drifted).To(BeEmpty())
})
It("should error if the NodeClaim doesn't have the instance-type label", func() {
nodeClaim.Labels = map[string]string{
corev1beta1.NodePoolLabelKey: nodePool.Name,
}
_, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expect(err).To(HaveOccurred())
})
It("should error drift if NodeClaim doesn't have provider id", func() {
nodeClaim.Status = corev1beta1.NodeClaimStatus{}
drifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
Expand Down
8 changes: 8 additions & 0 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,15 @@ func (p *Provider) launchInstance(
return resp, instanceType, nil
}

// nolint:gocyclo
func (p *Provider) handleResponseErrors(ctx context.Context, instanceType *corecloudprovider.InstanceType, zone, capacityType string, err error) error {
if sdkerrors.LowPriorityQuotaHasBeenReached(err) {
// Mark in cache that spot quota has been reached for this subscription
p.unavailableOfferings.MarkSpotUnavailableWithTTL(ctx, SubscriptionQuotaReachedTTL)

logging.FromContext(ctx).Error(err)
return fmt.Errorf("this subscription has reached the regional vCPU quota for spot (LowPriorityQuota). To scale beyond this limit, please review the quota increase process here: https://docs.microsoft.com/en-us/azure/azure-portal/supportability/low-priority-quota")
}
if sdkerrors.SKUFamilyQuotaHasBeenReached(err) {
// Subscription quota has been reached for this VM SKU, mark the instance type as unavailable in all zones available to the offering
// This will also update the TTL for an existing offering in the cache that is already unavailable
Expand Down
6 changes: 3 additions & 3 deletions pkg/providers/instancetype/instancetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"

corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/cloudprovider"

"github.com/Azure/karpenter-provider-azure/pkg/providers/instance/skuclient"
"github.com/Azure/karpenter-provider-azure/pkg/providers/pricing"

"github.com/Azure/skewer"
"github.com/alecthomas/units"
corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/cloudprovider"
)

const (
Expand Down
33 changes: 33 additions & 0 deletions pkg/providers/instancetype/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,39 @@ var _ = Describe("InstanceType Provider", func() {
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectScheduled(ctx, env.Client, pod)
})
It("should fail to provision when LowPriorityCoresQuota errors are hit, then switch capacity type and succeed", func() {
LowPriorityCoresQuotaErrorMessage := "Operation could not be completed as it results in exceeding approved Low Priority Cores quota. Additional details - Deployment Model: Resource Manager, Location: westus2, Current Limit: 0, Current Usage: 0, Additional Required: 32, (Minimum) New Limit Required: 32. Submit a request for Quota increase at https://aka.ms/ProdportalCRP/#blade/Microsoft_Azure_Capacity/UsageAndQuota.ReactView/Parameters/%7B%22subscriptionId%22:%(redacted)%22,%22command%22:%22openQuotaApprovalBlade%22,%22quotas%22:[%7B%22location%22:%22westus2%22,%22providerId%22:%22Microsoft.Compute%22,%22resourceName%22:%22LowPriorityCores%22,%22quotaRequest%22:%7B%22properties%22:%7B%22limit%22:32,%22unit%22:%22Count%22,%22name%22:%7B%22value%22:%22LowPriorityCores%22%7D%7D%7D%7D]%7D by specifying parameters listed in the ‘Details’ section for deployment to succeed. Please read more about quota limits at https://docs.microsoft.com/en-us/azure/azure-supportability/per-vm-quota-requests"
// Create nodepool that has both ondemand and spot capacity types enabled
coretest.ReplaceRequirements(nodePool, v1.NodeSelectorRequirement{
Key: corev1beta1.CapacityTypeLabelKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{corev1beta1.CapacityTypeOnDemand, corev1beta1.CapacityTypeSpot},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
// Set the LowPriorityCoresQuota error to be returned when creating the vm
azureEnv.VirtualMachinesAPI.VirtualMachinesBehavior.VirtualMachineCreateOrUpdateBehavior.Error.Set(
&azcore.ResponseError{
ErrorCode: sdkerrors.OperationNotAllowed,
RawResponse: &http.Response{
Body: createSDKErrorBody(sdkerrors.OperationNotAllowed, LowPriorityCoresQuotaErrorMessage),
},
},
)
// Create a pod that should fail to schedule
pod := coretest.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectNotScheduled(ctx, env.Client, pod)
azureEnv.VirtualMachinesAPI.VirtualMachineCreateOrUpdateBehavior.BeginError.Set(nil)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, coreProvisioner, pod)
ExpectScheduled(ctx, env.Client, pod)

// Expect that on-demand nodes are selected if spot capacity is unavailable, and the nodepool uses both spot + on-demand
nodes, err := env.KubernetesInterface.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(len(nodes.Items)).To(Equal(1))
Expect(nodes.Items[0].Labels[corev1beta1.CapacityTypeLabelKey]).To(Equal(corev1beta1.CapacityTypeOnDemand))
})

It("should fail to provision when VM SKU family vCPU quota exceeded error is returned, and succeed when it is gone", func() {
familyVCPUQuotaExceededErrorMessage := "Operation could not be completed as it results in exceeding approved standardDLSv5Family Cores quota. Additional details - Deployment Model: Resource Manager, Location: westus2, Current Limit: 100, Current Usage: 96, Additional Required: 32, (Minimum) New Limit Required: 128. Submit a request for Quota increase at https://aka.ms/ProdportalCRP/#blade/Microsoft_Azure_Capacity/UsageAndQuota.ReactView/Parameters/%7B%22subscriptionId%22:%(redacted)%22,%22command%22:%22openQuotaApprovalBlade%22,%22quotas%22:[%7B%22location%22:%22westus2%22,%22providerId%22:%22Microsoft.Compute%22,%22resourceName%22:%22standardDLSv5Family%22,%22quotaRequest%22:%7B%22properties%22:%7B%22limit%22:128,%22unit%22:%22Count%22,%22name%22:%7B%22value%22:%22standardDLSv5Family%22%7D%7D%7D%7D]%7D by specifying parameters listed in the ‘Details’ section for deployment to succeed. Please read more about quota limits at https://docs.microsoft.com/en-us/azure/azure-supportability/per-vm-quota-requests"
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
Expand Down

0 comments on commit c402094

Please sign in to comment.