Skip to content

Commit

Permalink
scheduler: revise numa-aware hints generator (#1732)
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph <[email protected]>
  • Loading branch information
eahydra authored Nov 3, 2023
1 parent 7601ae4 commit 5b7f0af
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 80 deletions.
181 changes: 139 additions & 42 deletions pkg/scheduler/plugins/nodenumaresource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/client-go/tools/cache"
corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"

schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -128,15 +129,45 @@ func (c *resourceManager) GetTopologyHints(node *corev1.Node, pod *corev1.Pod, o
if err != nil {
return nil, err
}

result := generateResourceHints(topologyOptions.NUMANodeResources, options.requests, totalAvailable, options.numaScorer)
hints := make(map[string][]topologymanager.NUMATopologyHint)
for k, v := range result {
hints[k] = v
if err := c.trimNUMANodeResources(node.Name, totalAvailable, options); err != nil {
return nil, err
}

hints := generateResourceHints(topologyOptions.NUMANodeResources, options.requests, totalAvailable, options.numaScorer)
return hints, nil
}

// trimNUMANodeResources lowers the per-NUMA-node CPU quantity in totalAvailable
// so that it does not exceed the CPUs that can actually be handed out when the
// pod requires a CPU bind policy.
//
// It is a no-op unless options.requiredCPUBindPolicy is set. Otherwise the
// node's available CPUs are looked up via GetAvailableCPUs and, for every NUMA
// node with a non-zero CPU quantity:
//   - if the available CPUs in that NUMA node cover the quantity, they are
//     narrowed by filterCPUsByRequiredCPUBindPolicy;
//   - if the resulting CPU count (in milli-CPU) is below the quantity, the
//     quantity is reduced to that count and written back into the entry.
//
// The only error returned is the one reported by GetAvailableCPUs.
func (c *resourceManager) trimNUMANodeResources(nodeName string, totalAvailable map[int]corev1.ResourceList, options *ResourceOptions) error {
	if !options.requiredCPUBindPolicy {
		return nil
	}

	freeCPUs, _, err := c.GetAvailableCPUs(nodeName, options.preferredCPUs)
	if err != nil {
		return err
	}

	freeDetails := options.topologyOptions.CPUTopology.CPUDetails.KeepOnly(freeCPUs)
	for numaNode, resources := range totalAvailable {
		quantity := resources[corev1.ResourceCPU]
		if quantity.IsZero() {
			continue
		}

		nodeCPUs := freeDetails.CPUsInNUMANodes(numaNode)
		// Only apply the bind-policy filter when the raw CPU count could
		// satisfy the recorded quantity in the first place.
		if enough := int64(nodeCPUs.Size()*1000) >= quantity.MilliValue(); enough {
			nodeCPUs = filterCPUsByRequiredCPUBindPolicy(
				options.cpuBindPolicy,
				nodeCPUs,
				freeDetails,
				options.topologyOptions.CPUTopology.CPUsPerCore(),
			)
		}

		if int64(nodeCPUs.Size())*1000 >= quantity.MilliValue() {
			continue
		}
		// Fewer usable CPUs than advertised: cap the quantity at what is usable.
		quantity.SetMilli(int64(nodeCPUs.Size() * 1000))
		resources[corev1.ResourceCPU] = quantity
	}
	return nil
}

func (c *resourceManager) Allocate(node *corev1.Node, pod *corev1.Pod, options *ResourceOptions) (*PodAllocation, error) {
allocation := &PodAllocation{
UID: pod.UID,
Expand Down Expand Up @@ -249,7 +280,12 @@ func (c *resourceManager) allocateCPUSet(node *corev1.Node, pod *corev1.Pod, all
topologyOptions := &options.topologyOptions
if options.requiredCPUBindPolicy {
cpuDetails := topologyOptions.CPUTopology.CPUDetails.KeepOnly(availableCPUs)
availableCPUs = filterAvailableCPUsByRequiredCPUBindPolicy(options.cpuBindPolicy, availableCPUs, cpuDetails, topologyOptions.CPUTopology.CPUsPerCore())
availableCPUs = filterCPUsByRequiredCPUBindPolicy(
options.cpuBindPolicy,
availableCPUs,
cpuDetails,
topologyOptions.CPUTopology.CPUsPerCore(),
)
}

if availableCPUs.Size() < options.numCPUsNeeded {
Expand Down Expand Up @@ -380,21 +416,25 @@ func (c *resourceManager) getAvailableNUMANodeResources(nodeName string, topolog
}

func generateResourceHints(numaNodeResources []NUMANodeResource, podRequests corev1.ResourceList, totalAvailable map[int]corev1.ResourceList, numaScorer *resourceAllocationScorer) map[string][]topologymanager.NUMATopologyHint {
// Initialize minAffinitySize to include all NUMA Cells.
minAffinitySizeMap := map[corev1.ResourceName]*int{}
generator := hintsGenerator{
minAffinitySize: make(map[corev1.ResourceName]int),
hints: map[string][]topologymanager.NUMATopologyHint{},
}
var memoryResourceNames []corev1.ResourceName
for resourceName := range podRequests {
size := len(numaNodeResources)
minAffinitySizeMap[resourceName] = &size
generator.minAffinitySize[resourceName] = len(numaNodeResources)
if resourceName == corev1.ResourceMemory || corehelper.IsHugePageResourceName(resourceName) {
memoryResourceNames = append(memoryResourceNames, resourceName)
}
}

hints := map[string][]topologymanager.NUMATopologyHint{}

numaNodes := make([]int, 0, len(numaNodeResources))
for _, v := range numaNodeResources {
numaNodes = append(numaNodes, v.Node)
}

podRequestResources := framework.NewResource(podRequests)
totalResourceNames := sets.NewString()
bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) {
maskBits := mask.GetBits()
available := make(corev1.ResourceList)
Expand All @@ -415,55 +455,112 @@ func generateResourceHints(numaNodeResources []NUMANodeResource, podRequests cor
score, _ = numaScorer.score(framework.NewResource(requested), framework.NewResource(total), podRequestResources)
}

for resourceName, request := range podRequests {
minAffinitySize := minAffinitySizeMap[resourceName]
if !shouldGenerateHint(total[resourceName], available[resourceName], request, mask.Count(), minAffinitySize) {
continue
// verify that for all memory types the node mask has enough allocatable resources
generator.generateHints(mask, score, total, available, podRequests, memoryResourceNames...)

for resourceName := range podRequests {
if _, ok := total[resourceName]; ok {
totalResourceNames.Insert(string(resourceName))
}
if _, ok := hints[string(resourceName)]; !ok {
hints[string(resourceName)] = []topologymanager.NUMATopologyHint{}
if resourceName == corev1.ResourceMemory || corehelper.IsHugePageResourceName(resourceName) {
continue
}
hints[string(resourceName)] = append(hints[string(resourceName)], topologymanager.NUMATopologyHint{
NUMANodeAffinity: mask,
Preferred: false,
Score: score,
})
generator.generateHints(mask, score, total, available, podRequests, resourceName)
}
})

// Update each hint's Preferred flag according to multiNUMAGroups; when it is not provided, the default
// behavior of preferring the minimal number of NUMA nodes is used.
for resourceName := range podRequests {
minAffinitySize := *minAffinitySizeMap[resourceName]
for i, hint := range hints[string(resourceName)] {
hints[string(resourceName)][i].Preferred = len(hint.NUMANodeAffinity.GetBits()) == minAffinitySize
minAffinitySize := generator.minAffinitySize[resourceName]
for i, hint := range generator.hints[string(resourceName)] {
generator.hints[string(resourceName)][i].Preferred = len(hint.NUMANodeAffinity.GetBits()) == minAffinitySize
}
}

return hints
for resourceName := range podRequests {
if totalResourceNames.Has(string(resourceName)) {
hints := generator.hints[string(resourceName)]
if hints == nil {
// no possible NUMA affinities for resource
hints = []topologymanager.NUMATopologyHint{}
generator.hints[string(resourceName)] = hints
}
}
}
return generator.hints
}

func shouldGenerateHint(total resource.Quantity, available resource.Quantity, request resource.Quantity, nodeCount int, minAffinitySize *int) bool {
if total.Cmp(request) < 0 {
return false
// hintsGenerator accumulates NUMA topology hints per resource while candidate
// NUMA node masks are fed to generateHints one at a time.
type hintsGenerator struct {
// minAffinitySize records, per resource, the smallest NUMA node count seen so
// far among masks whose total allocatable amount covers the pod request.
// generateHints only ever lowers the stored value.
minAffinitySize map[corev1.ResourceName]int
// hints holds the generated hints, keyed by the resource name as a string.
hints map[string][]topologymanager.NUMATopologyHint
}

// generateHints records a hint for mask under every resource in resourceNames,
// treating the listed resources as one all-or-nothing group.
//
// Nothing is recorded when any listed resource's totalAllocatable is below the
// pod request. Otherwise minAffinitySize is lowered to the mask's node count
// for each listed resource, and a hint is appended for each of them only if
// totalFree also covers the request for every listed resource. Appended hints
// carry the given score and have Preferred left false; this method does not
// decide preference.
func (g *hintsGenerator) generateHints(mask bitmask.BitMask, score int64, totalAllocatable, totalFree corev1.ResourceList, podRequests corev1.ResourceList, resourceNames ...corev1.ResourceName) {
// Reject the mask for the whole group if any resource can never fit on it.
for _, resourceName := range resourceNames {
total, request := totalAllocatable[resourceName], podRequests[resourceName]
if total.Cmp(request) < 0 {
return
}
}
if nodeCount < *minAffinitySize {
*minAffinitySize = nodeCount

// The mask is large enough in terms of allocatable capacity, so it counts
// toward the minimal affinity size even if it is currently too full.
nodeCount := mask.Count()
for _, resourceName := range resourceNames {
affinitySize := g.minAffinitySize[resourceName]
if nodeCount < affinitySize {
g.minAffinitySize[resourceName] = nodeCount
}
}

// Skip hint creation for the whole group if any resource lacks free capacity.
for _, resourceName := range resourceNames {
free, request := totalFree[resourceName], podRequests[resourceName]
if free.Cmp(request) < 0 {
return
}
}
if available.Cmp(request) < 0 {
return false

// Every resource in the group fits: append the same mask/score to each.
for _, resourceName := range resourceNames {
if _, ok := g.hints[string(resourceName)]; !ok {
g.hints[string(resourceName)] = []topologymanager.NUMATopologyHint{}
}
g.hints[string(resourceName)] = append(g.hints[string(resourceName)], topologymanager.NUMATopologyHint{
NUMANodeAffinity: mask,
Preferred: false,
Score: score,
})
}
return true
}

func filterAvailableCPUsByRequiredCPUBindPolicy(policy schedulingconfig.CPUBindPolicy, availableCPUs cpuset.CPUSet, cpuDetails CPUDetails, cpusPerCore int) cpuset.CPUSet {
if policy == schedulingconfig.CPUBindPolicyFullPCPUs {
cpuDetails.KeepOnly(availableCPUs)
cpus := cpuDetails.CPUsInCores(cpuDetails.Cores().ToSliceNoSort()...)
if cpus.Size()%cpusPerCore != 0 {
return availableCPUs
// filterCPUsByRequiredCPUBindPolicy narrows availableCPUs to the CPUs that may
// be counted under a required CPU bind policy. cpuDetails is first restricted
// to availableCPUs, then:
//   - CPUBindPolicyFullPCPUs keeps only the CPUs of cores for which exactly
//     cpusPerCore CPUs remain, i.e. cores that are still entirely available;
//   - CPUBindPolicySpreadByPCPUs keeps a single CPU per remaining core;
//   - any other policy returns availableCPUs unchanged.
func filterCPUsByRequiredCPUBindPolicy(policy schedulingconfig.CPUBindPolicy, availableCPUs cpuset.CPUSet, cpuDetails CPUDetails, cpusPerCore int) cpuset.CPUSet {
builder := cpuset.NewCPUSetBuilder()
cpuDetails = cpuDetails.KeepOnly(availableCPUs)
switch policy {
case schedulingconfig.CPUBindPolicyFullPCPUs:
for _, core := range cpuDetails.Cores().ToSliceNoSort() {
cpus := cpuDetails.CPUsInCores(core)
// A core with fewer than cpusPerCore CPUs left is partially taken; drop it.
if cpus.Size() == cpusPerCore {
builder.Add(cpus.ToSliceNoSort()...)
}
}
availableCPUs = builder.Result()
case schedulingconfig.CPUBindPolicySpreadByPCPUs:
for _, core := range cpuDetails.Cores().ToSliceNoSort() {
// TODO(joseph): Maybe we should support required exclusive policy as following
// allocated := allocatedCPUs.CPUsInCores(core)
// if allocated.Size() > 0 {
// cpuInfo := allocatedCPUs[allocated.ToSliceNoSort()[0]]
// if cpuInfo.ExclusivePolicy != "" &&
// cpuInfo.ExclusivePolicy != schedulingconfig.CPUExclusivePolicyNone &&
// cpuInfo.ExclusivePolicy == exclusivePolicy {
// continue
// }
// }

// Using only one CPU per core ensures correct hints are generated
// NOTE(review): cpus[0] is presumably the lowest-numbered CPU of the core,
// since ToSlice is used here rather than ToSliceNoSort — confirm.
cpus := cpuDetails.CPUsInCores(core).ToSlice()
builder.Add(cpus[0])
}
return cpus
availableCPUs = builder.Result()
}
return availableCPUs
}
Expand Down
112 changes: 74 additions & 38 deletions pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,22 +722,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
},
},
want: map[string][]topologymanager.NUMATopologyHint{
string(corev1.ResourceCPU): {
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0)
return mask
}(),
Preferred: true,
},
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0, 1)
return mask
}(),
Preferred: false,
},
},
string(corev1.ResourceCPU): {},
},
wantErr: false,
},
Expand Down Expand Up @@ -865,22 +850,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
},
},
want: map[string][]topologymanager.NUMATopologyHint{
string(corev1.ResourceCPU): {
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0)
return mask
}(),
Preferred: true,
},
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0, 1)
return mask
}(),
Preferred: false,
},
},
string(corev1.ResourceCPU): {},
},
wantErr: false,
},
Expand Down Expand Up @@ -933,6 +903,69 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
},
},
},
{
name: "failed to generate hints with insufficient memory and hugepages",
pod: &corev1.Pod{},
options: &ResourceOptions{
numCPUsNeeded: 4,
requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"),
},
},
allocated: &PodAllocation{
UID: "123456",
Name: "test-xxx",
Namespace: "default",
NUMANodeResources: []NUMANodeResource{
{
Node: 0,
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("120Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"),
},
},
{
Node: 1,
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("120Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"),
},
},
},
},
want: map[string][]topologymanager.NUMATopologyHint{
string(corev1.ResourceCPU): {
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0)
return mask
}(),
Preferred: true,
},
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(1)
return mask
}(),
Preferred: true,
},
{
NUMANodeAffinity: func() bitmask.BitMask {
mask, _ := bitmask.NewBitMask(0, 1)
return mask
}(),
Preferred: false,
},
},
string(corev1.ResourceMemory): {},
corev1.ResourceHugePagesPrefix + "1Gi": {},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -944,15 +977,17 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
{
Node: 0,
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("52"),
corev1.ResourceMemory: resource.MustParse("128Gi"),
corev1.ResourceCPU: resource.MustParse("52"),
corev1.ResourceMemory: resource.MustParse("128Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"),
},
},
{
Node: 1,
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("52"),
corev1.ResourceMemory: resource.MustParse("128Gi"),
corev1.ResourceCPU: resource.MustParse("52"),
corev1.ResourceMemory: resource.MustParse("128Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"),
},
},
}
Expand All @@ -963,8 +998,9 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("104"),
corev1.ResourceMemory: resource.MustParse("256Gi"),
corev1.ResourceCPU: resource.MustParse("104"),
corev1.ResourceMemory: resource.MustParse("256Gi"),
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("8Gi"),
},
},
}
Expand Down

0 comments on commit 5b7f0af

Please sign in to comment.