Skip to content

Commit

Permalink
scheduler: fix reserveNUMAResource bug (#2124)
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <[email protected]>
Co-authored-by: wangjianyu.wjy <[email protected]>
  • Loading branch information
ZiMengSheng and wangjianyu.wjy authored Jul 3, 2024
1 parent 4a61e34 commit 091f73e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 5 deletions.
25 changes: 20 additions & 5 deletions pkg/scheduler/plugins/nodenumaresource/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nodenumaresource
import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -42,9 +43,10 @@ type nodeReservationRestoreStateData struct {
matched map[types.UID]reservationAlloc
unmatched map[types.UID]reservationAlloc

mergedMatchedAllocatable map[int]corev1.ResourceList
mergedMatchedAllocated map[int]corev1.ResourceList
mergedUnmatchedUsed map[int]corev1.ResourceList
mergedMatchedReservedCPUs cpuset.CPUSet
mergedMatchedAllocatable map[int]corev1.ResourceList
mergedMatchedAllocated map[int]corev1.ResourceList
mergedUnmatchedUsed map[int]corev1.ResourceList
}

type reservationAlloc struct {
Expand Down Expand Up @@ -98,12 +100,15 @@ func (rs *nodeReservationRestoreStateData) mergeReservationAllocations() {
if len(matched) > 0 {
mergedMatchedAllocatable := map[int]corev1.ResourceList{}
mergedMatchedAllocated := map[int]corev1.ResourceList{}
mergedCPUSetBuilder := cpuset.NewCPUSetBuilder()
for _, alloc := range matched {
mergedMatchedAllocatable = appendAllocated(mergedMatchedAllocatable, alloc.allocatable)
mergedMatchedAllocated = appendAllocated(mergedMatchedAllocated, alloc.allocated)
mergedCPUSetBuilder.Add(alloc.reservedCPUs.ToSliceNoSort()...)
}
rs.mergedMatchedAllocatable = mergedMatchedAllocatable
rs.mergedMatchedAllocated = mergedMatchedAllocated
rs.mergedMatchedReservedCPUs = mergedCPUSetBuilder.Result()
}

return
Expand Down Expand Up @@ -244,9 +249,10 @@ func tryAllocateFromReservation(
for _, alloc := range matchedReservations {
rInfo := alloc.rInfo

reusableResource = appendAllocated(nil, reusableResource, alloc.remained)
resourceOptions.reusableResources = reusableResource
reservationReusableResource := appendAllocated(nil, reusableResource, alloc.remained)
resourceOptions.reusableResources = reservationReusableResource
resourceOptions.preferredCPUs = alloc.reservedCPUs
resourceOptions.requiredResources = nil

allocatePolicy := rInfo.GetAllocatePolicy()
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
Expand Down Expand Up @@ -274,8 +280,17 @@ func tryAllocateFromReservation(
}
hasSatisfiedReservation = true
break
} else {
klog.V(5).InfoS("failed to allocated from reservation", "reservation", rInfo.Reservation.Name, "pod", pod.Name, "node", node.Name, "status", status.Message(), "numaNode", resourceOptions.hint.NUMANodeAffinity.String())
logStruct(reflect.ValueOf(resourceOptions), "options", 6)
logStruct(reflect.ValueOf(restoreState), "restoreState", 6)
}
} else {
klog.V(5).InfoS("failed to allocated from reservation", "reservation", rInfo.Reservation.Name, "pod", pod.Name, "node", node.Name, "status", status.Message(), "numaNode", resourceOptions.hint.NUMANodeAffinity.String())
logStruct(reflect.ValueOf(resourceOptions), "options", 6)
logStruct(reflect.ValueOf(restoreState), "restoreState", 6)
}

reservationReasons = append(reservationReasons, status)
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -130,13 +131,16 @@ func (c *resourceManager) GetTopologyHints(node *corev1.Node, pod *corev1.Pod, o
}

options.reusableResources = appendAllocated(nil, restoreStateData.mergedUnmatchedUsed, restoreStateData.mergedMatchedAllocatable)
options.preferredCPUs = restoreStateData.mergedMatchedReservedCPUs
totalAvailable, _, err := c.getAvailableNUMANodeResources(node.Name, topologyOptions, options.reusableResources)
if err != nil {
return nil, err
}
klog.V(6).InfoS("GetTopologyHints", "pod", klog.KObj(pod), "node", node.Name, "before trimmed", totalAvailable)
if err := c.trimNUMANodeResources(node.Name, totalAvailable, options); err != nil {
return nil, err
}
klog.V(6).InfoS("GetTopologyHints", "pod", klog.KObj(pod), "node", node.Name, "after trimmed", totalAvailable)

hints := c.generateResourceHints(node, pod, topologyOptions.NUMANodeResources, options, totalAvailable, policy, restoreStateData)
return hints, nil
Expand Down Expand Up @@ -485,6 +489,8 @@ func (c *resourceManager) generateResourceHints(node *corev1.Node, pod *corev1.P
}
}

klog.V(5).InfoS("generate resource hints", "numaNodesLackResource", numaNodesLackResource, "pod", klog.KObj(pod), "node", node.Name)

generator := hintsGenerator{
numaNodesLackResource: numaNodesLackResource,
minAffinitySize: make(map[corev1.ResourceName]int),
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/topology_hint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -69,6 +70,7 @@ func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework.
resourceOptions.numaScorer = p.numaScorer
hints, err := p.resourceManager.GetTopologyHints(node, pod, resourceOptions, numaTopologyPolicy, restoreState)
if err != nil {
klog.V(5).ErrorS(err, "failed to get topology hints", "pod", klog.KObj(pod), "node", nodeName)
return nil, framework.NewStatus(framework.Unschedulable, "node(s) Insufficient NUMA Node resources")
}
return hints, nil
Expand Down
45 changes: 45 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ package nodenumaresource

import (
"errors"
"fmt"
"reflect"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -132,3 +136,44 @@ func requestCPUBind(state *preFilterState, nodeCPUBindPolicy extension.NodeCPUBi
}
return false, nil
}

func logStruct(v reflect.Value, key string, verbosity klog.Level) {
rawStr := &strings.Builder{}
logValue(v, 0, rawStr)
klog.V(verbosity).Infof("key: %s, value: %s", key, rawStr.String())
}

// logValue is a recursive function that prints the contents of any value.
// For pointers to structs, it recursively unwraps until it reaches the underlying struct.
func logValue(v reflect.Value, depth int, builder *strings.Builder) {
// Indent for pretty printing
indent := strings.Repeat(" ", depth)
switch v.Kind() {
case reflect.Ptr:
// For pointers, obtain the value being pointed to
elem := v.Elem()
if !elem.IsValid() {
builder.WriteString(fmt.Sprintf("%s<nil>\t", indent)) // Appends "nil" representation for nil pointers
} else {
logValue(elem, depth+1, builder) // Recursively log the pointer element
}
case reflect.Struct:
// For structs, iterate through all fields
builder.WriteString("\t")
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
fieldType := v.Type().Field(i)
builder.WriteString(fmt.Sprintf("%s%s:\t", indent, fieldType.Name)) // Appends the field name
logValue(field, depth+2, builder) // Recursively log each struct field
}
case reflect.Slice, reflect.Array:
// For slices or arrays, iterate through each element
for i := 0; i < v.Len(); i++ {
builder.WriteString(fmt.Sprintf("%s[%d]:\t", indent, i)) // Appends the index of the element
logValue(v.Index(i), depth+2, builder) // Recursively log each element
}
default:
// For other types, print the value directly
builder.WriteString(fmt.Sprintf("%s%v\t", indent, v)) // Appends the value
}
}

0 comments on commit 091f73e

Please sign in to comment.