Skip to content

Commit

Permalink
add rdma device at controller
Browse files Browse the repository at this point in the history
  • Loading branch information
208824 authored and wangjianyu.wjy committed Nov 20, 2024
1 parent 0ce6314 commit 2fa36c7
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 0 deletions.
1 change: 1 addition & 0 deletions apis/extension/device_share.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type DeviceAllocation struct {
Minor int32 `json:"minor"`
Resources corev1.ResourceList `json:"resources"`
Extension *DeviceAllocationExtension `json:"extension,omitempty"`
BusID string `json:"busID,omitempty"`
}

type DeviceAllocationExtension struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Nod
return p.resetGPUNodeResource()
}

existsGPU := false
for _, d := range device.Spec.Devices {
if d.Type == schedulingv1alpha1.GPU && d.Health {
existsGPU = true
}
}
if !existsGPU {
klog.V(5).InfoS("device not found, reset gpu resources on node")
return p.resetGPUNodeResource()
}

// TODO: calculate NUMA-level resources against NRT
return p.calculate(node, device)
}
Expand Down
223 changes: 223 additions & 0 deletions pkg/slo-controller/noderesource/plugins/rdmadevicereource/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rdmadeviceresource

import (
"context"
"fmt"
"sort"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/koordinator-sh/koordinator/apis/configuration"
"github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework"
"github.com/koordinator-sh/koordinator/pkg/util"
)

// PluginName is the name returned by Plugin.Name.
const PluginName = "RDMADeviceResource"

// Messages attached to the resource items and sync decisions produced by this
// plugin.
const (
// ResetResourcesMsg is set on resource items that reset the node's RDMA resources.
ResetResourcesMsg = "reset node rdma resources"
// UpdateResourcesMsg is set on resource items calculated from the Device object.
UpdateResourcesMsg = "node rdma resources from device"
// UpdateLabelsMsg is not referenced by the code visible in this file.
UpdateLabelsMsg = "node rdma labels from device"

// NeedSyncForResourceDiffMsg is returned by NeedSync when a resource differs
// by more than the configured threshold.
NeedSyncForResourceDiffMsg = "rdma resource diff is big than threshold"
// NeedSyncForRDMAModelMsgFmt is a format string taking a label name; it is not
// referenced by the code visible in this file.
NeedSyncForRDMAModelMsgFmt = "rdma device label %s changed"
)

var (
// ResourceNames lists the node resources managed by this plugin. NeedSync,
// Prepare and resetRDMANodeResource all iterate over it.
ResourceNames = []corev1.ResourceName{
extension.ResourceRDMA, //koordinator.sh/rdma
}

// TODO: add node labels for RDMA devices. The commented-out list below is
// kept as a placeholder.
/*Labels = []string{
extension.LabelRDMAModel,
extension.LabelRDMADriverVersion,
}*/
)

// client is the package-level controller-runtime client. It is assigned in
// Plugin.Setup and used by Plugin.Calculate to fetch the node's Device object.
var client ctrlclient.Client

// Plugin is the RDMA device resource plugin. It carries no state of its own;
// the API client it uses is held in the package-level client variable.
type Plugin struct{}

// Name returns the plugin's name, PluginName.
func (p *Plugin) Name() string {
return PluginName
}

// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=scheduling.koordinator.sh,resources=devices,verbs=get;list;watch
// +kubebuilder:rbac:groups=topology.node.k8s.io,resources=noderesourcetopologies,verbs=get;list;watch;create;update

// Setup wires the plugin into the controller. It stores opt.Client in the
// package-level client and extends opt.Builder with a watch on Device objects
// whose events are handled by RDMADeviceHandler. It always returns nil.
func (p *Plugin) Setup(opt *framework.Option) error {
	watchedType := &schedulingv1alpha1.Device{}
	deviceEvents := &RDMADeviceHandler{}

	client = opt.Client
	opt.Builder = opt.Builder.Watches(watchedType, deviceEvents)
	return nil
}

// NeedSync reports whether the node status has to be synced because one of the
// resources in ResourceNames differs between oldNode and newNode allocatable
// by more than strategy.ResourceDiffThreshold.
//
// It returns (true, NeedSyncForResourceDiffMsg) for the first such resource
// and (false, "") when no resource exceeds the threshold.
func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) {
	// InfoS is required here: klog's Info would concatenate the key/value
	// arguments into one string instead of emitting structured fields.
	klog.V(4).InfoS("rdma isNeedSync start", "node", newNode.Name,
		"oldAllocatable", oldNode.Status.Allocatable, "newAllocatable", newNode.Status.Allocatable)

	for _, resourceName := range ResourceNames {
		klog.V(4).InfoS("rdma IsResourceDiff start", "node", newNode.Name, "resource", resourceName)
		if util.IsResourceDiff(oldNode.Status.Allocatable, newNode.Status.Allocatable, resourceName,
			*strategy.ResourceDiffThreshold) {
			klog.V(4).InfoS("need sync node since resource diff bigger than threshold", "node", newNode.Name,
				"resource", resourceName, "threshold", *strategy.ResourceDiffThreshold)
			return true, NeedSyncForResourceDiffMsg
		}
	}

	return false, ""
}

// Prepare applies the calculated RDMA resources in nr to node.Status.
//
// For every resource in ResourceNames:
//   - if nr.Resets marks it, the resource is deleted from both Allocatable and
//     Capacity;
//   - otherwise, if nr.Resources holds a quantity for it, that quantity is
//     copied into both Allocatable and Capacity;
//   - resources with no entry in nr.Resources are left untouched.
//
// It always returns nil.
func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
	for _, resourceName := range ResourceNames {
		if nr.Resets[resourceName] {
			// delete on a nil map is a no-op, so no guard is needed here.
			delete(node.Status.Allocatable, resourceName)
			delete(node.Status.Capacity, resourceName)
			continue
		}

		q := nr.Resources[resourceName]
		if q == nil {
			// ignore missing resources
			// TBD: shall we remove the resource when some resource types are missing
			continue
		}

		// Writing to a nil map panics; initialize the lists lazily so a node
		// whose status has not been populated yet can still be prepared.
		if node.Status.Allocatable == nil {
			node.Status.Allocatable = corev1.ResourceList{}
		}
		if node.Status.Capacity == nil {
			node.Status.Capacity = corev1.ResourceList{}
		}
		node.Status.Allocatable[resourceName] = *q
		node.Status.Capacity[resourceName] = *q
	}
	return nil
}

// Reset returns nil and ignores both arguments. In this plugin the reset items
// are produced by Calculate through resetRDMANodeResource instead.
func (p *Plugin) Reset(node *corev1.Node, message string) []framework.ResourceItem {
return nil
}

// Calculate computes the RDMA resource items for node from the Device object
// that shares the node's name.
//
// It returns an error when node is nil, when node.Status.Allocatable is nil,
// or when fetching the Device fails for a reason other than NotFound. When the
// Device does not exist, or contains no healthy RDMA device, it returns reset
// items for every resource in ResourceNames. Otherwise it delegates to
// calculate.
func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Node, _ *corev1.PodList, _ *framework.ResourceMetrics) ([]framework.ResourceItem, error) {
	if node == nil || node.Status.Allocatable == nil {
		return nil, fmt.Errorf("missing essential arguments")
	}

	// calculate device resources
	device := &schedulingv1alpha1.Device{}
	if err := client.Get(context.TODO(), types.NamespacedName{Name: node.Name, Namespace: node.Namespace}, device); err != nil {
		if !errors.IsNotFound(err) {
			klog.V(4).InfoS("failed to get device for node", "node", node.Name, "err", err)
			return nil, fmt.Errorf("failed to get device resources: %w", err)
		}

		// device not found, reset rdma resources on node
		return p.resetRDMANodeResource()
	}

	// Check whether at least one healthy rdma device exists; one match is
	// enough, so stop scanning as soon as it is found.
	existsRDMA := false
	for _, d := range device.Spec.Devices {
		if d.Type == schedulingv1alpha1.RDMA && d.Health {
			existsRDMA = true
			break
		}
	}
	if !existsRDMA {
		// The Device object exists here; it just has no usable rdma entry.
		klog.V(5).InfoS("no healthy rdma device found, reset rdma resources on node", "node", node.Name)
		return p.resetRDMANodeResource()
	}

	// TODO: calculate NUMA-level resources against NRT
	return p.calculate(node, device)
}

// calculate computes the node's RDMA resource from the healthy RDMA devices
// listed in device.Spec.Devices. Every counted unit is reported as 100.
//
// Each healthy RDMA device counts as one PF, and the VFs of all its VFGroups
// are summed. If any VF exists the VF total is reported; otherwise the PF
// total is reported. It returns an error when device is nil.
func (p *Plugin) calculate(node *corev1.Node, device *schedulingv1alpha1.Device) ([]framework.ResourceItem, error) {
	if device == nil {
		return nil, fmt.Errorf("invalid device")
	}

	// TODO: keep consistent with the node side, which multiplies the resources
	// reported on the Device by 100.
	const unitsPerDevice = 100

	// Sum PFs and VFs directly. Keying the VF counts by *d.Minor is not needed
	// for a total, would panic on a device without a minor, and would drop
	// counts when two devices report the same minor.
	var pfCount, vfCount int64
	for _, d := range device.Spec.Devices {
		if d.Type != schedulingv1alpha1.RDMA || !d.Health {
			continue
		}
		pfCount++
		for _, vg := range d.VFGroups {
			vfCount += int64(len(vg.VFs))
		}
	}

	// For now, only one server supports one form, either PF or VF
	units := pfCount
	if vfCount != 0 {
		units = vfCount
	}

	rdmaResources := corev1.ResourceList{
		extension.ResourceRDMA: *resource.NewQuantity(units*unitsPerDevice, resource.BinarySI),
	}

	items := make([]framework.ResourceItem, 0, len(rdmaResources))
	// FIXME: shall we add node resources in devices but not in ResourceNames?
	for resourceName := range rdmaResources {
		// q is declared per iteration so every item points at its own copy.
		q := rdmaResources[resourceName]
		items = append(items, framework.ResourceItem{
			Name:     resourceName,
			Quantity: &q,
			Message:  UpdateResourcesMsg,
		})
	}
	// Map iteration order is random; sort for deterministic output.
	sort.Slice(items, func(i, j int) bool { return items[i].Name < items[j].Name })
	klog.V(5).InfoS("calculate rdma resources", "node", node.Name, "resources", rdmaResources)

	return items, nil
}

// resetRDMANodeResource builds one reset item per entry of ResourceNames, in
// the same order, each flagged with Reset and carrying ResetResourcesMsg. The
// returned error is always nil.
func (p *Plugin) resetRDMANodeResource() ([]framework.ResourceItem, error) {
	resets := make([]framework.ResourceItem, 0, len(ResourceNames))
	// FIXME: shall we reset node resources in devices but not in ResourceNames?
	for _, name := range ResourceNames {
		resets = append(resets, framework.ResourceItem{
			Name:    name,
			Reset:   true,
			Message: ResetResourcesMsg,
		})
	}
	return resets, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rdmadeviceresource

import (
"context"
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// Compile-time check that *RDMADeviceHandler implements handler.EventHandler.
var _ handler.EventHandler = &RDMADeviceHandler{}

// RDMADeviceHandler turns Device events into reconcile requests keyed by the
// Device's name. It holds no state.
type RDMADeviceHandler struct{}

// Create enqueues a reconcile request named after the created Device. Events
// whose object is not a *Device are ignored, matching how Delete behaves,
// rather than panicking on the type assertion.
func (d *RDMADeviceHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
	device, ok := e.Object.(*schedulingv1alpha1.Device)
	if !ok {
		return
	}

	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: device.Name,
		},
	})
}

// Update enqueues a reconcile request named after the updated Device, but only
// when its Spec changed. Events whose old or new object is not a *Device are
// ignored, matching how Delete behaves, rather than panicking on the type
// assertion.
func (d *RDMADeviceHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	newDevice, ok := e.ObjectNew.(*schedulingv1alpha1.Device)
	if !ok {
		return
	}
	oldDevice, ok := e.ObjectOld.(*schedulingv1alpha1.Device)
	if !ok {
		return
	}

	// Updates that leave the spec untouched do not need a reconcile.
	if reflect.DeepEqual(newDevice.Spec, oldDevice.Spec) {
		return
	}
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: newDevice.Name,
		},
	})
}

// Delete enqueues a reconcile request named after the deleted Device. Events
// whose object is not a *Device are silently dropped.
func (d *RDMADeviceHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if device, isDevice := e.Object.(*schedulingv1alpha1.Device); isDevice {
		key := types.NamespacedName{Name: device.Name}
		q.Add(reconcile.Request{NamespacedName: key})
	}
}

// Generic is a no-op: generic events never enqueue a reconcile request.
func (d *RDMADeviceHandler) Generic(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
}
5 changes: 5 additions & 0 deletions pkg/slo-controller/noderesource/plugins_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func init() {
addPluginOption(&cpunormalization.Plugin{}, true)
addPluginOption(&resourceamplification.Plugin{}, true)
addPluginOption(&gpudeviceresource.Plugin{}, true)
addPluginOption(&rdmadeviceresource.Plugin{}, true)

Check failure on line 37 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: rdmadeviceresource

Check failure on line 37 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: rdmadeviceresource

Check failure on line 37 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 37 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 37 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: rdmadeviceresource
}

func addPlugins(filter framework.FilterFn) {
Expand All @@ -53,6 +54,7 @@ var (
&resourceamplification.Plugin{},
&batchresource.Plugin{},
&gpudeviceresource.Plugin{},
&rdmadeviceresource.Plugin{},

Check failure on line 57 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: rdmadeviceresource

Check failure on line 57 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: rdmadeviceresource

Check failure on line 57 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 57 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 57 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: rdmadeviceresource
}
// NodePreUpdatePlugin implements node resource pre-updating.
nodePreUpdatePlugins = []framework.NodePreUpdatePlugin{
Expand All @@ -65,12 +67,14 @@ var (
&midresource.Plugin{},
&batchresource.Plugin{},
&gpudeviceresource.Plugin{},
&rdmadeviceresource.Plugin{},

Check failure on line 70 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: rdmadeviceresource

Check failure on line 70 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: rdmadeviceresource

Check failure on line 70 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 70 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 70 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: rdmadeviceresource
}
// NodeSyncPlugin implements the check of resource updating.
nodeStatusCheckPlugins = []framework.NodeStatusCheckPlugin{
&midresource.Plugin{},
&batchresource.Plugin{},
&gpudeviceresource.Plugin{},
&rdmadeviceresource.Plugin{},

Check failure on line 77 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: rdmadeviceresource

Check failure on line 77 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: rdmadeviceresource

Check failure on line 77 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource

Check failure on line 77 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: rdmadeviceresource
}
// nodeMetaCheckPlugins implements the check of node meta updating.
nodeMetaCheckPlugins = []framework.NodeMetaCheckPlugin{
Expand All @@ -85,5 +89,6 @@ var (
&midresource.Plugin{},
&batchresource.Plugin{},
&gpudeviceresource.Plugin{},
&rdmadeviceresource.Plugin{},

Check failure on line 92 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: rdmadeviceresource

Check failure on line 92 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: rdmadeviceresource

Check failure on line 92 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: rdmadeviceresource) (typecheck)

Check failure on line 92 in pkg/slo-controller/noderesource/plugins_profile.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: rdmadeviceresource
}
)

0 comments on commit 2fa36c7

Please sign in to comment.