diff --git a/apis/configuration/slo_controller_config.go b/apis/configuration/slo_controller_config.go index 241d9be1f..d4a2ab99b 100644 --- a/apis/configuration/slo_controller_config.go +++ b/apis/configuration/slo_controller_config.go @@ -173,11 +173,29 @@ func (in *NodeExtensionStrategy) DeepCopy() *NodeExtensionStrategy { return out } +// CalculatePolicy defines the calculate policy for resource overcommitment. +// Default is "usage". type CalculatePolicy string const ( - CalculateByPodUsage CalculatePolicy = "usage" + // CalculateByPodUsage is the calculate policy according to the pod resource usage. + // When the policy="usage", the low-priority (LP) resources are calculated according to the high-priority (HP) pods' + // usages, so LP pods can reclaim the requested but unused resources of the HP pods. + // It is the default policy where the resources are over-committed between priority bands. + CalculateByPodUsage CalculatePolicy = "usage" + // CalculateByPodRequest is the calculate policy according to the pod resource request. + // When the policy="request", the low-priority (LP) resources are calculated according to the high-priority (HP) + // pods' requests, so LP pods can allocate the unallocated resources of the HP pods but can NOT reclaim the + // requested but unused resources of the HP pods. + // It is the policy where the resources are NOT over-committed between priority bands. CalculateByPodRequest CalculatePolicy = "request" + // CalculateByPodMaxUsageRequest is the calculate policy according to the maximum of the pod usage and request. + // When the policy="maxUsageRequest", the low-priority (LP) resources are calculated according to the sum over the + // high-priority (HP) pods of the maximum of each pod's usage and request, so LP pods can allocate the resources both + // unallocated and unused by the HP pods. + // It is the conservative policy where the resources are NOT over-committed between priority bands while the HP pods' + // usage is also protected from overcommitment. + CalculateByPodMaxUsageRequest CalculatePolicy = "maxUsageRequest" ) // +k8s:deepcopy-gen=true @@ -215,12 +233,17 @@ type ColocationStrategy struct { MetricAggregatePolicy *slov1alpha1.AggregatePolicy `json:"metricAggregatePolicy,omitempty"` MetricMemoryCollectPolicy *slov1alpha1.NodeMemoryCollectPolicy `json:"metricMemoryCollectPolicy,omitempty"` - CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty" validate:"omitempty,min=0,max=100"` + CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty" validate:"omitempty,min=0,max=100"` + // CPUCalculatePolicy determines the calculation policy of the CPU resources for the Batch pods. + // Supported: "usage" (default), "maxUsageRequest". + CPUCalculatePolicy *CalculatePolicy `json:"cpuCalculatePolicy,omitempty"` MemoryReclaimThresholdPercent *int64 `json:"memoryReclaimThresholdPercent,omitempty" validate:"omitempty,min=0,max=100"` - MemoryCalculatePolicy *CalculatePolicy `json:"memoryCalculatePolicy,omitempty"` - DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty" validate:"omitempty,min=1"` - UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty" validate:"omitempty,min=1"` - ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty" validate:"omitempty,gt=0,max=1"` + // MemoryCalculatePolicy determines the calculation policy of the memory resources for the Batch pods. + // Supported: "usage" (default), "request", "maxUsageRequest".
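+ // For example, one might set the following in the colocation strategy of the slo-controller-config (the values
+ // here are illustrative, not defaults):
+ //   {"cpuReclaimThresholdPercent": 70, "cpuCalculatePolicy": "usage", "memoryReclaimThresholdPercent": 80, "memoryCalculatePolicy": "request"}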
+ MemoryCalculatePolicy *CalculatePolicy `json:"memoryCalculatePolicy,omitempty"` + DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty" validate:"omitempty,min=1"` + UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty" validate:"omitempty,min=1"` + ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty" validate:"omitempty,gt=0,max=1"` // MidCPUThresholdPercent defines the maximum percentage of the Mid-tier cpu resource dividing the node allocatable. // MidCPUAllocatable <= NodeCPUAllocatable * MidCPUThresholdPercent / 100. diff --git a/apis/configuration/zz_generated.deepcopy.go b/apis/configuration/zz_generated.deepcopy.go index 891e0c0e9..946cc8563 100644 --- a/apis/configuration/zz_generated.deepcopy.go +++ b/apis/configuration/zz_generated.deepcopy.go @@ -159,6 +159,11 @@ func (in *ColocationStrategy) DeepCopyInto(out *ColocationStrategy) { *out = new(int64) **out = **in } + if in.CPUCalculatePolicy != nil { + in, out := &in.CPUCalculatePolicy, &out.CPUCalculatePolicy + *out = new(CalculatePolicy) + **out = **in + } if in.MemoryReclaimThresholdPercent != nil { in, out := &in.MemoryReclaimThresholdPercent, &out.MemoryReclaimThresholdPercent *out = new(int64) diff --git a/apis/extension/node_colocation.go b/apis/extension/node_colocation.go new file mode 100644 index 000000000..70527429b --- /dev/null +++ b/apis/extension/node_colocation.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package extension + +const ( + // AnnotationNodeColocationStrategy denotes the annotation key of the node colocation strategy. + // The value is the ColocationStrategy. It takes precedence over the ColocationStrategy in the slo-controller-config. + // Illegal values will be ignored. + AnnotationNodeColocationStrategy = NodeDomainPrefix + "/colocation-strategy" + + // LabelCPUReclaimRatio denotes the CPU reclaim ratio of a node. The value is a float number. + // It takes precedence over the CPUReclaimThresholdPercent in the slo-controller-config and the node annotations. + // Illegal values will be ignored. + LabelCPUReclaimRatio = NodeDomainPrefix + "/cpu-reclaim-ratio" + // LabelMemoryReclaimRatio denotes the memory reclaim ratio of a node. The value is a float number. + // It takes precedence over the MemoryReclaimThresholdPercent in the slo-controller-config and the node annotations. + // Illegal values will be ignored.
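+ // For example (illustrative; assuming NodeDomainPrefix resolves to "node.koordinator.sh"), labeling a node with
+ //   node.koordinator.sh/memory-reclaim-ratio: "0.8"
+ // would correspond to a MemoryReclaimThresholdPercent of 80 for that node.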
+ LabelMemoryReclaimRatio = NodeDomainPrefix + "/memory-reclaim-ratio" +) diff --git a/docs/images/noderesource-framework.svg b/docs/images/noderesource-framework.svg index e479628ae..82d20e988 100644 --- a/docs/images/noderesource-framework.svg +++ b/docs/images/noderesource-framework.svg @@ -1,4 +1,4 @@ -SetupSetupCalculateCalculateNodePrepareNodePrepareNeedSyncNeedSync GetNodeMetric GetNodeMetricUpdateNodeUpdateNodePluginPluginNodeNodeNode ReconciliationNode ReconciliationExtended APIExtended APIInternal APIInternal APINodeResourceNodeResourceText is not SVG - cannot display \ No newline at end of file +SetupSetupCalculate / ResetCalculate / ResetNodePrepareNodePrepareNodeCheckNodeCheck GetNodeMetric GetNodeMetricUpdateNodeUpdateNodePluginPluginNodeNodeNodeResource ReconciliationNodeResource ReconciliationExtended APIExtended APIInternal APIInternal APINodeResourceNodeResourceNodePreUpdateNodePreUpdateText is not SVG - cannot display \ No newline at end of file diff --git a/pkg/slo-controller/config/colocation_cm_event_handler.go b/pkg/slo-controller/config/colocation_cm_event_handler.go index 217fb33c4..4b0228ccd 100644 --- a/pkg/slo-controller/config/colocation_cm_event_handler.go +++ b/pkg/slo-controller/config/colocation_cm_event_handler.go @@ -100,7 +100,7 @@ func (p *ColocationHandlerForConfigMapEvent) syncConfig(configMap *corev1.Config err := json.Unmarshal([]byte(configStr), &newCfg) if err != nil { //if controller restart ,cache will unavailable, else use old cfg - klog.Errorf("syncConfig failed! parse colocation error then use old Cfg ,configmap %s/%s, err: %s", + klog.Errorf("syncConfig failed since parsing the colocation config failed, use old cfg, configmap %s/%s, err: %s", sloconfig.ConfigNameSpace, sloconfig.SLOCtrlConfigMap, err) p.recorder.Eventf(configMap, "Warning", ReasonColocationConfigUnmarshalFailed, "failed to unmarshal colocation config, err: %s", err) p.cfgCache.errorStatus = true @@ -115,7 +115,7 @@ func (p *ColocationHandlerForConfigMapEvent) syncConfig(configMap *corev1.Config if !sloconfig.IsColocationStrategyValid(&newCfg.ColocationStrategy) { //if controller restart ,cache will unavailable, else use old cfg - klog.Errorf("syncConfig failed! invalid cluster config,%+v", newCfg.ColocationStrategy) + klog.Errorf("syncConfig failed since the cluster config is invalid, %+v", newCfg.ColocationStrategy) p.cfgCache.errorStatus = true return false } @@ -126,7 +126,7 @@ func (p *ColocationHandlerForConfigMapEvent) syncConfig(configMap *corev1.Config mergedNodeStrategyInterface, _ := util.MergeCfg(clusterStrategyCopy, &nodeStrategy.ColocationStrategy) newNodeStrategy := *mergedNodeStrategyInterface.(*configuration.ColocationStrategy) if !sloconfig.IsColocationStrategyValid(&newNodeStrategy) { - klog.Errorf("syncConfig failed!
invalid node config,then use clusterCfg,nodeCfg:%+v", nodeStrategy) + klog.Errorf("syncConfig failed since node config is invalid, use clusterCfg, nodeCfg:%+v", nodeStrategy) newCfg.NodeConfigs[index].ColocationStrategy = *newCfg.ColocationStrategy.DeepCopy() } else { newCfg.NodeConfigs[index].ColocationStrategy = newNodeStrategy diff --git a/pkg/slo-controller/config/colocation_cm_event_handler_test.go b/pkg/slo-controller/config/colocation_cm_event_handler_test.go index 7aba5b064..572b3f2dc 100644 --- a/pkg/slo-controller/config/colocation_cm_event_handler_test.go +++ b/pkg/slo-controller/config/colocation_cm_event_handler_test.go @@ -38,7 +38,9 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { oldCfg.MemoryReclaimThresholdPercent = pointer.Int64(40) memoryCalcPolicyByUsage := configuration.CalculateByPodUsage memoryCalcPolicyByRequest := configuration.CalculateByPodRequest - var defaultNodeMemoryCollectPolicy slov1alpha1.NodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache + cpuCalcPolicyByUsage := configuration.CalculateByPodUsage + cpuCalcPolicyNew := configuration.CalculatePolicy("") + var defaultNodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache type fields struct { config *colocationCfgCache @@ -199,6 +201,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(70), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(15), @@ -289,6 +292,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -311,6 +315,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -355,6 +360,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -377,6 +383,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(60), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -411,6 +418,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(60),
MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -509,6 +517,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -533,6 +542,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -559,6 +569,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(60), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), @@ -598,7 +609,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { "\"cpuReclaimThresholdPercent\":70,\"memoryReclaimThresholdPercent\":80,\"memoryCalculatePolicy\":\"request\"," + "\"updateTimeThresholdSeconds\":300," + "\"degradeTimeMinutes\":5,\"resourceDiffThreshold\":0.1,\"nodeConfigs\":[{\"nodeSelector\":" + - "{\"matchLabels\":{\"xxx\":\"yyy\"}},\"name\":\"xxx-yyy\",\"enable\":true,\"cpuReclaimThresholdPercent\":60}]}", + "{\"matchLabels\":{\"xxx\":\"yyy\"}},\"name\":\"xxx-yyy\",\"enable\":true,\"cpuReclaimThresholdPercent\":60, \"cpuCalculatePolicy\": \"\"}]}", }, }}, wantChanged: true, @@ -610,6 +621,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(20), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByRequest, DegradeTimeMinutes: pointer.Int64(5), @@ -640,6 +652,7 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { MetricMemoryCollectPolicy: &defaultNodeMemoryCollectPolicy, //change CPUReclaimThresholdPercent: pointer.Int64(60), + CPUCalculatePolicy: &cpuCalcPolicyNew, }, }, }, @@ -666,7 +679,8 @@ func Test_syncColocationConfigIfChanged(t *testing.T) { func Test_IsCfgAvailable(t *testing.T) { defaultConfig := sloconfig.DefaultColocationCfg() memoryCalcPolicyByUsage := configuration.CalculateByPodUsage - var defaultNodeMemoryCollectPolicy slov1alpha1.NodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache + cpuCalcPolicyByUsage := configuration.CalculateByPodUsage + var defaultNodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache type fields struct { config *colocationCfgCache configMap *corev1.ConfigMap @@ -736,6 +750,7 @@ func Test_IsCfgAvailable(t *testing.T) { MetricAggregateDurationSeconds: 
pointer.Int64(60), MetricAggregatePolicy: sloconfig.DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(80), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(5), diff --git a/pkg/slo-controller/noderesource/framework/README.md b/pkg/slo-controller/noderesource/framework/README.md index b6657fd6f..cee36975e 100644 --- a/pkg/slo-controller/noderesource/framework/README.md +++ b/pkg/slo-controller/noderesource/framework/README.md @@ -44,13 +44,27 @@ type ResourceItem struct { } ``` -- **Prepare**: It prepares the Node object with the calculated result `NodeResource`. Before the Preparing, it is +- **PreUpdate**: It allows the plugin to preprocess the calculated results before the Node is updated. +For example, a plugin can prepare and update some Objects like CRDs before updating the Node. The plugin can also +mutate the internal NodeResource object according to the fully calculated results. +It differs from the Prepare stage since a NodePreUpdatePlugin will be invoked only once in one loop (so the plugin +should consider implementing retry logic itself if needed), while the NodePreparePlugin is not expected to update other +objects or mutate the NodeResource. + +```go +type NodePreUpdatePlugin interface { + Plugin + PreUpdate(strategy *ColocationStrategy, node *Node, nr *NodeResource) error +} +``` + +- **Prepare**: It prepares the Node object with the calculated result `NodeResource`. Before the update, it is invoked after the Calculate so as to allow the plugin to retry when the client update conflicts. ```go type NodePreparePlugin interface { Plugin - Execute(strategy *ColocationStrategy, node *Node, nr *NodeResource) error + Prepare(strategy *ColocationStrategy, node *Node, nr *NodeResource) error } type NodeResource struct { @@ -62,17 +76,17 @@ type NodeResource struct { } ``` -- **NeedSync**: It checks if the newly-prepared Node object should be synchronized to the kube-apiserver. To be more -specific, there are two types of NeedSync plugins for different client update methods, where one can determine whether -the node status should be updated and another determines whether node metadata should be updated. +- **NodeCheck**: It checks if the newly-prepared Node object should be synchronized to the kube-apiserver. To be more +specific, currently there are two types of check plugins for different client update methods, where one can determine +whether the node status should be updated and another determines whether node metadata should be updated. ```go -type NodeSyncPlugin interface { +type NodeStatusCheckPlugin interface { Plugin NeedSync(strategy *ColocationStrategy, oldNode, newNode *Node) (bool, string) } -type NodeMetaSyncPlugin interface { +type NodeMetaCheckPlugin interface { Plugin NeedSyncMeta(strategy *ColocationStrategy, oldNode, newNode *Node) (bool, string) } @@ -85,7 +99,11 @@ There is the workflow about how the node resource controller handles a dequeued ## Example: Batch Resource Plugin The default `BatchResource` plugin is responsible for calculating and updating the Batch-tier resources. -It implements the stages `Calculate`, `Reset`, `Prepare` and `NeedSync`: +It implements the stages `Setup`, `Calculate`/`Reset`, `PreUpdate`, `Prepare` and `NodeStatusCheck`: + +**Setup**: + +In the initialization, the plugin sets up the kube client and adds a watch for the NodeResourceTopology.
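Below is a minimal, schematic sketch (in the same simplified style as the snippets above) of how such a `Setup` implementation might look; `NRTSetupPlugin` and its fields are hypothetical, not part of the framework:

```go
type NRTSetupPlugin struct {
	client Client
}

func (p *NRTSetupPlugin) Name() string { return "NRTSetupPlugin" }

func (p *NRTSetupPlugin) Setup(opt *Option) error {
	// keep the kube client for the later stages like PreUpdate
	p.client = opt.Client
	// add a watch so that changes of the NodeResourceTopology trigger the node reconciliation
	opt.Builder = opt.Builder.Watches( /* NodeResourceTopology source and event handler */ )
	return nil
}
```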
**Calculate**: @@ -98,12 +116,17 @@ batchAllocatable := nodeAllocatable * thresholdPercent - podUsage(HP) - systemUsage Besides, the plugin implements the `Reset` method to clean up the Batch resources when the node colocation is disabled. +**PreUpdate**: + +Before updating the Node object, the plugin updates the zone-level Batch resources for the NRT (NodeResourceTopology) +according to the calculated results from the `Calculate` stage. + **Prepare**: The plugin sets the extended resources `kubernetes.io/batch-cpu`, `kubernetes.io/batch-memory` in the -`node.status.allocatable` according to the calculated results from the `Calculate` or `Reset` stage. +`node.status.allocatable` according to the calculated results from the `Calculate`/`Reset` stage. -**NeedSync**: +**NodeStatusCheck**: The plugin checks the extended resources `kubernetes.io/batch-cpu`, `kubernetes.io/batch-memory` of the prepared node and the old node. If the node's Batch resources have not been updated for too long or the calculated results change diff --git a/pkg/slo-controller/noderesource/framework/extender_plugin.go b/pkg/slo-controller/noderesource/framework/extender_plugin.go index 787e31b48..9f70b682f 100644 --- a/pkg/slo-controller/noderesource/framework/extender_plugin.go +++ b/pkg/slo-controller/noderesource/framework/extender_plugin.go @@ -24,11 +24,20 @@ import ( "github.com/koordinator-sh/koordinator/pkg/slo-controller/metrics" ) +// The plugins called in the node resource initialization: +// - Setup +// +// The order in which the plugins are called in the node resource reconcile loop: +// - Calculate/Reset -> NodePreUpdate -> NodeUpdate(NodePrepare -> NodeStatusCheck,NodeMetaCheck) +// The NodeUpdate stage can be called multiple times with retries. +// +// For more info, please see the README.md. var ( globalSetupExtender = NewRegistry("Setup") + globalNodePreUpdateExtender = NewRegistry("NodePreUpdate") globalNodePrepareExtender = NewRegistry("NodePrepare") - globalNodeSyncExtender = NewRegistry("NodeSync") - globalNodeMetaSyncExtender = NewRegistry("NodeMetaSync") + globalNodeStatusCheckExtender = NewRegistry("NodeStatusCheck") + globalNodeMetaCheckExtender = NewRegistry("NodeMetaCheck") globalResourceCalculateExtender = NewRegistry("ResourceCalculate") ) @@ -38,6 +47,8 @@ type Plugin interface { } // SetupPlugin implements setup for the plugin. +// The framework exposes the kube ClientSet and controller builder to the plugins so that the plugins can set up their +// necessary clients, add new watches and initialize their internal states. // The Setup of each plugin will be called before other extension stages and invoked only once. type SetupPlugin interface { Plugin @@ -71,13 +82,54 @@ func UnregisterSetupExtender(name string) { globalSetupExtender.Unregister(name) } +// NodePreUpdatePlugin implements preprocessing of the calculated results, invoked before updating the Node. +// There are mainly two use cases for this stage: +// 1. A plugin may prepare and update some Objects like CRDs before updating the Node object (NodePrepare and NodeXXXCheck). +// 2. A plugin may need to mutate the internal NodeResource object before updating the Node object. +// It differs from the NodePreparePlugin in that a NodePreUpdatePlugin will be invoked only once in one loop (so the +// plugin should consider implementing retry logic itself if needed), while the NodePreparePlugin is not expected to +// update other objects or mutate the NodeResource.
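+// For example (an illustration, not part of the framework), a plugin that updates a CRD in PreUpdate may wrap the
+// write in client-go's retry helper, e.g. retry.RetryOnConflict(retry.DefaultRetry, func() error { ... }), to
+// implement such retry logic.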
+type NodePreUpdatePlugin interface { + Plugin + PreUpdate(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error +} + +func RegisterNodePreUpdateExtender(filter FilterFn, plugins ...NodePreUpdatePlugin) { + ps := make([]Plugin, 0, len(plugins)) + for i := range plugins { + if filter(plugins[i].Name()) { + ps = append(ps, plugins[i]) + } + } + globalNodePreUpdateExtender.MustRegister(ps...) +} + +func RunNodePreUpdateExtenders(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) { + for _, p := range globalNodePreUpdateExtender.GetAll() { + plugin := p.(NodePreUpdatePlugin) + if err := plugin.PreUpdate(strategy, node, nr); err != nil { + metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), false, "NodePreUpdate") + klog.ErrorS(err, "run node pre update plugin failed", "plugin", plugin.Name(), + "node", node.Name) + } else { + metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), true, "NodePreUpdate") + klog.V(5).InfoS("run node pre update plugin successfully", "plugin", plugin.Name(), + "node", node.Name) + } + } +} + +func UnregisterNodePreUpdateExtender(name string) { + globalNodePreUpdateExtender.Unregister(name) +} + // NodePreparePlugin implements node resource preparing for the calculated results. // For example, assign extended resources in the node allocatable. // It is invoked each time the controller tries updating the latest NodeResource object with calculated results. -// NOTE: The Execute should be idempotent since it can be called multiple times in one reconciliation. +// NOTE: The Prepare should be idempotent since it can be called multiple times in one reconciliation. type NodePreparePlugin interface { Plugin - Execute(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error + Prepare(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error } func RegisterNodePrepareExtender(filter FilterFn, plugins ...NodePreparePlugin) { @@ -93,7 +145,7 @@ func RegisterNodePrepareExtender(filter FilterFn, plugins ...NodePreparePlugin) func RunNodePrepareExtenders(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) { for _, p := range globalNodePrepareExtender.GetAll() { plugin := p.(NodePreparePlugin) - if err := plugin.Execute(strategy, node, nr); err != nil { + if err := plugin.Prepare(strategy, node, nr); err != nil { metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), false, "NodePrepare") klog.ErrorS(err, "run node prepare plugin failed", "plugin", plugin.Name(), "node", node.Name) @@ -109,74 +161,74 @@ func UnregisterNodePrepareExtender(name string) { globalNodePrepareExtender.Unregister(name) } -// NodeSyncPlugin implements the check of resource updating. +// NodeStatusCheckPlugin implements the check of resource updating. // For example, trigger an update if the values of the current is more than 10% different with the former. -type NodeSyncPlugin interface { +type NodeStatusCheckPlugin interface { Plugin NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) } -func RegisterNodeSyncExtender(filter FilterFn, plugins ...NodeSyncPlugin) { +func RegisterNodeStatusCheckExtender(filter FilterFn, plugins ...NodeStatusCheckPlugin) { ps := make([]Plugin, 0, len(plugins)) for i := range plugins { if filter(plugins[i].Name()) { ps = append(ps, plugins[i]) } } - globalNodeSyncExtender.MustRegister(ps...) + globalNodeStatusCheckExtender.MustRegister(ps...) 
} -func UnregisterNodeSyncExtender(name string) { - globalNodeSyncExtender.Unregister(name) +func UnregisterNodeStatusCheckExtender(name string) { + globalNodeStatusCheckExtender.Unregister(name) } -func RunNodeSyncExtenders(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) bool { - for _, p := range globalNodeSyncExtender.GetAll() { - plugin := p.(NodeSyncPlugin) +func RunNodeStatusCheckExtenders(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) bool { + for _, p := range globalNodeStatusCheckExtender.GetAll() { + plugin := p.(NodeStatusCheckPlugin) needSync, msg := plugin.NeedSync(strategy, oldNode, newNode) - metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), true, "NodeSync") + metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), true, "NodeStatusCheck") if needSync { - klog.V(4).InfoS("run node sync plugin, need sync", "plugin", plugin.Name(), + klog.V(4).InfoS("run node status check plugin, need sync", "plugin", plugin.Name(), "node", newNode.Name, "message", msg) return true } else { - klog.V(6).InfoS("run node sync plugin, no need to sync", "plugin", plugin.Name(), + klog.V(6).InfoS("run node status check plugin, no need to sync", "plugin", plugin.Name(), "node", newNode.Name) } } return false } -type NodeMetaSyncPlugin interface { +type NodeMetaCheckPlugin interface { Plugin NeedSyncMeta(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) } -func RegisterNodeMetaSyncExtender(filter FilterFn, plugins ...NodeMetaSyncPlugin) { +func RegisterNodeMetaCheckExtender(filter FilterFn, plugins ...NodeMetaCheckPlugin) { ps := make([]Plugin, 0, len(plugins)) for i := range plugins { if filter(plugins[i].Name()) { ps = append(ps, plugins[i]) } } - globalNodeMetaSyncExtender.MustRegister(ps...) + globalNodeMetaCheckExtender.MustRegister(ps...) 
} -func UnregisterNodeMetaSyncExtender(name string) { - globalNodeMetaSyncExtender.Unregister(name) +func UnregisterNodeMetaCheckExtender(name string) { + globalNodeMetaCheckExtender.Unregister(name) } -func RunNodeMetaSyncExtenders(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) bool { - for _, p := range globalNodeMetaSyncExtender.GetAll() { - plugin := p.(NodeMetaSyncPlugin) +func RunNodeMetaCheckExtenders(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) bool { + for _, p := range globalNodeMetaCheckExtender.GetAll() { + plugin := p.(NodeMetaCheckPlugin) needSync, msg := plugin.NeedSyncMeta(strategy, oldNode, newNode) - metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), true, "NodeSyncMeta") + metrics.RecordNodeResourceRunPluginStatus(plugin.Name(), true, "NodeStatusCheckMeta") if needSync { - klog.V(4).InfoS("run node meta sync plugin, need sync", "plugin", plugin.Name(), + klog.V(4).InfoS("run node meta check plugin, need sync", "plugin", plugin.Name(), "node", newNode.Name, "message", msg) return true } else { - klog.V(6).InfoS("run node meta sync plugin, no need to sync", + klog.V(6).InfoS("run node meta check plugin, no need to sync", "plugin", plugin.Name(), "node", newNode.Name) } } diff --git a/pkg/slo-controller/noderesource/framework/extender_plugin_test.go b/pkg/slo-controller/noderesource/framework/extender_plugin_test.go index 313ed22b3..7ef0e595b 100644 --- a/pkg/slo-controller/noderesource/framework/extender_plugin_test.go +++ b/pkg/slo-controller/noderesource/framework/extender_plugin_test.go @@ -38,7 +38,7 @@ func (s *SetNodeAnnotation) Name() string { return "SetNodeAnnotation" } -func (s *SetNodeAnnotation) Execute(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error { +func (s *SetNodeAnnotation) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error { node.Annotations[testNodeAnnoKey] = testNodeAnnoVal return nil } @@ -80,9 +80,10 @@ func Test_RegisterAlreadyExistNodePrepareExtender(t *testing.T) { } var _ SetupPlugin = (*testNodeResourcePlugin)(nil) +var _ NodePreUpdatePlugin = (*testNodeResourcePlugin)(nil) var _ NodePreparePlugin = (*testNodeResourcePlugin)(nil) -var _ NodeSyncPlugin = (*testNodeResourcePlugin)(nil) -var _ NodeMetaSyncPlugin = (*testNodeResourcePlugin)(nil) +var _ NodeStatusCheckPlugin = (*testNodeResourcePlugin)(nil) +var _ NodeMetaCheckPlugin = (*testNodeResourcePlugin)(nil) var _ ResourceCalculatePlugin = (*testNodeResourcePlugin)(nil) type testNodeResourcePlugin struct{} @@ -95,7 +96,11 @@ func (p *testNodeResourcePlugin) Setup(opt *Option) error { return nil } -func (p *testNodeResourcePlugin) Execute(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error { +func (p *testNodeResourcePlugin) PreUpdate(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error { + return nil +} + +func (p *testNodeResourcePlugin) Prepare(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *NodeResource) error { return nil } @@ -142,34 +147,50 @@ func TestSetupPlugin(t *testing.T) { }) } -func TestNodeSyncPlugin(t *testing.T) { +func TestNodePreUpdatePlugin(t *testing.T) { + t.Run("node pre update extender", func(t *testing.T) { + plugin := &testNodeResourcePlugin{} + startedSize := globalNodePreUpdateExtender.Size() + RegisterNodePreUpdateExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodePreUpdateExtender.Size()) + + 
RegisterNodePreUpdateExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodePreUpdateExtender.Size(), "register duplicated") + + assert.NotPanics(t, func() { + UnregisterNodePreUpdateExtender(plugin.Name()) + }, "unregistered") + }) +} + +func TestNodeStatusCheckPlugin(t *testing.T) { t.Run("node sync extender", func(t *testing.T) { plugin := &testNodeResourcePlugin{} - startedSize := globalNodeSyncExtender.Size() - RegisterNodeSyncExtender(AllPass, plugin) - assert.Equal(t, startedSize+1, globalNodeSyncExtender.Size()) + startedSize := globalNodeStatusCheckExtender.Size() + RegisterNodeStatusCheckExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodeStatusCheckExtender.Size()) - RegisterNodeSyncExtender(AllPass, plugin) - assert.Equal(t, startedSize+1, globalNodeSyncExtender.Size(), "register duplicated") + RegisterNodeStatusCheckExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodeStatusCheckExtender.Size(), "register duplicated") assert.NotPanics(t, func() { - UnregisterNodeSyncExtender(plugin.Name()) + UnregisterNodeStatusCheckExtender(plugin.Name()) }, "unregistered") }) } -func TestNodeMetaSyncPlugin(t *testing.T) { +func TestNodeMetaCheckPlugin(t *testing.T) { t.Run("node sync extender", func(t *testing.T) { plugin := &testNodeResourcePlugin{} - startedSize := globalNodeMetaSyncExtender.Size() - RegisterNodeMetaSyncExtender(AllPass, plugin) - assert.Equal(t, startedSize+1, globalNodeMetaSyncExtender.Size()) + startedSize := globalNodeMetaCheckExtender.Size() + RegisterNodeMetaCheckExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodeMetaCheckExtender.Size()) - RegisterNodeMetaSyncExtender(AllPass, plugin) - assert.Equal(t, startedSize+1, globalNodeMetaSyncExtender.Size(), "register duplicated") + RegisterNodeMetaCheckExtender(AllPass, plugin) + assert.Equal(t, startedSize+1, globalNodeMetaCheckExtender.Size(), "register duplicated") assert.NotPanics(t, func() { - UnregisterNodeMetaSyncExtender(plugin.Name()) + UnregisterNodeMetaCheckExtender(plugin.Name()) }, "unregistered") }) } diff --git a/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go b/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go index ee982b248..e5ab45936 100644 --- a/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go +++ b/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go @@ -84,6 +84,15 @@ func (p *Plugin) Setup(opt *framework.Option) error { return nil } +func (p *Plugin) PreUpdate(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error { + // prepare for zone resources on NRT objects + err := p.prepareForNodeResourceTopology(strategy, node, nr) + if err != nil { + return fmt.Errorf("failed to prepare for NRT, err: %w", err) + } + return nil +} + func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { // batch resource diff is bigger than ResourceDiffThreshold resourcesToDiff := ResourceNames @@ -99,19 +108,12 @@ func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, n return false, "" } -func (p *Plugin) Execute(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error { +func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error { // prepare for node extended resources for _, resourceName := range ResourceNames { prepareNodeForResource(node, nr, resourceName) } - // prepare for zone 
resources - // TODO: move the NRT update into framework - err := p.prepareForNodeResourceTopology(strategy, node, nr) - if err != nil { - return fmt.Errorf("failed to prepare for NRT, err: %w", err) - } - return nil } @@ -153,18 +155,19 @@ func (p *Plugin) calculate(strategy *configuration.ColocationStrategy, node *cor resourceMetrics *framework.ResourceMetrics) ([]framework.ResourceItem, error) { // compute the requests and usages according to the pods' priority classes. // HP means High-Priority (i.e. not Batch or Free) pods - podHPRequest := util.NewZeroResourceList() - podHPUsed := util.NewZeroResourceList() - // podAllUsed is the sum usage of all pods reported in NodeMetric. - // podKnownUsed is the sum usage of pods which are both reported in NodeMetric and shown in current pod list. - podAllUsed := util.NewZeroResourceList() - podKnownUsed := util.NewZeroResourceList() + podsHPRequest := util.NewZeroResourceList() + podsHPUsed := util.NewZeroResourceList() + podsHPMaxUsedReq := util.NewZeroResourceList() + // podsAllUsed is the sum usage of all pods reported in NodeMetric. + // podsKnownUsed is the sum usage of pods which are both reported in NodeMetric and shown in current pod list. + podsAllUsed := util.NewZeroResourceList() + podsKnownUsed := util.NewZeroResourceList() nodeMetric := resourceMetrics.NodeMetric podMetricMap := make(map[string]*slov1alpha1.PodMetricInfo) for _, podMetric := range nodeMetric.Status.PodsMetric { podMetricMap[util.GetPodMetricKey(podMetric)] = podMetric - podAllUsed = quotav1.Add(podAllUsed, getPodMetricUsage(podMetric)) + podsAllUsed = quotav1.Add(podsAllUsed, getPodMetricUsage(podMetric)) } for i := range podList.Items { @@ -177,7 +180,7 @@ func (p *Plugin) calculate(strategy *configuration.ColocationStrategy, node *cor podKey := util.GetPodKey(pod) podMetric, hasMetric := podMetricMap[podKey] if hasMetric { - podKnownUsed = quotav1.Add(podKnownUsed, getPodMetricUsage(podMetric)) + podsKnownUsed = quotav1.Add(podsKnownUsed, getPodMetricUsage(podMetric)) } // count the high-priority usage @@ -187,22 +190,27 @@ func (p *Plugin) calculate(strategy *configuration.ColocationStrategy, node *cor continue } - podHPRequest = quotav1.Add(podHPRequest, podRequest) + podsHPRequest = quotav1.Add(podsHPRequest, podRequest) if !hasMetric { - podHPUsed = quotav1.Add(podHPUsed, podRequest) + podsHPUsed = quotav1.Add(podsHPUsed, podRequest) } else if qos := extension.GetPodQoSClassWithDefault(pod); qos == extension.QoSLSE { // NOTE: Currently qos=LSE pods does not reclaim CPU resource. - podHPUsed = quotav1.Add(podHPUsed, mixResourceListCPUAndMemory(podRequest, getPodMetricUsage(podMetric))) + podUsed := getPodMetricUsage(podMetric) + podsHPUsed = quotav1.Add(podsHPUsed, mixResourceListCPUAndMemory(podRequest, podUsed)) + podsHPMaxUsedReq = quotav1.Add(podsHPMaxUsedReq, quotav1.Max(podRequest, podUsed)) } else { - podHPUsed = quotav1.Add(podHPUsed, getPodMetricUsage(podMetric)) + podUsed := getPodMetricUsage(podMetric) + podsHPUsed = quotav1.Add(podsHPUsed, podUsed) + podsHPMaxUsedReq = quotav1.Add(podsHPMaxUsedReq, quotav1.Max(podRequest, podUsed)) } } // For the pods reported with metrics but not shown in the current list, count them into the HP used. 
- podUnknownPriorityUsed := quotav1.Subtract(podAllUsed, podKnownUsed) - podHPUsed = quotav1.Add(podHPUsed, podUnknownPriorityUsed) + podsUnknownPriorityUsed := quotav1.Subtract(podsAllUsed, podsKnownUsed) + podsHPUsed = quotav1.Add(podsHPUsed, podsUnknownPriorityUsed) + podsHPMaxUsedReq = quotav1.Add(podsHPMaxUsedReq, podsUnknownPriorityUsed) klog.V(6).InfoS("batch resource got unknown priority pods used", "node", node.Name, - "cpu", podUnknownPriorityUsed.Cpu().String(), "memory", podUnknownPriorityUsed.Memory().String()) + "cpu", podsUnknownPriorityUsed.Cpu().String(), "memory", podsUnknownPriorityUsed.Memory().String()) nodeCapacity := getNodeCapacity(node) nodeReservation := getNodeReservation(strategy, nodeCapacity) @@ -214,7 +222,7 @@ func (p *Plugin) calculate(strategy *configuration.ColocationStrategy, node *cor systemReserved := quotav1.Max(nodeKubeletReserved, nodeAnnoReserved) batchAllocatable, cpuMsg, memMsg := calculateBatchResourceByPolicy(strategy, nodeCapacity, nodeReservation, systemReserved, - systemUsed, podHPRequest, podHPUsed) + systemUsed, podsHPRequest, podsHPUsed, podsHPMaxUsedReq) metrics.RecordNodeExtendedResourceAllocatableInternal(node, string(extension.BatchCPU), metrics.UnitInteger, float64(batchAllocatable.Cpu().MilliValue())/1000) metrics.RecordNodeExtendedResourceAllocatableInternal(node, string(extension.BatchMemory), metrics.UnitByte, float64(batchAllocatable.Memory().Value())) @@ -267,13 +275,15 @@ func (p *Plugin) calculateOnNUMALevel(strategy *configuration.ColocationStrategy zoneNum := len(nrt.Zones) zoneIdxMap := map[int]string{} nodeMetric := resourceMetrics.NodeMetric + nodeZoneAllocatable := make([]corev1.ResourceList, zoneNum) nodeZoneReserve := make([]corev1.ResourceList, zoneNum) systemZoneUsed := make([]corev1.ResourceList, zoneNum) systemZoneReserved := make([]corev1.ResourceList, zoneNum) - podUnknownPriorityZoneUsed := make([]corev1.ResourceList, zoneNum) - podHPZoneRequested := make([]corev1.ResourceList, zoneNum) - podHPZoneUsed := make([]corev1.ResourceList, zoneNum) + podsUnknownPriorityZoneUsed := make([]corev1.ResourceList, zoneNum) + podsHPZoneRequested := make([]corev1.ResourceList, zoneNum) + podsHPZoneUsed := make([]corev1.ResourceList, zoneNum) + podsHPZoneMaxUsedReq := make([]corev1.ResourceList, zoneNum) batchZoneAllocatable := make([]corev1.ResourceList, zoneNum) systemUsed := getResourceListForCPUAndMemory(nodeMetric.Status.NodeMetric.SystemUsage.ResourceList) @@ -284,9 +294,10 @@ func (p *Plugin) calculateOnNUMALevel(strategy *configuration.ColocationStrategy for i, zone := range nrt.Zones { zoneIdxMap[i] = zone.Name nodeZoneAllocatable[i] = corev1.ResourceList{} - podUnknownPriorityZoneUsed[i] = util.NewZeroResourceList() - podHPZoneRequested[i] = util.NewZeroResourceList() - podHPZoneUsed[i] = util.NewZeroResourceList() + podsUnknownPriorityZoneUsed[i] = util.NewZeroResourceList() + podsHPZoneRequested[i] = util.NewZeroResourceList() + podsHPZoneUsed[i] = util.NewZeroResourceList() + podsHPZoneMaxUsedReq[i] = util.NewZeroResourceList() for _, resourceInfo := range zone.Resources { if checkedNRTResourceSet.Has(resourceInfo.Name) { nodeZoneAllocatable[i][corev1.ResourceName(resourceInfo.Name)] = resourceInfo.Allocatable.DeepCopy() @@ -328,14 +339,20 @@ func (p *Plugin) calculateOnNUMALevel(strategy *configuration.ColocationStrategy continue } - podHPZoneRequested = addZoneResourceList(podHPZoneRequested, podZoneRequests, zoneNum) + podsHPZoneRequested = addZoneResourceList(podsHPZoneRequested, podZoneRequests, zoneNum) if 
!hasMetric { - podHPZoneUsed = addZoneResourceList(podHPZoneUsed, podZoneRequests, zoneNum) + podsHPZoneUsed = addZoneResourceList(podsHPZoneUsed, podZoneRequests, zoneNum) + podsHPZoneMaxUsedReq = addZoneResourceList(podsHPZoneMaxUsedReq, podZoneRequests, zoneNum) } else if qos := extension.GetPodQoSClassWithDefault(pod); qos == extension.QoSLSE { // NOTE: Currently qos=LSE pods does not reclaim CPU resource. - podHPZoneUsed = addZoneResourceList(podHPZoneUsed, minxZoneResourceListCPUAndMemory(podZoneRequests, podZoneUsages, zoneNum), zoneNum) + podsHPZoneUsed = addZoneResourceList(podsHPZoneUsed, + minxZoneResourceListCPUAndMemory(podZoneRequests, podZoneUsages, zoneNum), zoneNum) + podsHPZoneMaxUsedReq = addZoneResourceList(podsHPZoneMaxUsedReq, + maxZoneResourceList(podZoneUsages, podZoneRequests, zoneNum), zoneNum) } else { - podHPZoneUsed = addZoneResourceList(podHPZoneUsed, podZoneUsages, zoneNum) + podsHPZoneUsed = addZoneResourceList(podsHPZoneUsed, podZoneUsages, zoneNum) + podsHPZoneMaxUsedReq = addZoneResourceList(podsHPZoneMaxUsedReq, + maxZoneResourceList(podZoneUsages, podZoneRequests, zoneNum), zoneNum) } } @@ -346,9 +363,10 @@ func (p *Plugin) calculateOnNUMALevel(strategy *configuration.ColocationStrategy continue } podNUMAUsage := getPodUnknownNUMAUsage(getPodMetricUsage(podMetric), zoneNum) - podUnknownPriorityZoneUsed = addZoneResourceList(podUnknownPriorityZoneUsed, podNUMAUsage, zoneNum) + podsUnknownPriorityZoneUsed = addZoneResourceList(podsUnknownPriorityZoneUsed, podNUMAUsage, zoneNum) } - podHPZoneUsed = addZoneResourceList(podHPZoneUsed, podUnknownPriorityZoneUsed, zoneNum) + podsHPZoneUsed = addZoneResourceList(podsHPZoneUsed, podsUnknownPriorityZoneUsed, zoneNum) + podsHPZoneMaxUsedReq = addZoneResourceList(podsHPZoneMaxUsedReq, podsUnknownPriorityZoneUsed, zoneNum) batchZoneCPU := map[string]resource.Quantity{} batchZoneMemory := map[string]resource.Quantity{} @@ -356,7 +374,8 @@ func (p *Plugin) calculateOnNUMALevel(strategy *configuration.ColocationStrategy for i := range batchZoneAllocatable { zoneName := zoneIdxMap[i] batchZoneAllocatable[i], cpuMsg, memMsg = calculateBatchResourceByPolicy(strategy, nodeZoneAllocatable[i], - nodeZoneReserve[i], systemZoneReserved[i], systemZoneUsed[i], podHPZoneRequested[i], podHPZoneUsed[i]) + nodeZoneReserve[i], systemZoneReserved[i], systemZoneUsed[i], + podsHPZoneRequested[i], podsHPZoneUsed[i], podsHPZoneMaxUsedReq[i]) klog.V(6).InfoS("calculate batch resource in NUMA level", "node", node.Name, "zone", zoneName, "batch resource", batchZoneAllocatable[i], "cpu", cpuMsg, "memory", memMsg) diff --git a/pkg/slo-controller/noderesource/plugins/batchresource/plugin_test.go b/pkg/slo-controller/noderesource/plugins/batchresource/plugin_test.go index 60b19fc91..a787bbe1b 100644 --- a/pkg/slo-controller/noderesource/plugins/batchresource/plugin_test.go +++ b/pkg/slo-controller/noderesource/plugins/batchresource/plugin_test.go @@ -211,7 +211,7 @@ func TestPlugin(t *testing.T) { }) } -func TestExecute(t *testing.T) { +func TestPreUpdate(t *testing.T) { testScheme := runtime.NewScheme() err := clientgoscheme.AddToScheme(testScheme) assert.NoError(t, err) @@ -232,7 +232,6 @@ func TestExecute(t *testing.T) { fields fields args args wantErr bool - wantField *corev1.Node checkFunc func(t *testing.T, client ctrlclient.Client) }{ { @@ -264,25 +263,6 @@ func TestExecute(t *testing.T) { }, }, wantErr: false, - wantField: &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Status: corev1.NodeStatus{ - Capacity: 
corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("400Gi"), - extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), - extension.BatchMemory: *resource.NewScaledQuantity(120, 9), - }, - Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("380Gi"), - extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), - extension.BatchMemory: *resource.NewScaledQuantity(120, 9), - }, - }, - }, }, { name: "reset batch resources", @@ -317,21 +297,6 @@ func TestExecute(t *testing.T) { }, }, wantErr: false, - wantField: &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Status: corev1.NodeStatus{ - Capacity: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("400Gi"), - }, - Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("380Gi"), - }, - }, - }, }, { name: "add NUMA-level batch resources", @@ -417,25 +382,6 @@ func TestExecute(t *testing.T) { }, }, wantErr: false, - wantField: &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Status: corev1.NodeStatus{ - Capacity: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("400Gi"), - extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), - extension.BatchMemory: *resource.NewScaledQuantity(120, 9), - }, - Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100"), - corev1.ResourceMemory: resource.MustParse("380Gi"), - extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), - extension.BatchMemory: *resource.NewScaledQuantity(120, 9), - }, - }, - }, checkFunc: func(t *testing.T, client ctrlclient.Client) { nrt := &topov1alpha1.NodeResourceTopology{} err := client.Get(context.TODO(), types.NamespacedName{Name: "test-node"}, nrt) @@ -506,17 +452,662 @@ func TestExecute(t *testing.T) { }, }, }, - } - assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) - for i := range expectedNRT.Zones { - assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) - assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) - for j := range expectedNRT.Zones[i].Resources { - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - } - } + } + assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) + for i := range expectedNRT.Zones { + assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) + assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) + for j := range expectedNRT.Zones[i].Resources { + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), 
nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + } + } + }, + }, + { + name: "update NUMA-level batch resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(&topov1alpha1.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Zones: topov1alpha1.ZoneList{ + { + Name: util.GenNodeZoneName(0), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("25000"), + Allocatable: resource.MustParse("25000"), + Available: resource.MustParse("25000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("62G"), + Allocatable: resource.MustParse("62G"), + Available: resource.MustParse("62G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("200Gi"), + Allocatable: resource.MustParse("200Gi"), + Available: resource.MustParse("200Gi"), + }, + }, + }, + { + Name: util.GenNodeZoneName(1), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("25000"), + Allocatable: resource.MustParse("25000"), + Available: resource.MustParse("25000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("58G"), + Allocatable: resource.MustParse("58G"), + Available: resource.MustParse("58G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("180Gi"), + Allocatable: resource.MustParse("180Gi"), + Available: resource.MustParse("180Gi"), + }, + }, + }, + }, + }).Build(), + }, + args: args{ + strategy: &configuration.ColocationStrategy{ + ResourceDiffThreshold: pointer.Float64(0.1), + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + }, + }, + nr: &framework.NodeResource{ + Resources: map[corev1.ResourceName]*resource.Quantity{ + extension.BatchCPU: resource.NewQuantity(25000, resource.DecimalSI), + extension.BatchMemory: resource.NewScaledQuantity(50, 9), + }, + ZoneResources: map[string]corev1.ResourceList{ + util.GenNodeZoneName(0): { + extension.BatchCPU: resource.MustParse("15000"), + extension.BatchMemory: resource.MustParse("30G"), + }, + util.GenNodeZoneName(1): { + extension.BatchCPU: resource.MustParse("10000"), + extension.BatchMemory: resource.MustParse("20G"), + }, + }, + }, + }, + wantErr: false, + checkFunc: func(t *testing.T, client 
ctrlclient.Client) { + nrt := &topov1alpha1.NodeResourceTopology{} + err := client.Get(context.TODO(), types.NamespacedName{Name: "test-node"}, nrt) + assert.NoError(t, err) + expectedNRT := &topov1alpha1.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Zones: topov1alpha1.ZoneList{ + { + Name: util.GenNodeZoneName(0), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("15000"), + Allocatable: resource.MustParse("15000"), + Available: resource.MustParse("15000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("30G"), + Allocatable: resource.MustParse("30G"), + Available: resource.MustParse("30G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("200Gi"), + Allocatable: resource.MustParse("200Gi"), + Available: resource.MustParse("200Gi"), + }, + }, + }, + { + Name: util.GenNodeZoneName(1), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("10000"), + Allocatable: resource.MustParse("10000"), + Available: resource.MustParse("10000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("20G"), + Allocatable: resource.MustParse("20G"), + Available: resource.MustParse("20G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("180Gi"), + Allocatable: resource.MustParse("180Gi"), + Available: resource.MustParse("180Gi"), + }, + }, + }, + }, + } + assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) + for i := range expectedNRT.Zones { + assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) + assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) + for j := range expectedNRT.Zones[i].Resources { + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + } + } + }, + }, + { + name: "update NUMA-level batch resources with cpu-normalization ratio", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(&topov1alpha1.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Zones: topov1alpha1.ZoneList{ + { + Name: util.GenNodeZoneName(0), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("25000"), + Allocatable: resource.MustParse("25000"), + Available: resource.MustParse("25000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: 
resource.MustParse("62G"), + Allocatable: resource.MustParse("62G"), + Available: resource.MustParse("62G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("200Gi"), + Allocatable: resource.MustParse("200Gi"), + Available: resource.MustParse("200Gi"), + }, + }, + }, + { + Name: util.GenNodeZoneName(1), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("25000"), + Allocatable: resource.MustParse("25000"), + Available: resource.MustParse("25000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("58G"), + Allocatable: resource.MustParse("58G"), + Available: resource.MustParse("58G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("180Gi"), + Allocatable: resource.MustParse("180Gi"), + Available: resource.MustParse("180Gi"), + }, + }, + }, + }, + }).Build(), + }, + args: args{ + strategy: &configuration.ColocationStrategy{ + ResourceDiffThreshold: pointer.Float64(0.1), + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + }, + }, + nr: &framework.NodeResource{ + Resources: map[corev1.ResourceName]*resource.Quantity{ + extension.BatchCPU: resource.NewQuantity(25000, resource.DecimalSI), + extension.BatchMemory: resource.NewScaledQuantity(50, 9), + }, + ZoneResources: map[string]corev1.ResourceList{ + util.GenNodeZoneName(0): { + extension.BatchCPU: resource.MustParse("15000"), + extension.BatchMemory: resource.MustParse("30G"), + }, + util.GenNodeZoneName(1): { + extension.BatchCPU: resource.MustParse("10000"), + extension.BatchMemory: resource.MustParse("20G"), + }, + }, + Annotations: map[string]string{ + extension.AnnotationCPUNormalizationRatio: "1.20", + }, + }, + }, + wantErr: false, + checkFunc: func(t *testing.T, client ctrlclient.Client) { + nrt := &topov1alpha1.NodeResourceTopology{} + err := client.Get(context.TODO(), types.NamespacedName{Name: "test-node"}, nrt) + assert.NoError(t, err) + expectedNRT := &topov1alpha1.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Zones: topov1alpha1.ZoneList{ + { + Name: util.GenNodeZoneName(0), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("18000"), + Allocatable: resource.MustParse("18000"), + Available: resource.MustParse("18000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("30G"), + Allocatable: resource.MustParse("30G"), + Available: resource.MustParse("30G"), + }, + { + 
Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("200Gi"), + Allocatable: resource.MustParse("200Gi"), + Available: resource.MustParse("200Gi"), + }, + }, + }, + { + Name: util.GenNodeZoneName(1), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(extension.BatchCPU), + Capacity: resource.MustParse("12000"), + Allocatable: resource.MustParse("12000"), + Available: resource.MustParse("12000"), + }, + { + Name: string(extension.BatchMemory), + Capacity: resource.MustParse("20G"), + Allocatable: resource.MustParse("20G"), + Available: resource.MustParse("20G"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("180Gi"), + Allocatable: resource.MustParse("180Gi"), + Available: resource.MustParse("180Gi"), + }, + }, + }, + }, + } + assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) + for i := range expectedNRT.Zones { + assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) + assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) + for j := range expectedNRT.Zones[i].Resources { + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) + } + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer testPluginCleanup() + p := &Plugin{} + assert.Equal(t, PluginName, p.Name()) + testOpt := &framework.Option{ + Scheme: testScheme, + Client: fake.NewClientBuilder().WithScheme(testScheme).Build(), + Builder: &builder.Builder{}, + Recorder: &record.FakeRecorder{}, + } + if tt.fields.client != nil { + testOpt.Client = tt.fields.client + } + err = p.Setup(testOpt) + assert.NoError(t, err) + + gotErr := p.PreUpdate(tt.args.strategy, tt.args.node, tt.args.nr) + assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) + if tt.checkFunc != nil { + tt.checkFunc(t, testOpt.Client) + } + }) + } +} + +func TestPrepare(t *testing.T) { + testScheme := runtime.NewScheme() + err := clientgoscheme.AddToScheme(testScheme) + assert.NoError(t, err) + err = slov1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + err = topov1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + type fields struct { + client ctrlclient.Client + } + type args struct { + strategy *configuration.ColocationStrategy + node *corev1.Node + nr *framework.NodeResource + } + tests := []struct { + name string + fields fields + args args + wantErr bool + wantField *corev1.Node + }{ + { + name: "update batch resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).Build(), + }, + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + 
corev1.ResourceMemory: resource.MustParse("380Gi"), + }, + }, + }, + nr: &framework.NodeResource{ + Resources: map[corev1.ResourceName]*resource.Quantity{ + extension.BatchCPU: resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: resource.NewScaledQuantity(120, 9), + }, + }, + }, + wantErr: false, + wantField: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + }, + }, + }, + { + name: "reset batch resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).Build(), + }, + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + }, + }, + nr: &framework.NodeResource{ + Resets: map[corev1.ResourceName]bool{ + extension.BatchCPU: true, + extension.BatchMemory: true, + }, + }, + }, + wantErr: false, + wantField: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + }, + }, + }, + }, + { + name: "add NUMA-level batch resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(&topov1alpha1.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Zones: topov1alpha1.ZoneList{ + { + Name: util.GenNodeZoneName(0), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("200Gi"), + Allocatable: resource.MustParse("200Gi"), + Available: resource.MustParse("200Gi"), + }, + }, + }, + { + Name: util.GenNodeZoneName(1), + Type: util.NodeZoneType, + Resources: topov1alpha1.ResourceInfoList{ + { + Name: string(corev1.ResourceCPU), + Capacity: resource.MustParse("50"), + Allocatable: resource.MustParse("50"), + Available: resource.MustParse("50"), + }, + { + Name: string(corev1.ResourceMemory), + Capacity: resource.MustParse("180Gi"), + Allocatable: resource.MustParse("180Gi"), + Available: resource.MustParse("180Gi"), + 
}, + }, + }, + }, + }).Build(), + }, + args: args{ + strategy: &configuration.ColocationStrategy{ + ResourceDiffThreshold: pointer.Float64(0.1), + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + }, + }, + }, + nr: &framework.NodeResource{ + Resources: map[corev1.ResourceName]*resource.Quantity{ + extension.BatchCPU: resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: resource.NewScaledQuantity(120, 9), + }, + ZoneResources: map[string]corev1.ResourceList{ + util.GenNodeZoneName(0): { + extension.BatchCPU: resource.MustParse("25000"), + extension.BatchMemory: resource.MustParse("62G"), + }, + util.GenNodeZoneName(1): { + extension.BatchCPU: resource.MustParse("25000"), + extension.BatchMemory: resource.MustParse("58G"), + }, + }, + }, + }, + wantErr: false, + wantField: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("380Gi"), + extension.BatchCPU: *resource.NewQuantity(50000, resource.DecimalSI), + extension.BatchMemory: *resource.NewScaledQuantity(120, 9), + }, + }, }, }, { @@ -650,88 +1241,6 @@ func TestExecute(t *testing.T) { }, }, }, - checkFunc: func(t *testing.T, client ctrlclient.Client) { - nrt := &topov1alpha1.NodeResourceTopology{} - err := client.Get(context.TODO(), types.NamespacedName{Name: "test-node"}, nrt) - assert.NoError(t, err) - expectedNRT := &topov1alpha1.NodeResourceTopology{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Zones: topov1alpha1.ZoneList{ - { - Name: util.GenNodeZoneName(0), - Type: util.NodeZoneType, - Resources: topov1alpha1.ResourceInfoList{ - { - Name: string(corev1.ResourceCPU), - Capacity: resource.MustParse("50"), - Allocatable: resource.MustParse("50"), - Available: resource.MustParse("50"), - }, - { - Name: string(extension.BatchCPU), - Capacity: resource.MustParse("15000"), - Allocatable: resource.MustParse("15000"), - Available: resource.MustParse("15000"), - }, - { - Name: string(extension.BatchMemory), - Capacity: resource.MustParse("30G"), - Allocatable: resource.MustParse("30G"), - Available: resource.MustParse("30G"), - }, - { - Name: string(corev1.ResourceMemory), - Capacity: resource.MustParse("200Gi"), - Allocatable: resource.MustParse("200Gi"), - Available: resource.MustParse("200Gi"), - }, - }, - }, - { - Name: util.GenNodeZoneName(1), - Type: util.NodeZoneType, - Resources: topov1alpha1.ResourceInfoList{ - { - Name: string(corev1.ResourceCPU), - Capacity: resource.MustParse("50"), - Allocatable: resource.MustParse("50"), - Available: resource.MustParse("50"), - }, - { - Name: string(extension.BatchCPU), - Capacity: resource.MustParse("10000"), - Allocatable: resource.MustParse("10000"), - Available: resource.MustParse("10000"), - }, - { - Name: string(extension.BatchMemory), - Capacity: 
resource.MustParse("20G"), - Allocatable: resource.MustParse("20G"), - Available: resource.MustParse("20G"), - }, - { - Name: string(corev1.ResourceMemory), - Capacity: resource.MustParse("180Gi"), - Allocatable: resource.MustParse("180Gi"), - Available: resource.MustParse("180Gi"), - }, - }, - }, - }, - } - assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) - for i := range expectedNRT.Zones { - assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) - assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) - for j := range expectedNRT.Zones[i].Resources { - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - } - } - }, }, { name: "update NUMA-level batch resources with cpu-normalization ratio", @@ -867,88 +1376,6 @@ func TestExecute(t *testing.T) { }, }, }, - checkFunc: func(t *testing.T, client ctrlclient.Client) { - nrt := &topov1alpha1.NodeResourceTopology{} - err := client.Get(context.TODO(), types.NamespacedName{Name: "test-node"}, nrt) - assert.NoError(t, err) - expectedNRT := &topov1alpha1.NodeResourceTopology{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Zones: topov1alpha1.ZoneList{ - { - Name: util.GenNodeZoneName(0), - Type: util.NodeZoneType, - Resources: topov1alpha1.ResourceInfoList{ - { - Name: string(corev1.ResourceCPU), - Capacity: resource.MustParse("50"), - Allocatable: resource.MustParse("50"), - Available: resource.MustParse("50"), - }, - { - Name: string(extension.BatchCPU), - Capacity: resource.MustParse("18000"), - Allocatable: resource.MustParse("18000"), - Available: resource.MustParse("18000"), - }, - { - Name: string(extension.BatchMemory), - Capacity: resource.MustParse("30G"), - Allocatable: resource.MustParse("30G"), - Available: resource.MustParse("30G"), - }, - { - Name: string(corev1.ResourceMemory), - Capacity: resource.MustParse("200Gi"), - Allocatable: resource.MustParse("200Gi"), - Available: resource.MustParse("200Gi"), - }, - }, - }, - { - Name: util.GenNodeZoneName(1), - Type: util.NodeZoneType, - Resources: topov1alpha1.ResourceInfoList{ - { - Name: string(corev1.ResourceCPU), - Capacity: resource.MustParse("50"), - Allocatable: resource.MustParse("50"), - Available: resource.MustParse("50"), - }, - { - Name: string(extension.BatchCPU), - Capacity: resource.MustParse("12000"), - Allocatable: resource.MustParse("12000"), - Available: resource.MustParse("12000"), - }, - { - Name: string(extension.BatchMemory), - Capacity: resource.MustParse("20G"), - Allocatable: resource.MustParse("20G"), - Available: resource.MustParse("20G"), - }, - { - Name: string(corev1.ResourceMemory), - Capacity: resource.MustParse("180Gi"), - Allocatable: resource.MustParse("180Gi"), - Available: resource.MustParse("180Gi"), - }, - }, - }, - }, - } - assert.Equal(t, len(expectedNRT.Zones), len(nrt.Zones)) - for i := range expectedNRT.Zones { - assert.Equal(t, expectedNRT.Zones[i].Name, nrt.Zones[i].Name, fmt.Sprintf("zone %v", i)) - assert.Equal(t, len(expectedNRT.Zones[i].Resources), len(nrt.Zones[i].Resources), fmt.Sprintf("zone %v", i)) - for j 
:= range expectedNRT.Zones[i].Resources { - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Capacity.Value(), nrt.Zones[i].Resources[j].Capacity.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Allocatable.Value(), nrt.Zones[i].Resources[j].Allocatable.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - assert.Equal(t, expectedNRT.Zones[i].Resources[j].Available.Value(), nrt.Zones[i].Resources[j].Available.Value(), fmt.Sprintf("zone %v, resource %v", i, j)) - } - } - }, }, } for _, tt := range tests { @@ -968,13 +1395,10 @@ func TestExecute(t *testing.T) { err = p.Setup(testOpt) assert.NoError(t, err) - gotErr := p.Execute(tt.args.strategy, tt.args.node, tt.args.nr) + gotErr := p.Prepare(tt.args.strategy, tt.args.node, tt.args.nr) assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) testingCorrectResourceList(t, &tt.wantField.Status.Capacity, &tt.args.node.Status.Capacity) testingCorrectResourceList(t, &tt.wantField.Status.Allocatable, &tt.args.node.Status.Allocatable) - if tt.checkFunc != nil { - tt.checkFunc(t, testOpt.Client) - } }) } } @@ -988,6 +1412,7 @@ func TestPluginCalculate(t *testing.T) { err = topov1alpha1.AddToScheme(testScheme) assert.NoError(t, err) memoryCalculateByReq := configuration.CalculateByPodRequest + cpuCalculateByMaxUsageReq := configuration.CalculateByPodMaxUsageRequest type fields struct { client ctrlclient.Client checkFn func(t *testing.T, client ctrlclient.Client) @@ -2931,6 +3356,82 @@ func TestPluginCalculate(t *testing.T) { }, wantErr: false, }, + { + name: "calculate with cpu maxUsageRequest and memory request", + args: args{ + strategy: &configuration.ColocationStrategy{ + Enable: pointer.Bool(true), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + CPUReclaimThresholdPercent: pointer.Int64(70), + CPUCalculatePolicy: &cpuCalculateByMaxUsageReq, + MemoryReclaimThresholdPercent: pointer.Int64(80), + MemoryCalculatePolicy: &memoryCalculateByReq, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node1", + Labels: map[string]string{ + "cpu-calculate-by-request": "true", + "memory-calculate-by-request": "true", + }, + }, + Status: makeNodeStat("100", "120G"), + }, + }, + want: []framework.ResourceItem{ + { + Name: extension.BatchCPU, + Quantity: resource.NewQuantity(21000, resource.DecimalSI), + Message: "batchAllocatable[CPU(Milli-Core)]:21000 = nodeCapacity:100000 - nodeReservation:30000 - systemUsageOrReserved:7000 - podHPMaxUsedRequest:42000", + }, + { + Name: extension.BatchMemory, + Quantity: resource.NewScaledQuantity(36, 9), + Message: "batchAllocatable[Mem(GB)]:36 = nodeCapacity:120 - nodeReservation:24 - systemReserved:0 - podHPRequest:60", + }, + }, + wantErr: false, + }, + { + name: "calculate with adjusted reclaim ratio", + args: args{ + strategy: &configuration.ColocationStrategy{ + Enable: pointer.Bool(true), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + CPUReclaimThresholdPercent: pointer.Int64(150), + CPUCalculatePolicy: &cpuCalculateByMaxUsageReq, + MemoryReclaimThresholdPercent: pointer.Int64(120), + MemoryCalculatePolicy: &memoryCalculateByReq, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node1", + Labels: map[string]string{ + "cpu-calculate-by-request": "true", + "memory-calculate-by-request": "true", + }, + }, + Status: makeNodeStat("100", "120G"), + 
			},
		},
		want: []framework.ResourceItem{
			{
				Name:     extension.BatchCPU,
				Quantity: resource.NewQuantity(101000, resource.DecimalSI),
				Message:  "batchAllocatable[CPU(Milli-Core)]:101000 = nodeCapacity:100000 - nodeReservation:-50000 - systemUsageOrReserved:7000 - podHPMaxUsedRequest:42000",
			},
			{
				Name:     extension.BatchMemory,
				Quantity: resource.NewScaledQuantity(84, 9),
				Message:  "batchAllocatable[Mem(GB)]:84 = nodeCapacity:120 - nodeReservation:-24 - systemReserved:0 - podHPRequest:60",
			},
		},
		wantErr: false,
	},
	}
 	for _, tt := range tests {
diff --git a/pkg/slo-controller/noderesource/plugins/batchresource/util.go b/pkg/slo-controller/noderesource/plugins/batchresource/util.go
index 60b6283c0..1a2e5f098 100644
--- a/pkg/slo-controller/noderesource/plugins/batchresource/util.go
+++ b/pkg/slo-controller/noderesource/plugins/batchresource/util.go
@@ -36,30 +36,51 @@ import (
 )
 
 func calculateBatchResourceByPolicy(strategy *configuration.ColocationStrategy, nodeCapacity, nodeReserved, systemReserved,
-	systemUsed, podHPReq, podHPUsed corev1.ResourceList) (corev1.ResourceList, string, string) {
-	// Node(Batch).Alloc = Node.Total - Node.Reserved - System.Used - Pod(Prod/Mid).Used
+	systemUsed, podHPReq, podHPUsed, podHPMaxUsedReq corev1.ResourceList) (corev1.ResourceList, string, string) {
+	// Node(Batch).Alloc[usage] := Node.Total - Node.Reserved - System.Used - sum(Pod(Prod/Mid).Used)
 	// System.Used = max(Node.Used - Pod(All).Used, Node.Anno.Reserved, Node.Kubelet.Reserved)
 	systemUsed = quotav1.Max(systemUsed, systemReserved)
 	batchAllocatableByUsage := quotav1.Max(quotav1.Subtract(quotav1.Subtract(quotav1.Subtract(
 		nodeCapacity, nodeReserved), systemUsed), podHPUsed), util.NewZeroResourceList())
 
-	// Node(Batch).Alloc = Node.Total - Node.Reserved - System.Reserved - Pod(Prod/Mid).Request
+	// Node(Batch).Alloc[request] := Node.Total - Node.Reserved - System.Reserved - sum(Pod(Prod/Mid).Request)
 	// System.Reserved = max(Node.Anno.Reserved, Node.Kubelet.Reserved)
 	batchAllocatableByRequest := quotav1.Max(quotav1.Subtract(quotav1.Subtract(quotav1.Subtract(
 		nodeCapacity, nodeReserved), systemReserved), podHPReq), util.NewZeroResourceList())
 
+	// Node(Batch).Alloc[maxUsageRequest] := Node.Total - Node.Reserved - System.Used - sum(max(Pod(Prod/Mid).Request, Pod(Prod/Mid).Used))
+	batchAllocatableByMaxUsageRequest := quotav1.Max(quotav1.Subtract(quotav1.Subtract(quotav1.Subtract(
+		nodeCapacity, nodeReserved), systemUsed), podHPMaxUsedReq), util.NewZeroResourceList())
+
 	batchAllocatable := batchAllocatableByUsage
-	cpuMsg := fmt.Sprintf("batchAllocatable[CPU(Milli-Core)]:%v = nodeCapacity:%v - nodeReservation:%v - systemUsageOrReserved:%v - podHPUsed:%v",
-		batchAllocatable.Cpu().MilliValue(), nodeCapacity.Cpu().MilliValue(), nodeReserved.Cpu().MilliValue(),
-		systemUsed.Cpu().MilliValue(), podHPUsed.Cpu().MilliValue())
+
+	var cpuMsg string
+	// batch cpu supports the policies "usage" and "maxUsageRequest"
+	if strategy != nil && strategy.CPUCalculatePolicy != nil && *strategy.CPUCalculatePolicy == configuration.CalculateByPodMaxUsageRequest {
+		batchAllocatable[corev1.ResourceCPU] = *batchAllocatableByMaxUsageRequest.Cpu()
+		cpuMsg = fmt.Sprintf("batchAllocatable[CPU(Milli-Core)]:%v = nodeCapacity:%v - nodeReservation:%v - systemUsageOrReserved:%v - podHPMaxUsedRequest:%v",
+			batchAllocatable.Cpu().MilliValue(), nodeCapacity.Cpu().MilliValue(), nodeReserved.Cpu().MilliValue(),
+			systemUsed.Cpu().MilliValue(), podHPMaxUsedReq.Cpu().MilliValue())
+	} else { // use CalculatePolicy "usage" by default
+		cpuMsg = fmt.Sprintf("batchAllocatable[CPU(Milli-Core)]:%v = nodeCapacity:%v - nodeReservation:%v - systemUsageOrReserved:%v - podHPUsed:%v",
+			batchAllocatable.Cpu().MilliValue(), nodeCapacity.Cpu().MilliValue(), nodeReserved.Cpu().MilliValue(),
+			systemUsed.Cpu().MilliValue(), podHPUsed.Cpu().MilliValue())
+	}
 
 	var memMsg string
+	// batch memory supports the policies "usage", "request" and "maxUsageRequest"
 	if strategy != nil && strategy.MemoryCalculatePolicy != nil && *strategy.MemoryCalculatePolicy == configuration.CalculateByPodRequest {
 		batchAllocatable[corev1.ResourceMemory] = *batchAllocatableByRequest.Memory()
 		memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeCapacity:%v - nodeReservation:%v - systemReserved:%v - podHPRequest:%v",
 			batchAllocatable.Memory().ScaledValue(resource.Giga), nodeCapacity.Memory().ScaledValue(resource.Giga),
 			nodeReserved.Memory().ScaledValue(resource.Giga), systemReserved.Memory().ScaledValue(resource.Giga),
 			podHPReq.Memory().ScaledValue(resource.Giga))
+	} else if strategy != nil && strategy.MemoryCalculatePolicy != nil && *strategy.MemoryCalculatePolicy == configuration.CalculateByPodMaxUsageRequest {
+		batchAllocatable[corev1.ResourceMemory] = *batchAllocatableByMaxUsageRequest.Memory()
+		memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeCapacity:%v - nodeReservation:%v - systemUsage:%v - podHPMaxUsedRequest:%v",
+			batchAllocatable.Memory().ScaledValue(resource.Giga), nodeCapacity.Memory().ScaledValue(resource.Giga),
+			nodeReserved.Memory().ScaledValue(resource.Giga), systemUsed.Memory().ScaledValue(resource.Giga),
+			podHPMaxUsedReq.Memory().ScaledValue(resource.Giga))
 	} else { // use CalculatePolicy "usage" by default
 		memMsg = fmt.Sprintf("batchAllocatable[Mem(GB)]:%v = nodeCapacity:%v - nodeReservation:%v - systemUsage:%v - podHPUsed:%v",
 			batchAllocatable.Memory().ScaledValue(resource.Giga), nodeCapacity.Memory().ScaledValue(resource.Giga),
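
For intuition, a minimal runnable sketch (not part of the patch) comparing the three policies on plain int64 milli-CPU figures instead of corev1.ResourceList; the numbers are assumed to mirror the maxUsageRequest test case above (capacity 100000, reclaim threshold 70%, systemUsed 7000), and the zero-clamping of the real code is omitted:

package main

import "fmt"

func main() {
	// Assumed figures for a 100-core node; systemUsed is already max-ed with systemReserved.
	nodeCapacity := int64(100000)   // milli-cores
	nodeReserved := int64(30000)    // nodeCapacity * (100 - CPUReclaimThresholdPercent=70) / 100
	systemUsed := int64(7000)
	systemReserved := int64(0)
	podHPUsed := int64(30000)       // sum of HP pods' usage (assumed)
	podHPReq := int64(40000)        // sum of HP pods' requests (assumed)
	podHPMaxUsedReq := int64(42000) // sum of per-pod max(usage, request); >= both sums above

	byUsage := nodeCapacity - nodeReserved - systemUsed - podHPUsed             // 33000: LP reclaims requested-but-unused HP resources
	byRequest := nodeCapacity - nodeReserved - systemReserved - podHPReq        // 30000: no overcommitment between priority bands
	byMaxUsageReq := nodeCapacity - nodeReserved - systemUsed - podHPMaxUsedReq // 21000: the most conservative of the three
	fmt.Println(byUsage, byRequest, byMaxUsageReq)
}
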
@@ -196,15 +217,24 @@ func divideResourceList(rl corev1.ResourceList, divisor float64) corev1.Resource
 	return divided
 }
 
-func addZoneResourceList(a, b []corev1.ResourceList, zoneNum int) []corev1.ResourceList {
+func zoneResourceListHandler(a, b []corev1.ResourceList, zoneNum int,
+	handleFn func(a corev1.ResourceList, b corev1.ResourceList) corev1.ResourceList) []corev1.ResourceList {
 	// assert len(a) == len(b) == zoneNum
 	result := make([]corev1.ResourceList, zoneNum)
 	for i := 0; i < zoneNum; i++ {
-		result[i] = quotav1.Add(a[i], b[i])
+		result[i] = handleFn(a[i], b[i])
 	}
 	return result
 }
 
+func addZoneResourceList(a, b []corev1.ResourceList, zoneNum int) []corev1.ResourceList {
+	return zoneResourceListHandler(a, b, zoneNum, quotav1.Add)
+}
+
+func maxZoneResourceList(a, b []corev1.ResourceList, zoneNum int) []corev1.ResourceList {
+	return zoneResourceListHandler(a, b, zoneNum, quotav1.Max)
+}
+
 func getResourceListForCPUAndMemory(rl corev1.ResourceList) corev1.ResourceList {
 	return quotav1.Mask(rl, []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory})
 }
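
The new helpers just apply a binary operator zone-by-zone; here is a small runnable sketch (not part of the patch) with two assumed NUMA zones showing the per-zone quotav1.Add and quotav1.Max that addZoneResourceList and maxZoneResourceList delegate to (the Max variant is presumably what builds a per-zone max(usage, request) input for the maxUsageRequest policy):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	quotav1 "k8s.io/apiserver/pkg/quota/v1"
)

func main() {
	// Assumed per-zone HP usage and request for two NUMA zones (CPU only).
	usage := []corev1.ResourceList{
		{corev1.ResourceCPU: resource.MustParse("6")},
		{corev1.ResourceCPU: resource.MustParse("12")},
	}
	request := []corev1.ResourceList{
		{corev1.ResourceCPU: resource.MustParse("10")},
		{corev1.ResourceCPU: resource.MustParse("8")},
	}
	for i := range usage {
		sum := quotav1.Add(usage[i], request[i]) // what addZoneResourceList computes per zone
		mx := quotav1.Max(usage[i], request[i])  // what maxZoneResourceList computes per zone
		fmt.Printf("zone %d: add=%s max=%s\n", i, sum.Cpu().String(), mx.Cpu().String())
		// zone 0: add=16 max=10; zone 1: add=20 max=12
	}
}
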
diff --git a/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin.go b/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin.go
index 17789e187..8773a22da 100644
--- a/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin.go
+++ b/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin.go
@@ -106,7 +106,7 @@ func (p *Plugin) NeedSyncMeta(_ *configuration.ColocationStrategy, oldNode, newN
 	return true, "ratio is different"
 }
 
-func (p *Plugin) Execute(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
+func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
 	ratioStr, ok := nr.Annotations[extension.AnnotationCPUNormalizationRatio]
 	if !ok {
 		klog.V(6).Infof("skip for whose has no cpu normalization ratio to set, node %s", node.Name)
diff --git a/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin_test.go b/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin_test.go
index dd8bfb633..08f4ca64f 100644
--- a/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin_test.go
+++ b/pkg/slo-controller/noderesource/plugins/cpunormalization/plugin_test.go
@@ -191,7 +191,7 @@ func TestPluginNeedSyncMeta(t *testing.T) {
 	}
 }
 
-func TestPluginExecute(t *testing.T) {
+func TestPluginPrepare(t *testing.T) {
 	type args struct {
 		node *corev1.Node
 		nr   *framework.NodeResource
@@ -276,7 +276,7 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			p := &Plugin{}
-			gotErr := p.Execute(nil, tt.args.node, tt.args.nr)
+			gotErr := p.Prepare(nil, tt.args.node, tt.args.nr)
 			assert.Equal(t, tt.wantErr, gotErr != nil)
 			assert.Equal(t, tt.wantField, tt.args.node)
 			if tt.wantField1 != nil {
diff --git a/pkg/slo-controller/noderesource/plugins/midresource/plugin.go b/pkg/slo-controller/noderesource/plugins/midresource/plugin.go
index 5e980c275..50bbf7490 100644
--- a/pkg/slo-controller/noderesource/plugins/midresource/plugin.go
+++ b/pkg/slo-controller/noderesource/plugins/midresource/plugin.go
@@ -61,7 +61,7 @@ func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, n
 	return false, ""
 }
 
-func (p *Plugin) Execute(strategy *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
+func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
 	for _, resourceName := range ResourceNames {
 		prepareNodeForResource(node, nr, resourceName)
 	}
diff --git a/pkg/slo-controller/noderesource/plugins/midresource/plugin_test.go b/pkg/slo-controller/noderesource/plugins/midresource/plugin_test.go
index eacbdfeac..e929dbdb7 100644
--- a/pkg/slo-controller/noderesource/plugins/midresource/plugin_test.go
+++ b/pkg/slo-controller/noderesource/plugins/midresource/plugin_test.go
@@ -102,7 +102,7 @@ func TestPluginNeedSync(t *testing.T) {
 	}
 }
 
-func TestPluginExecute(t *testing.T) {
+func TestPluginPrepare(t *testing.T) {
 	testNode := getTestNode(nil)
 	testWantNodeMidChange := getTestNode(corev1.ResourceList{
 		extension.MidCPU:    *resource.NewQuantity(30000, resource.DecimalSI),
@@ -158,7 +158,7 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			p := &Plugin{}
-			gotErr := p.Execute(tt.args.strategy, tt.args.node, tt.args.nr)
+			gotErr := p.Prepare(tt.args.strategy, tt.args.node, tt.args.nr)
 			assert.Equal(t, tt.wantErr, gotErr != nil)
 			assert.Equal(t, tt.wantField, tt.args.node)
 		})
diff --git a/pkg/slo-controller/noderesource/plugins_profile.go b/pkg/slo-controller/noderesource/plugins_profile.go
index 9416f26f4..9537d44e9 100644
--- a/pkg/slo-controller/noderesource/plugins_profile.go
+++ b/pkg/slo-controller/noderesource/plugins_profile.go
@@ -35,9 +35,10 @@ func init() {
 func addPlugins(filter framework.FilterFn) {
 	// NOTE: plugins run in order of the registration.
 	framework.RegisterSetupExtender(filter, setupPlugins...)
+	framework.RegisterNodePreUpdateExtender(filter, nodePreUpdatePlugins...)
 	framework.RegisterNodePrepareExtender(filter, nodePreparePlugins...)
-	framework.RegisterNodeSyncExtender(filter, nodeSyncPlugins...)
-	framework.RegisterNodeMetaSyncExtender(filter, nodeMetaSyncPlugins...)
+	framework.RegisterNodeStatusCheckExtender(filter, nodeStatusCheckPlugins...)
+	framework.RegisterNodeMetaCheckExtender(filter, nodeMetaCheckPlugins...)
 	framework.RegisterResourceCalculateExtender(filter, resourceCalculatePlugins...)
 }
 
@@ -47,6 +48,10 @@ var (
 		&cpunormalization.Plugin{},
 		&batchresource.Plugin{},
 	}
+	// NodePreUpdatePlugin implements node resource pre-updating.
+	nodePreUpdatePlugins = []framework.NodePreUpdatePlugin{
+		&batchresource.Plugin{},
+	}
 	// NodePreparePlugin implements node resource preparing for the calculated results.
 	nodePreparePlugins = []framework.NodePreparePlugin{
 		&cpunormalization.Plugin{}, // should be first
@@ -54,12 +59,12 @@ var (
 		&batchresource.Plugin{},
 	}
-	// NodeSyncPlugin implements the check of resource updating.
-	nodeSyncPlugins = []framework.NodeSyncPlugin{
+	// nodeStatusCheckPlugins implements the check of the node status updating.
+	nodeStatusCheckPlugins = []framework.NodeStatusCheckPlugin{
 		&midresource.Plugin{},
 		&batchresource.Plugin{},
 	}
-	// NodeMetaSyncPlugin implements the check of node meta updating.
-	nodeMetaSyncPlugins = []framework.NodeMetaSyncPlugin{
+	// nodeMetaCheckPlugins implements the check of node meta updating.
+	nodeMetaCheckPlugins = []framework.NodeMetaCheckPlugin{
 		&cpunormalization.Plugin{},
 	}
 	// ResourceCalculatePlugin implements resource counting and overcommitment algorithms.
diff --git a/pkg/slo-controller/noderesource/resource_calculator.go b/pkg/slo-controller/noderesource/resource_calculator.go
index f9f49788d..bc29d6ec0 100644
--- a/pkg/slo-controller/noderesource/resource_calculator.go
+++ b/pkg/slo-controller/noderesource/resource_calculator.go
@@ -73,6 +73,9 @@ func (r *NodeResourceReconciler) updateNodeResource(node *corev1.Node, nr *frame
 	nodeCopy := node.DeepCopy() // avoid overwriting the cache
 	strategy := sloconfig.GetNodeColocationStrategy(r.cfgCache.GetCfgCopy(), node)
 
+	// run the node pre-update extenders once before preparing and syncing the node resources
+	framework.RunNodePreUpdateExtenders(strategy, node, nr)
+
 	r.prepareNodeResource(strategy, nodeCopy, nr)
 	needSyncStatus, needSyncMeta := r.isNodeResourceSyncNeeded(strategy, node, nodeCopy)
 	if !needSyncStatus && !needSyncMeta {
@@ -189,12 +192,12 @@
 		needSyncStatus = true
 	}
 
-	isNodeNeedSync := framework.RunNodeSyncExtenders(strategy, oldNode, newNode)
+	isNodeNeedSync := framework.RunNodeStatusCheckExtenders(strategy, oldNode, newNode)
 	if isNodeNeedSync {
 		needSyncStatus = isNodeNeedSync
 		klog.V(6).InfoS("need sync for node by extender", "node", newNode.Name)
 	}
-	isNodeMetaNeedSync := framework.RunNodeMetaSyncExtenders(strategy, oldNode, newNode)
+	isNodeMetaNeedSync := framework.RunNodeMetaCheckExtenders(strategy, oldNode, newNode)
 	if isNodeMetaNeedSync {
 		needSyncMeta = isNodeMetaNeedSync
 		klog.V(6).InfoS("need sync for node meta by extender", "node", newNode.Name)
diff --git a/pkg/slo-controller/noderesource/resource_calculator_test.go b/pkg/slo-controller/noderesource/resource_calculator_test.go
index e39879af3..2154aa5ec 100644
--- a/pkg/slo-controller/noderesource/resource_calculator_test.go
+++ b/pkg/slo-controller/noderesource/resource_calculator_test.go
@@ -65,18 +65,18 @@ func (f *FakeCfgCache) IsErrorStatus() bool {
 	return f.errorStatus
 }
 
-var _ framework.NodeMetaSyncPlugin = (*fakeNodeMetaSyncPlugin)(nil)
+var _ framework.NodeMetaCheckPlugin = (*fakeNodeMetaCheckPlugin)(nil)
 
-type
fakeNodeMetaSyncPlugin struct { +type fakeNodeMetaCheckPlugin struct { CheckLabels []string AlwaysSync bool } -func (p *fakeNodeMetaSyncPlugin) Name() string { - return "fakeNodeMetaSyncPlugin" +func (p *fakeNodeMetaCheckPlugin) Name() string { + return "fakeNodeMetaCheckPlugin" } -func (p *fakeNodeMetaSyncPlugin) NeedSyncMeta(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { +func (p *fakeNodeMetaCheckPlugin) NeedSyncMeta(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { if p.AlwaysSync { return true, "always sync" } @@ -1220,10 +1220,10 @@ func Test_updateNodeResource(t *testing.T) { }, } type fields struct { - Client client.Client - config *configuration.ColocationCfg - SyncContext *framework.SyncContext - prepareNodeMetaSyncPlugin []framework.NodeMetaSyncPlugin + Client client.Client + config *configuration.ColocationCfg + SyncContext *framework.SyncContext + prepareNodeMetaCheckPlugin []framework.NodeMetaCheckPlugin } type args struct { oldNode *corev1.Node @@ -1709,8 +1709,8 @@ func Test_updateNodeResource(t *testing.T) { SyncContext: framework.NewSyncContext().WithContext( map[string]time.Time{"/test-node0": time.Now()}, ), - prepareNodeMetaSyncPlugin: []framework.NodeMetaSyncPlugin{ - &fakeNodeMetaSyncPlugin{ + prepareNodeMetaCheckPlugin: []framework.NodeMetaCheckPlugin{ + &fakeNodeMetaCheckPlugin{ AlwaysSync: true, }, }, @@ -1771,11 +1771,11 @@ func Test_updateNodeResource(t *testing.T) { Clock: clock.RealClock{}, } oldNodeCopy := tt.args.oldNode.DeepCopy() - if len(tt.fields.prepareNodeMetaSyncPlugin) > 0 { - framework.RegisterNodeMetaSyncExtender(framework.AllPass, tt.fields.prepareNodeMetaSyncPlugin...) + if len(tt.fields.prepareNodeMetaCheckPlugin) > 0 { + framework.RegisterNodeMetaCheckExtender(framework.AllPass, tt.fields.prepareNodeMetaCheckPlugin...) defer func() { - for _, p := range tt.fields.prepareNodeMetaSyncPlugin { - framework.UnregisterNodeMetaSyncExtender(p.Name()) + for _, p := range tt.fields.prepareNodeMetaCheckPlugin { + framework.UnregisterNodeMetaCheckExtender(p.Name()) } }() } @@ -1801,8 +1801,8 @@ func Test_updateNodeResource(t *testing.T) { func Test_isNodeResourceSyncNeeded(t *testing.T) { type fields struct { - SyncContext *framework.SyncContext - prepareNodeMetaSyncPlugin []framework.NodeMetaSyncPlugin + SyncContext *framework.SyncContext + prepareNodeMetaCheckPlugin []framework.NodeMetaCheckPlugin } type args struct { strategy *configuration.ColocationStrategy @@ -2030,8 +2030,8 @@ func Test_isNodeResourceSyncNeeded(t *testing.T) { SyncContext: framework.NewSyncContext().WithContext( map[string]time.Time{"/test-node0": time.Now()}, ), - prepareNodeMetaSyncPlugin: []framework.NodeMetaSyncPlugin{ - &fakeNodeMetaSyncPlugin{ + prepareNodeMetaCheckPlugin: []framework.NodeMetaCheckPlugin{ + &fakeNodeMetaCheckPlugin{ CheckLabels: []string{"expect-to-change-label"}, }, }, @@ -2094,11 +2094,11 @@ func Test_isNodeResourceSyncNeeded(t *testing.T) { NodeSyncContext: tt.fields.SyncContext, Clock: clock.RealClock{}, } - if len(tt.fields.prepareNodeMetaSyncPlugin) > 0 { - framework.RegisterNodeMetaSyncExtender(framework.AllPass, tt.fields.prepareNodeMetaSyncPlugin...) + if len(tt.fields.prepareNodeMetaCheckPlugin) > 0 { + framework.RegisterNodeMetaCheckExtender(framework.AllPass, tt.fields.prepareNodeMetaCheckPlugin...) 
 			defer func() {
-				for _, p := range tt.fields.prepareNodeMetaSyncPlugin {
-					framework.UnregisterNodeMetaSyncExtender(p.Name())
+				for _, p := range tt.fields.prepareNodeMetaCheckPlugin {
+					framework.UnregisterNodeMetaCheckExtender(p.Name())
 				}
 			}()
 		}
diff --git a/pkg/util/sloconfig/colocation_config.go b/pkg/util/sloconfig/colocation_config.go
index f870d0f4d..924793383 100644
--- a/pkg/util/sloconfig/colocation_config.go
+++ b/pkg/util/sloconfig/colocation_config.go
@@ -17,15 +17,21 @@ limitations under the License.
 package sloconfig
 
 import (
+	"encoding/json"
+	"fmt"
+	"math"
+	"strconv"
 	"reflect"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
 	"k8s.io/utils/pointer"
 
 	"github.com/koordinator-sh/koordinator/apis/configuration"
+	"github.com/koordinator-sh/koordinator/apis/extension"
 	slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
 	"github.com/koordinator-sh/koordinator/pkg/util"
 )
@@ -42,8 +47,8 @@ func DefaultColocationCfg() configuration.ColocationCfg {
 }
 
 func DefaultColocationStrategy() configuration.ColocationStrategy {
-	calculatePolicy := configuration.CalculateByPodUsage
-	var defaultMemoryCollectPolicy slov1alpha1.NodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache
+	var cpuCalculatePolicy, memoryCalculatePolicy = configuration.CalculateByPodUsage, configuration.CalculateByPodUsage
+	var defaultMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache
 	cfg := configuration.ColocationStrategy{
 		Enable:                         pointer.Bool(false),
 		MetricAggregateDurationSeconds: pointer.Int64(300),
@@ -57,8 +62,9 @@
 		},
 		MetricMemoryCollectPolicy:     &defaultMemoryCollectPolicy,
 		CPUReclaimThresholdPercent:    pointer.Int64(60),
+		CPUCalculatePolicy:            &cpuCalculatePolicy,
 		MemoryReclaimThresholdPercent: pointer.Int64(65),
-		MemoryCalculatePolicy:         &calculatePolicy,
+		MemoryCalculatePolicy:         &memoryCalculatePolicy,
 		DegradeTimeMinutes:            pointer.Int64(15),
 		UpdateTimeThresholdSeconds:    pointer.Int64(300),
 		ResourceDiffThreshold:         pointer.Float64(0.1),
@@ -116,5 +122,78 @@ func GetNodeColocationStrategy(cfg *configuration.ColocationCfg, node *corev1.No
 		break
 	}
 
+	// update strategy according to node metadata
+	UpdateColocationStrategyForNode(strategy, node)
+
 	return strategy
 }
+
+func UpdateColocationStrategyForNode(strategy *configuration.ColocationStrategy, node *corev1.Node) {
+	strategyOnNode, err := GetColocationStrategyOnNode(node)
+	if err != nil {
+		klog.V(5).Infof("failed to parse node colocation strategy for node %s, err: %s", node.Name, err)
+	} else if strategyOnNode != nil {
+		merged, _ := util.MergeCfg(strategy, strategyOnNode)
+		*strategy = *(merged.(*configuration.ColocationStrategy))
+		klog.V(6).Infof("node %s uses the merged colocation strategy from node annotations, merged: %+v",
+			node.Name, strategy)
+	}
+
+	cpuReclaimPercent := getNodeReclaimPercent(node, extension.LabelCPUReclaimRatio)
+	if cpuReclaimPercent != nil {
+		klog.V(6).Infof("node %s uses the cpu reclaim percent from node metadata, original: %+v, new: %v",
+			node.Name, strategy.CPUReclaimThresholdPercent, *cpuReclaimPercent)
+		strategy.CPUReclaimThresholdPercent = cpuReclaimPercent
+	}
+
+	memReclaimPercent := getNodeReclaimPercent(node, extension.LabelMemoryReclaimRatio)
+	if memReclaimPercent != nil {
+		klog.V(6).Infof("node %s uses the memory reclaim percent from node metadata, original: %+v, new: %v",
+			node.Name, strategy.MemoryReclaimThresholdPercent, *memReclaimPercent)
+		strategy.MemoryReclaimThresholdPercent = memReclaimPercent
+	}
+}
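
To make the label convention concrete, here is a tiny runnable sketch (not from the repo) of the ratio-to-percent conversion that getNodeReclaimPercent below performs, fed with the example label values used in the tests:

package main

import (
	"fmt"
	"math"
	"strconv"
)

func main() {
	// Node labels carry the reclaim ratios as float strings; legal values are
	// converted to integer threshold percents, while illegal ones (unparsable
	// or negative) are ignored so the configured thresholds stay in effect.
	for _, s := range []string{"0.7", "0.75", "-1", "invalidField"} {
		v, err := strconv.ParseFloat(s, 64)
		if err != nil || v < 0 {
			fmt.Printf("%q ignored\n", s)
			continue
		}
		fmt.Printf("%q -> %d%%\n", s, int64(math.Round(v*100))) // "0.7" -> 70%, "0.75" -> 75%
	}
}
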
+
+// GetColocationStrategyOnNode gets the colocation strategy in the node annotations.
+func GetColocationStrategyOnNode(node *corev1.Node) (*configuration.ColocationStrategy, error) {
+	if node.Annotations == nil {
+		return nil, nil
+	}
+
+	s, ok := node.Annotations[extension.AnnotationNodeColocationStrategy]
+	if !ok {
+		return nil, nil
+	}
+
+	strategy := &configuration.ColocationStrategy{}
+	if err := json.Unmarshal([]byte(s), strategy); err != nil {
+		return nil, fmt.Errorf("parse node colocation strategy failed, err: %w", err)
+	}
+
+	return strategy, nil
+}
+
+func getNodeReclaimPercent(node *corev1.Node, key string) *int64 {
+	if node.Labels == nil {
+		return nil
+	}
+
+	s, ok := node.Labels[key]
+	if !ok {
+		return nil
+	}
+
+	v, err := strconv.ParseFloat(s, 64)
+	if err != nil {
+		klog.V(5).Infof("failed to parse reclaim ratio for node %s, key %s, err: %s",
+			node.Name, key, err)
+		return nil
+	}
+	if v < 0 {
+		klog.V(5).Infof("failed to validate reclaim ratio for node %s, key %s, ratio %v",
+			node.Name, key, v)
+		return nil
+	}
+
+	// round rather than truncate, so that e.g. 0.29 maps to 29 instead of 28
+	return pointer.Int64(int64(math.Round(v * 100)))
+}
diff --git a/pkg/util/sloconfig/colocation_config_extender_test.go b/pkg/util/sloconfig/colocation_config_extender_test.go
index 43333e761..8d1027253 100644
--- a/pkg/util/sloconfig/colocation_config_extender_test.go
+++ b/pkg/util/sloconfig/colocation_config_extender_test.go
@@ -43,12 +43,7 @@ func Test_registerDefaultColocationExtension(t *testing.T) {
 	configBytes, fmtErr := json.Marshal(defautlColocationCfg)
 	configStr := string(configBytes)
-	expectStr := "{\"enable\":false,\"metricAggregateDurationSeconds\":300,\"metricReportIntervalSeconds\":60," +
-		"\"metricAggregatePolicy\":{\"durations\":[\"5m0s\",\"10m0s\",\"30m0s\"]}," +
-		"\"metricMemoryCollectPolicy\":\"usageWithoutPageCache\"," +
-		"\"cpuReclaimThresholdPercent\":60,\"memoryReclaimThresholdPercent\":65,\"memoryCalculatePolicy\":\"usage\"," +
-		"\"degradeTimeMinutes\":15,\"updateTimeThresholdSeconds\":300,\"resourceDiffThreshold\":0.1," +
-		"\"extensions\":{\"test-ext-key\":{\"testBoolVal\":true}}}"
+	expectStr := `{"enable":false,"metricAggregateDurationSeconds":300,"metricReportIntervalSeconds":60,"metricAggregatePolicy":{"durations":["5m0s","10m0s","30m0s"]},"metricMemoryCollectPolicy":"usageWithoutPageCache","cpuReclaimThresholdPercent":60,"cpuCalculatePolicy":"usage","memoryReclaimThresholdPercent":65,"memoryCalculatePolicy":"usage","degradeTimeMinutes":15,"updateTimeThresholdSeconds":300,"resourceDiffThreshold":0.1,"extensions":{"test-ext-key":{"testBoolVal":true}}}`
 	assert.Equal(t, expectStr, configStr, "config json")
 
 	assert.NoError(t, fmtErr, "default colocation config marshall")
diff --git a/pkg/util/sloconfig/colocation_config_test.go b/pkg/util/sloconfig/colocation_config_test.go
index eb5458048..1cc0a59c1 100644
--- a/pkg/util/sloconfig/colocation_config_test.go
+++ b/pkg/util/sloconfig/colocation_config_test.go
@@ -26,12 +26,14 @@ import (
 	"k8s.io/utils/pointer"
 
 	"github.com/koordinator-sh/koordinator/apis/configuration"
+	"github.com/koordinator-sh/koordinator/apis/extension"
 	slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
 )
 
 func Test_GetNodeColocationStrategy(t *testing.T) {
 	memoryCalcPolicyByUsage := configuration.CalculateByPodUsage
-	var defaultMemoryCollectPolicy slov1alpha1.NodeMemoryCollectPolicy = slov1alpha1.UsageWithoutPageCache
+	cpuCalcPolicyByUsage := configuration.CalculateByPodUsage
+	var defaultMemoryCollectPolicy =
slov1alpha1.UsageWithoutPageCache defaultCfg := NewDefaultColocationCfg() type args struct { cfg *configuration.ColocationCfg @@ -189,6 +191,7 @@ func Test_GetNodeColocationStrategy(t *testing.T) { MetricReportIntervalSeconds: pointer.Int64(60), MetricAggregatePolicy: DefaultColocationStrategy().MetricAggregatePolicy, CPUReclaimThresholdPercent: pointer.Int64(60), + CPUCalculatePolicy: &cpuCalcPolicyByUsage, MemoryReclaimThresholdPercent: pointer.Int64(65), MemoryCalculatePolicy: &memoryCalcPolicyByUsage, DegradeTimeMinutes: pointer.Int64(15), @@ -260,6 +263,138 @@ func Test_GetNodeColocationStrategy(t *testing.T) { ResourceDiffThreshold: pointer.Float64(0.1), }, }, + { + name: "get strategy merged with node reclaim ratios", + args: args{ + cfg: &configuration.ColocationCfg{ + ColocationStrategy: configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + extension.LabelCPUReclaimRatio: "0.7", + extension.LabelMemoryReclaimRatio: "0.75", + }, + }, + }, + }, + want: &configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(70), + MemoryReclaimThresholdPercent: pointer.Int64(75), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + { + name: "get strategy while parse node reclaim ratios failed", + args: args{ + cfg: &configuration.ColocationCfg{ + ColocationStrategy: configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + extension.LabelCPUReclaimRatio: "-1", + extension.LabelMemoryReclaimRatio: "invalidField", + }, + }, + }, + }, + want: &configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + { + name: "get strategy merged with node strategy on annotations", + args: args{ + cfg: &configuration.ColocationCfg{ + ColocationStrategy: configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: ` +{ + "cpuReclaimThresholdPercent": 70, + "memoryReclaimThresholdPercent": 75 +} +`, + }, + }, + }, + }, + want: &configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(70), + MemoryReclaimThresholdPercent: pointer.Int64(75), + 
DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + { + name: "get strategy disabled by node strategy on annotations", + args: args{ + cfg: &configuration.ColocationCfg{ + ColocationStrategy: configuration.ColocationStrategy{ + Enable: pointer.Bool(true), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: `{"enable": false}`, + }, + Labels: map[string]string{}, + }, + }, + }, + want: &configuration.ColocationStrategy{ + Enable: pointer.Bool(false), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -269,6 +404,141 @@ func Test_GetNodeColocationStrategy(t *testing.T) { } } +func TestUpdateColocationStrategyForNode(t *testing.T) { + defaultCfg := DefaultColocationStrategy() + disabledCfg := defaultCfg.DeepCopy() + disabledCfg.Enable = pointer.Bool(false) + cfg1 := defaultCfg.DeepCopy() + cfg1.CPUReclaimThresholdPercent = pointer.Int64(100) + cfg2 := defaultCfg.DeepCopy() + cfg2.CPUReclaimThresholdPercent = pointer.Int64(80) + type args struct { + strategy *configuration.ColocationStrategy + node *corev1.Node + } + tests := []struct { + name string + args args + wantField *configuration.ColocationStrategy + }{ + { + name: "no node-level modification", + args: args{ + strategy: defaultCfg.DeepCopy(), + node: &corev1.Node{}, + }, + wantField: defaultCfg.DeepCopy(), + }, + { + name: "update strategy according to annotations", + args: args{ + strategy: defaultCfg.DeepCopy(), + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: `{"enable": false}`, + }, + }, + }, + }, + wantField: disabledCfg, + }, + { + name: "update strategy according to ratios", + args: args{ + strategy: defaultCfg.DeepCopy(), + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + Labels: map[string]string{ + extension.LabelCPUReclaimRatio: `1.0`, + }, + }, + }, + }, + wantField: cfg1, + }, + { + name: "update strategy according to mixed node configs", + args: args{ + strategy: defaultCfg.DeepCopy(), + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: `{"cpuReclaimThresholdPercent": 100}`, + }, + Labels: map[string]string{ + extension.LabelCPUReclaimRatio: `0.8`, + }, + }, + }, + }, + wantField: cfg2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + UpdateColocationStrategyForNode(tt.args.strategy, tt.args.node) + assert.Equal(t, tt.wantField, tt.args.strategy) + }) + } +} + +func TestGetColocationStrategyOnNode(t *testing.T) { + tests := []struct { + name string + arg *corev1.Node + want *configuration.ColocationStrategy + wantErr bool + }{ + { + name: "get no strategy", + arg: &corev1.Node{}, + }, + { + name: "get no strategy 1", + arg: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + 
Annotations: map[string]string{}, + }, + }, + }, + { + name: "parse strategy failed", + arg: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: `invalidField`, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "parse strategy correctly", + arg: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + extension.AnnotationNodeColocationStrategy: `{"enable": true}`, + }, + }, + }, + want: &configuration.ColocationStrategy{ + Enable: pointer.Bool(true), + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, gotErr := GetColocationStrategyOnNode(tt.arg) + assert.Equal(t, tt.want, got) + assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) + }) + } +} + func Test_IsColocationStrategyValid(t *testing.T) { type args struct { strategy *configuration.ColocationStrategy