Skip to content

Commit

Permalink
koordlet: fix core sched conflicts with GI and revise API (#1829)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <[email protected]>
  • Loading branch information
saintube authored Feb 26, 2024
1 parent c70f410 commit 8467b8e
Show file tree
Hide file tree
Showing 19 changed files with 1,674 additions and 362 deletions.
57 changes: 40 additions & 17 deletions apis/slo/v1alpha1/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)
Expand Down Expand Up @@ -68,12 +67,8 @@ func GetPodMemoryQoSConfig(pod *corev1.Pod) (*PodMemoryQOSConfig, error) {

const (
// LabelCoreSchedGroupID is the label key of the group ID of the Linux Core Scheduling.
// Value should be a valid UUID or the none value "0".
// When the value is a valid UUID, pods with that group ID and the equal CoreExpelled status on the node will be
// assigned to the same core sched cookie.
// When the value is the none value "0", pod will be reset to the default core sched cookie `0`.
// When the k-v pair is missing but the node-level strategy enables the core sched, the pod will be assigned an
// internal group according to the pod's UID.
// Value can be a valid UUID or empty. If it is empty, the pod is considered to belong to a core sched group "".
// Otherwise, the pod is set its core sched group ID according to the value.
//
// Core Sched: https://docs.kernel.org/admin-guide/hw-vuln/core-scheduling.html
// When the Core Sched is enabled, pods with the different core sched group IDs will not be running at the same SMT
Expand All @@ -85,20 +80,48 @@ const (
// are the same.
LabelCoreSchedGroupID = apiext.DomainPrefix + "core-sched-group-id"

// CoreSchedGroupIDNone is the none value of the core sched group ID which indicates the core sched is disabled for
// LabelCoreSchedPolicy is the label key that indicates the particular policy of the core scheduling.
// It is optional and the policy is considered as the default when the label is not set.
// It supports the following policies:
// - "" or not set (default): If the core sched is enabled for the node, the pod is set the group ID according to
// the value of the LabelCoreSchedGroupID.
// - "none": The core sched is explicitly disabled for the pod even if the node-level strategy is enabled.
// - "exclusive": If the core sched is enabled for the node, the pod is set the group ID according to the pod UID,
// so that the pod is exclusive to any other pods.
LabelCoreSchedPolicy = apiext.DomainPrefix + "core-sched-policy"
)

type CoreSchedPolicy string

const (
// CoreSchedPolicyDefault is the default policy of the core scheduling which indicates the core sched group ID
// is set according to the value of the LabelCoreSchedGroupID.
CoreSchedPolicyDefault CoreSchedPolicy = ""
// CoreSchedPolicyNone is the none policy of the core scheduling which indicates the core sched is disabled for
// the pod. The pod will be reset to the system-default cookie `0`.
CoreSchedGroupIDNone = "0"
CoreSchedPolicyNone CoreSchedPolicy = "none"
// CoreSchedPolicyExclusive is the exclusive policy of the core scheduling which indicates the core sched group ID
// is set the same as the pod's UID
CoreSchedPolicyExclusive CoreSchedPolicy = "exclusive"
)

// GetCoreSchedGroupID gets the core sched group ID from the pod labels.
// It returns the core sched group ID and whether the pod explicitly disables the core sched.
func GetCoreSchedGroupID(labels map[string]string) (string, *bool) {
// GetCoreSchedGroupID gets the core sched group ID for the pod according to the labels.
func GetCoreSchedGroupID(labels map[string]string) string {
if labels != nil {
return labels[LabelCoreSchedGroupID]
}
return ""
}

// GetCoreSchedPolicy gets the core sched policy for the pod according to the labels.
func GetCoreSchedPolicy(labels map[string]string) CoreSchedPolicy {
if labels == nil {
return "", nil
return CoreSchedPolicyDefault
}
value, ok := labels[LabelCoreSchedGroupID]
if !ok {
return "", nil
if v := labels[LabelCoreSchedPolicy]; v == string(CoreSchedPolicyNone) {
return CoreSchedPolicyNone
} else if v == string(CoreSchedPolicyExclusive) {
return CoreSchedPolicyExclusive
}
return value, pointer.Bool(value == CoreSchedGroupIDNone)
return CoreSchedPolicyDefault
}
6 changes: 6 additions & 0 deletions pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/prediction"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
statesinformerimpl "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl"
Expand All @@ -63,6 +64,7 @@ type daemon struct {
qosManager qosmanager.QOSManager
runtimeHook runtimehooks.RuntimeHook
predictServer prediction.PredictServer
executor resourceexecutor.ResourceUpdateExecutor
}

func NewDaemon(config *config.Configuration) (Daemon, error) {
Expand Down Expand Up @@ -115,6 +117,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
qosManager: qosManager,
runtimeHook: runtimeHook,
predictServer: predictServer,
executor: resourceexecutor.NewResourceUpdateExecutor(),
}

return d, nil
Expand Down Expand Up @@ -163,6 +166,9 @@ func (d *daemon) Run(stopCh <-chan struct{}) {
}
}()

// start resource executor before the writers modules
go d.executor.Run(stopCh)

// start qos manager
go func() {
if err := d.qosManager.Run(stopCh); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
CPUNormalization featuregate.Feature = "CPUNormalization"

// CoreSched manages Linux Core Scheduling cookies for containers who enable the core sched.
// NOTE: CoreSched is an alternative policy of the CPU QoS, and it is exclusive to the Group Identity feature.
//
// owner: @saintube @zwzhang0107
// alpha: v1.4
Expand Down
109 changes: 80 additions & 29 deletions pkg/koordlet/runtimehooks/hooks/coresched/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/utils/pointer"

"github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
Expand All @@ -51,6 +50,8 @@ const (

// ExpellerGroupSuffix is the default suffix of the expeller core sched group.
ExpellerGroupSuffix = "-expeller"
// NoneGroupID is the special ID denoting none core sched group.
NoneGroupID = "__0__"
)

// SYSTEM QoS is excluded from the cookie mutating.
Expand All @@ -66,9 +67,9 @@ type Plugin struct {
initialized *atomic.Bool // whether the cache has been initialized
allPodsSyncOnce sync.Once // sync once for AllPods

sysSupported *bool
supportedMsg string
sysEnabled bool
sysSupported *bool
supportedMsg string
giSysctlSupported *bool

cookieCache *gocache.Cache // core-sched-group-id -> cookie id, set<pid>; if the group has had cookie
cookieCacheRWMutex sync.RWMutex
Expand Down Expand Up @@ -103,10 +104,12 @@ func (p *Plugin) Register(op hooks.Options) {
// TODO: hook NRI events RunPodSandbox, PostStartContainer
rule.Register(ruleNameForNodeSLO, description,
rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO),
rule.WithUpdateCallback(p.ruleUpdateCb))
rule.WithUpdateCallback(p.ruleUpdateCb),
rule.WithSystemSupported(p.SystemSupported))
rule.Register(ruleNameForAllPods, description,
rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods),
rule.WithUpdateCallback(p.ruleUpdateCb))
rule.WithUpdateCallback(p.ruleUpdateCb),
rule.WithSystemSupported(p.SystemSupported))
reconciler.RegisterCgroupReconciler(reconciler.ContainerLevel, sysutil.VirtualCoreSchedCookie,
"set core sched cookie to process groups of container specified",
p.SetContainerCookie, reconciler.PodQOSFilter(), podQOSConditions...)
Expand All @@ -125,26 +128,15 @@ func (p *Plugin) Setup(op hooks.Options) {
p.cse = sysutil.NewCoreSchedExtended()
}

func (p *Plugin) SystemSupported() (bool, string) {
func (p *Plugin) SystemSupported() bool {
if p.sysSupported == nil {
isSupported, msg := sysutil.EnableCoreSchedIfSupported()
p.sysSupported = pointer.Bool(isSupported)
sysSupported, msg := sysutil.IsCoreSchedSupported()
p.sysSupported = pointer.Bool(sysSupported)
p.supportedMsg = msg
klog.Infof("update system supported info for plugin %s, supported %v, msg %s",
name, *p.sysSupported, p.supportedMsg)
klog.Infof("update system supported info to %v for plugin %v, supported msg %s",
sysSupported, name, msg)
}
return *p.sysSupported, p.supportedMsg
}

func (p *Plugin) InitCache(podMetas []*statesinformer.PodMeta) bool {
if p.initialized.Load() {
return true
}

synced := p.LoadAllCookies(podMetas)

p.initialized.Store(synced)
return synced
return *p.sysSupported
}

func (p *Plugin) IsCacheInited() bool {
Expand Down Expand Up @@ -200,10 +192,17 @@ func (p *Plugin) SetContainerCookie(proto protocol.HooksProtocol) error {
podUID := containerCtx.Request.PodMeta.UID
// only process sandbox container or container has valid ID
if len(podUID) <= 0 || len(containerCtx.Request.ContainerMeta.ID) <= 0 {
return fmt.Errorf("invalid container ID for plugin %s, pod UID %s, container ID %s",
name, podUID, containerCtx.Request.ContainerMeta.ID)
klog.V(5).Infof("aborted to manage cookie for container %s/%s, err: empty pod UID %s or container ID %s",
containerCtx.Request.PodMeta.String(), containerCtx.Request.ContainerMeta.Name,
podUID, containerCtx.Request.ContainerMeta.ID)
return nil
}

if !p.SystemSupported() {
klog.V(6).Infof("skipped to set cookie for container %s/%s, core sched is unsupported, msg: %s",
containerCtx.Request.PodMeta.String(), containerCtx.Request.ContainerMeta.Name, p.supportedMsg)
return nil
}
if !p.rule.IsInited() || !p.IsCacheInited() {
klog.V(5).Infof("plugin %s has not been inited, rule inited %v, cache inited %v, aborted to set cookie for container %s/%s",
name, p.rule.IsInited(), p.IsCacheInited(), containerCtx.Request.PodMeta.String(), containerCtx.Request.ContainerMeta.Name)
Expand All @@ -219,15 +218,35 @@ func (p *Plugin) SetContainerCookie(proto protocol.HooksProtocol) error {
// 1. disabled -> enabled: Add or Assign.
// 2. keep enabled: Check the differences of cookie, group ID and the PIDs, and do Assign.
if isEnabled {
// FIXME(saintube): Currently we need to ensure the group identity is disabled via sysctl before enabling
// the core sched cookie in the container reconciler, because the disabling during the rule update might
// fail. This check can be removed once the kernel feature provides a way to disable the group identity.
if err := p.initSystem(true); err != nil {
klog.V(4).Infof("plugin %s failed to initialize system for container %s/%s, err: %s",
name, containerCtx.Request.PodMeta.String(), containerCtx.Request.ContainerMeta.Name, err)
return nil
}

return p.enableContainerCookie(containerCtx, groupID)
}
// else pod disables

return p.disableContainerCookie(containerCtx, groupID)
}

// LoadAllCookies syncs the current core sched cookies of all pods into the cookie cache.
func (p *Plugin) LoadAllCookies(podMetas []*statesinformer.PodMeta) bool {
func (p *Plugin) initCache(podMetas []*statesinformer.PodMeta) bool {
if p.initialized.Load() {
return true
}

synced := p.loadAllCookies(podMetas)

p.initialized.Store(synced)
return synced
}

// loadAllCookies syncs the current core sched cookies of all pods into the cookie cache.
func (p *Plugin) loadAllCookies(podMetas []*statesinformer.PodMeta) bool {
hasSynced := false
p.cookieCacheRWMutex.Lock()
defer p.cookieCacheRWMutex.Unlock()
Expand Down Expand Up @@ -301,6 +320,38 @@ func (p *Plugin) LoadAllCookies(podMetas []*statesinformer.PodMeta) bool {
return hasSynced
}

func (p *Plugin) initSystem(isEnabled bool) error {
if !isEnabled { // only switch sysctl if enabled
return nil
}

// NOTE: Currently the kernel feature core scheduling is strictly excluded with the group identity's
// bvt=-1. So we have to check if the GroupIdentity can be disabled before creating core sched cookies.
err := p.tryDisableGroupIdentity()
if err != nil {
return err
}

sysEnabled, msg := sysutil.EnableCoreSchedIfSupported()
if !sysEnabled {
return fmt.Errorf("failed to enable core sched, msg: %s", msg)
}

return nil
}

// tryDisableGroupIdentity tries disabling the group identity via sysctl to safely enable the core sched.
func (p *Plugin) tryDisableGroupIdentity() error {
if p.giSysctlSupported == nil {
p.giSysctlSupported = pointer.Bool(sysutil.IsGroupIdentitySysctlSupported())
}
if !*p.giSysctlSupported { // not support either the group identity or the core sched
return nil
}

return sysutil.SetSchedGroupIdentity(false)
}

// enableContainerCookie adds or assigns a core sched cookie for the container.
func (p *Plugin) enableContainerCookie(containerCtx *protocol.ContainerContext, groupID string) error {
podMetaName := containerCtx.Request.PodMeta.String()
Expand Down Expand Up @@ -405,7 +456,7 @@ func (p *Plugin) disableContainerCookie(containerCtx *protocol.ContainerContext,
// invalid lastGroupID means container not in group cache (container should be cleared or not ever added)
// invalid lastCookieEntry means group not in cookie cache (group should be cleared)
// let its cached PIDs expire or removed by siblings' Assign
if (len(lastGroupID) <= 0 || lastGroupID == slov1alpha1.CoreSchedGroupIDNone) && lastCookieEntry == nil {
if (len(lastGroupID) <= 0 || lastGroupID == NoneGroupID) && lastCookieEntry == nil {
return nil
}

Expand Down Expand Up @@ -446,7 +497,7 @@ func (p *Plugin) getCookieCacheForContainer(groupID, containerUID string) (strin
defer p.cookieCacheRWMutex.RUnlock()

lastGroupIDIf, containerHasGroup := p.groupCache.Get(containerUID)
lastGroupID := slov1alpha1.CoreSchedGroupIDNone
lastGroupID := NoneGroupID
if containerHasGroup {
lastGroupID = lastGroupIDIf.(string)
}
Expand Down
Loading

0 comments on commit 8467b8e

Please sign in to comment.