diff --git a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go index bdaf6f14e..c1ca46765 100644 --- a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go +++ b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go @@ -151,6 +151,17 @@ func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Nod return p.resetGPUNodeResource() } + existsGPU := false + for _, d := range device.Spec.Devices { + if d.Type == schedulingv1alpha1.GPU && d.Health { + existsGPU = true + } + } + if !existsGPU { + klog.V(5).InfoS("gpu not found in device, reset gpu resources on node", "node", node.Name) + return p.resetGPUNodeResource() + } + // TODO: calculate NUMA-level resources against NRT return p.calculate(node, device) } diff --git a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin_test.go b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin_test.go index a09dc2ab6..58992230b 100644 --- a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin_test.go +++ b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin_test.go @@ -602,6 +602,14 @@ func TestPluginCalculate(t *testing.T) { }, }, } + deviceMissingGPU := &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNode.Name, + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{}, + }, + } type fields struct { client ctrlclient.Client } @@ -838,6 +846,43 @@ func TestPluginCalculate(t *testing.T) { }, wantErr: false, }, + { + name: "calculate resetting device resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(testNode, deviceMissingGPU).Build(), + }, + args: args{ + node: testNode, + }, + want: []framework.ResourceItem{ + { + Name: extension.ResourceGPU, + Reset: true, + Message: ResetResourcesMsg, + }, + { + Name: extension.ResourceGPUCore, + Reset: true, + Message: ResetResourcesMsg, + }, + { + Name: extension.ResourceGPUMemory, + Reset: true, + Message: ResetResourcesMsg, + }, + { + Name: extension.ResourceGPUMemoryRatio, + Reset: true, + Message: ResetResourcesMsg, + }, + { + Name: extension.ResourceGPUShared, + Reset: true, + Message: ResetResourcesMsg, + }, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler.go b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler.go new file mode 100644 index 000000000..3ce165b03 --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rdmadeviceresource + +import ( + "context" + "reflect" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" +) + +var _ handler.EventHandler = &DeviceHandler{} + +type DeviceHandler struct{} + +func (d *DeviceHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + device := e.Object.(*schedulingv1alpha1.Device) + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: device.Name, + }, + }) +} + +func (d *DeviceHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + newDevice := e.ObjectNew.(*schedulingv1alpha1.Device) + oldDevice := e.ObjectOld.(*schedulingv1alpha1.Device) + if reflect.DeepEqual(newDevice.Spec, oldDevice.Spec) { + return + } + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: newDevice.Name, + }, + }) +} + +func (d *DeviceHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { + device, ok := e.Object.(*schedulingv1alpha1.Device) + if !ok { + return + } + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: device.Name, + }, + }) +} + +func (d *DeviceHandler) Generic(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) { +} diff --git a/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler_test.go b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler_test.go new file mode 100644 index 000000000..babaea10c --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/device_event_handler_test.go @@ -0,0 +1,166 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rdmadeviceresource + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" +) + +func Test_EnqueueRequestForNodeMetricMetric(t *testing.T) { + tests := []struct { + name string + fn func(handler handler.EventHandler, q workqueue.RateLimitingInterface) + hasEvent bool + eventName string + }{ + { + name: "create device event", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Create(context.TODO(), event.CreateEvent{ + Object: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + }, q) + }, + hasEvent: true, + eventName: "node1", + }, + { + name: "delete device event", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Delete(context.TODO(), event.DeleteEvent{ + Object: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + }, q) + }, + hasEvent: true, + eventName: "node1", + }, + { + name: "delete event not device", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Delete(context.TODO(), event.DeleteEvent{ + Object: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + }, + }, q) + }, + hasEvent: false, + }, + { + name: "generic event ignore", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Generic(context.TODO(), event.GenericEvent{}, q) + }, + hasEvent: false, + }, + { + name: "update device event", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Update(context.TODO(), event.UpdateEvent{ + ObjectOld: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "100", + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{ + {}, + }, + }, + }, + ObjectNew: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "101", + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{ + {}, + {}, + }, + }, + }, + }, q) + }, + hasEvent: true, + eventName: "node1", + }, + { + name: "update device event ignore", + fn: func(handler handler.EventHandler, q workqueue.RateLimitingInterface) { + handler.Update(context.TODO(), event.UpdateEvent{ + ObjectOld: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "100", + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{ + {}, + }, + }, + }, + ObjectNew: &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "100", + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{ + {}, + }, + }, + }, + }, q) + }, + hasEvent: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + h := &DeviceHandler{} + tt.fn(h, queue) + assert.Equal(t, tt.hasEvent, queue.Len() > 0, "unexpected event") + if tt.hasEvent { + assert.True(t, queue.Len() >= 0, "expected event") + e, _ := queue.Get() + assert.Equal(t, tt.eventName, e.(reconcile.Request).Name) + } + }) + } + +} diff --git a/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin.go b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin.go new file mode 100644 index 000000000..bc88e2c20 --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin.go @@ -0,0 +1,189 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rdmadeviceresource + +import ( + "context" + "fmt" + "sort" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/koordinator-sh/koordinator/apis/configuration" + "github.com/koordinator-sh/koordinator/apis/extension" + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework" + "github.com/koordinator-sh/koordinator/pkg/util" +) + +const PluginName = "RDMADeviceResource" + +const ( + ResetResourcesMsg = "reset node rdma resources" + UpdateResourcesMsg = "node rdma resources from device" + + NeedSyncForResourceDiffMsg = "rdma resource diff is big than threshold" +) + +var ( + ResourceNames = []corev1.ResourceName{ + extension.ResourceRDMA, + } +) + +var client ctrlclient.Client + +type Plugin struct{} + +func (p *Plugin) Name() string { + return PluginName +} + +// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;patch +// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups=scheduling.koordinator.sh,resources=devices,verbs=get;list;watch +// +kubebuilder:rbac:groups=topology.node.k8s.io,resources=noderesourcetopologies,verbs=get;list;watch;create;update + +func (p *Plugin) Setup(opt *framework.Option) error { + client = opt.Client + + opt.Builder = opt.Builder.Watches(&schedulingv1alpha1.Device{}, &DeviceHandler{}) + + return nil +} + +func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { + for _, resourceName := range ResourceNames { + if util.IsResourceDiff(oldNode.Status.Allocatable, newNode.Status.Allocatable, resourceName, + *strategy.ResourceDiffThreshold) { + klog.V(4).InfoS("need sync node since resource diff bigger than threshold", "node", newNode.Name, + "resource", resourceName, "threshold", *strategy.ResourceDiffThreshold) + return true, NeedSyncForResourceDiffMsg + } + } + + return false, "" +} + +func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error { + // prepare node resources + for _, resourceName := range ResourceNames { + if nr.Resets[resourceName] { + delete(node.Status.Allocatable, resourceName) + delete(node.Status.Capacity, resourceName) + continue + } + + q := nr.Resources[resourceName] + if q == nil { + // ignore missing resources + // TBD: shall we remove the resource when some resource types are missing + continue + } + node.Status.Allocatable[resourceName] = *q + node.Status.Capacity[resourceName] = *q + } + return nil +} + +func (p *Plugin) Reset(node *corev1.Node, message string) []framework.ResourceItem { + return nil +} + +func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Node, _ *corev1.PodList, _ *framework.ResourceMetrics) ([]framework.ResourceItem, error) { + if node == nil || node.Status.Allocatable == nil { + return nil, fmt.Errorf("missing essential arguments") + } + + // calculate device resources + device := &schedulingv1alpha1.Device{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: node.Name, Namespace: node.Namespace}, device); err != nil { + if !errors.IsNotFound(err) { + klog.V(4).InfoS("failed to get device for node", "node", node.Name, "err", err) + return nil, fmt.Errorf("failed to get device resources: %w", err) + } + + // device not found, reset rdma resources on node + return p.resetRDMANodeResource() + } + + // Check whether the rdma device exists + existsRDMA := false + for _, d := range device.Spec.Devices { + if d.Type == schedulingv1alpha1.RDMA && d.Health { + existsRDMA = true + } + } + if !existsRDMA { + klog.V(5).InfoS("rdma not found in device, reset rdma resources on node", "node", node.Name) + return p.resetRDMANodeResource() + } + + // TODO: calculate NUMA-level resources against NRT + return p.calculate(node, device) +} + +func (p *Plugin) calculate(node *corev1.Node, device *schedulingv1alpha1.Device) ([]framework.ResourceItem, error) { + if device == nil { + return nil, fmt.Errorf("invalid device") + } + + // calculate rdma resources + rdmaPFNum := 0 + for _, d := range device.Spec.Devices { + if d.Type != schedulingv1alpha1.RDMA || !d.Health { + continue + } + rdmaPFNum++ + } + rdmaResources := make(corev1.ResourceList) + rdmaResources[extension.ResourceRDMA] = *resource.NewQuantity(int64(rdmaPFNum)*100, resource.DecimalSI) + var items []framework.ResourceItem + // FIXME: shall we add node resources in devices but not in ResourceNames? + for resourceName := range rdmaResources { + q := rdmaResources[resourceName] + items = append(items, framework.ResourceItem{ + Name: resourceName, + Quantity: &q, + Message: UpdateResourcesMsg, + }) + } + sort.Slice(items, func(i, j int) bool { return items[i].Name < items[j].Name }) + klog.V(5).InfoS("calculate rdma resources", "node", node.Name, "resources", rdmaResources) + + return items, nil +} + +func (p *Plugin) resetRDMANodeResource() ([]framework.ResourceItem, error) { + items := make([]framework.ResourceItem, len(ResourceNames)) + // FIXME: shall we reset node resources in devices but not in ResourceNames? + for i := range ResourceNames { + items[i] = framework.ResourceItem{ + Name: ResourceNames[i], + Reset: true, + Message: ResetResourcesMsg, + } + } + return items, nil +} diff --git a/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin_test.go b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin_test.go new file mode 100644 index 000000000..0748079f7 --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin_test.go @@ -0,0 +1,514 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rdmadeviceresource + +import ( + "testing" + + topov1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/builder" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/koordinator-sh/koordinator/apis/configuration" + "github.com/koordinator-sh/koordinator/apis/extension" + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework" + "github.com/koordinator-sh/koordinator/pkg/util/testutil" +) + +func TestPlugin(t *testing.T) { + t.Run("test", func(t *testing.T) { + p := &Plugin{} + assert.Equal(t, PluginName, p.Name()) + + testScheme := runtime.NewScheme() + testOpt := &framework.Option{ + Scheme: testScheme, + Client: fake.NewClientBuilder().WithScheme(testScheme).Build(), + Builder: builder.ControllerManagedBy(&testutil.FakeManager{}), + } + err := p.Setup(testOpt) + assert.NoError(t, err) + + got := p.Reset(nil, "") + assert.Nil(t, got) + }) +} + +func TestPluginNeedSync(t *testing.T) { + testStrategy := &configuration.ColocationStrategy{ + Enable: pointer.Bool(true), + CPUReclaimThresholdPercent: pointer.Int64(65), + MemoryReclaimThresholdPercent: pointer.Int64(65), + DegradeTimeMinutes: pointer.Int64(15), + UpdateTimeThresholdSeconds: pointer.Int64(300), + ResourceDiffThreshold: pointer.Float64(0.1), + } + testNodeWithoutDevice := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + }, + } + testNodeWithDevice := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + } + testNodeWithDeviceDriverUpdate := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + } + testNodeWithDeviceResourceUpdate := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(300, resource.DecimalSI), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(300, resource.DecimalSI), + }, + }, + } + t.Run("test", func(t *testing.T) { + p := &Plugin{} + + // nothing change, both have no gpu device + got, got1 := p.NeedSync(testStrategy, testNodeWithoutDevice, testNodeWithoutDevice) + assert.False(t, got) + assert.Equal(t, "", got1) + // nothing change, both has gpu devices + got, got1 = p.NeedSync(testStrategy, testNodeWithDevice, testNodeWithDevice) + assert.False(t, got) + assert.Equal(t, "", got1) + // ignore labels change + got, got1 = p.NeedSync(testStrategy, testNodeWithDevice, testNodeWithDeviceDriverUpdate) + assert.False(t, got) + assert.Equal(t, "", got1) + + // add resources + got, got1 = p.NeedSync(testStrategy, testNodeWithoutDevice, testNodeWithDevice) + assert.True(t, got) + assert.Equal(t, NeedSyncForResourceDiffMsg, got1) + // resource update + got, got1 = p.NeedSync(testStrategy, testNodeWithDevice, testNodeWithDeviceResourceUpdate) + assert.True(t, got) + assert.Equal(t, NeedSyncForResourceDiffMsg, got1) + + // delete resources + got, got1 = p.NeedSync(testStrategy, testNodeWithDevice, testNodeWithoutDevice) + assert.True(t, got) + assert.Equal(t, NeedSyncForResourceDiffMsg, got1) + }) +} + +func TestPluginPrepare(t *testing.T) { + testNodeWithoutDevice := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + }, + } + testNodeWithDevice := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + } + testNodeWithoutDeviceResources := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + }, + } + type args struct { + node *corev1.Node + nr *framework.NodeResource + } + tests := []struct { + name string + args args + wantErr bool + wantField *corev1.Node + }{ + { + name: "nothing to prepare", + args: args{ + node: testNodeWithoutDevice, + nr: framework.NewNodeResource(), + }, + wantErr: false, + wantField: testNodeWithoutDevice, + }, + { + name: "update resources and labels correctly", + args: args{ + node: testNodeWithoutDevice, + nr: &framework.NodeResource{ + Resources: map[corev1.ResourceName]*resource.Quantity{ + extension.ResourceRDMA: resource.NewQuantity(200, resource.DecimalSI), + }, + ZoneResources: map[string]corev1.ResourceList{}, + Messages: map[corev1.ResourceName]string{}, + Resets: map[corev1.ResourceName]bool{}, + }, + }, + wantErr: false, + wantField: testNodeWithDevice, + }, + { + name: "reset resources correctly", + args: args{ + node: testNodeWithDevice, + nr: &framework.NodeResource{ + Resets: map[corev1.ResourceName]bool{ + extension.ResourceRDMA: true, + }, + }, + }, + wantErr: false, + wantField: testNodeWithoutDeviceResources, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &Plugin{} + gotErr := p.Prepare(nil, tt.args.node, tt.args.nr) + assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) + assert.Equal(t, tt.wantField, tt.args.node) + }) + } +} + +func TestPluginCalculate(t *testing.T) { + testScheme := runtime.NewScheme() + err := clientgoscheme.AddToScheme(testScheme) + assert.NoError(t, err) + err = topov1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + err = schedulingv1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + testNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + }, + } + testDevice := &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNode.Name, + Labels: map[string]string{ + extension.LabelGPUModel: "A100", + extension.LabelGPUDriverVersion: "480", + }, + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{ + { + UUID: "1", + Minor: pointer.Int32(0), + Health: true, + Type: schedulingv1alpha1.RDMA, + Resources: map[corev1.ResourceName]resource.Quantity{ + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + { + UUID: "2", + Minor: pointer.Int32(1), + Health: true, + Type: schedulingv1alpha1.RDMA, + Resources: map[corev1.ResourceName]resource.Quantity{ + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + }, + }, + } + deviceMissingRDMA := &schedulingv1alpha1.Device{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNode.Name, + }, + Spec: schedulingv1alpha1.DeviceSpec{ + Devices: []schedulingv1alpha1.DeviceInfo{}, + }, + } + type fields struct { + client ctrlclient.Client + } + type args struct { + node *corev1.Node + } + tests := []struct { + name string + fields fields + args args + want []framework.ResourceItem + wantErr bool + }{ + { + name: "args missing essential fields", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).Build(), + }, + args: args{ + node: &corev1.Node{}, + }, + want: nil, + wantErr: true, + }, + { + name: "get device object error", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(runtime.NewScheme()).Build(), + }, + args: args{ + node: testNode, + }, + want: nil, + wantErr: true, + }, + { + name: "calculate device resources correctly", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(testNode, testDevice).Build(), + }, + args: args{ + node: testNode, + }, + want: []framework.ResourceItem{ + { + Name: extension.ResourceRDMA, + Quantity: resource.NewQuantity(200, resource.DecimalSI), + Message: UpdateResourcesMsg, + }, + }, + wantErr: false, + }, + { + name: "calculate device resources correctly", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(testNode, testDevice).Build(), + }, + args: args{ + node: testNode, + }, + want: []framework.ResourceItem{ + { + Name: extension.ResourceRDMA, + Quantity: resource.NewQuantity(200, resource.DecimalSI), + Message: UpdateResourcesMsg, + }, + }, + wantErr: false, + }, + { + name: "calculate resetting device resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(testNode).Build(), + }, + args: args{ + node: testNode, + }, + want: []framework.ResourceItem{ + { + Name: extension.ResourceRDMA, + Reset: true, + Message: ResetResourcesMsg, + }, + }, + wantErr: false, + }, + { + name: "calculate resetting device resources", + fields: fields{ + client: fake.NewClientBuilder().WithScheme(testScheme).WithObjects(testNode, deviceMissingRDMA).Build(), + }, + args: args{ + node: testNode, + }, + want: []framework.ResourceItem{ + { + Name: extension.ResourceRDMA, + Reset: true, + Message: ResetResourcesMsg, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &Plugin{} + client = tt.fields.client + defer testPluginCleanup() + got, gotErr := p.Calculate(nil, tt.args.node, nil, nil) + assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_cleanupGPUNodeResource(t *testing.T) { + testScheme := runtime.NewScheme() + err := clientgoscheme.AddToScheme(testScheme) + assert.NoError(t, err) + err = topov1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + err = schedulingv1alpha1.AddToScheme(testScheme) + assert.NoError(t, err) + testNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + "test-label": "test-value", + }, + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + }, + }, + } + testNodeWithoutLabels := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Labels: map[string]string{ + "test-label": "test-value", + }, + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + Capacity: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("400Gi"), + extension.ResourceRDMA: *resource.NewQuantity(200, resource.DecimalSI), + }, + }, + } + t.Run("cleanup success", func(t *testing.T) { + p := &Plugin{} + client = fake.NewClientBuilder().WithScheme(testScheme).Build() + defer testPluginCleanup() + node := testNodeWithoutLabels.DeepCopy() + resourceItems, err := p.Calculate(nil, node, nil, nil) + assert.NoError(t, err, "expect calculate success") + nr := framework.NewNodeResource(resourceItems...) + err = p.Prepare(nil, node, nr) + assert.NoError(t, err) + assert.Equal(t, testNode, node) + }) +} + +func testPluginCleanup() { + client = nil +} diff --git a/pkg/slo-controller/noderesource/plugins_profile.go b/pkg/slo-controller/noderesource/plugins_profile.go index cef68e715..4dad3c4eb 100644 --- a/pkg/slo-controller/noderesource/plugins_profile.go +++ b/pkg/slo-controller/noderesource/plugins_profile.go @@ -22,6 +22,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/plugins/cpunormalization" "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/plugins/gpudeviceresource" "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/plugins/midresource" + rdmadeviceresource "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/plugins/rdmadevicereource" "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/plugins/resourceamplification" ) @@ -34,6 +35,7 @@ func init() { addPluginOption(&cpunormalization.Plugin{}, true) addPluginOption(&resourceamplification.Plugin{}, true) addPluginOption(&gpudeviceresource.Plugin{}, true) + addPluginOption(&rdmadeviceresource.Plugin{}, true) } func addPlugins(filter framework.FilterFn) { @@ -53,6 +55,7 @@ var ( &resourceamplification.Plugin{}, &batchresource.Plugin{}, &gpudeviceresource.Plugin{}, + &rdmadeviceresource.Plugin{}, } // NodePreUpdatePlugin implements node resource pre-updating. nodePreUpdatePlugins = []framework.NodePreUpdatePlugin{ @@ -65,12 +68,14 @@ var ( &midresource.Plugin{}, &batchresource.Plugin{}, &gpudeviceresource.Plugin{}, + &rdmadeviceresource.Plugin{}, } // NodeSyncPlugin implements the check of resource updating. nodeStatusCheckPlugins = []framework.NodeStatusCheckPlugin{ &midresource.Plugin{}, &batchresource.Plugin{}, &gpudeviceresource.Plugin{}, + &rdmadeviceresource.Plugin{}, } // nodeMetaCheckPlugins implements the check of node meta updating. nodeMetaCheckPlugins = []framework.NodeMetaCheckPlugin{ @@ -85,5 +90,6 @@ var ( &midresource.Plugin{}, &batchresource.Plugin{}, &gpudeviceresource.Plugin{}, + &rdmadeviceresource.Plugin{}, } )