Skip to content

Commit

Permalink
koord-descheduler: add migration object limiter for namespace (#2068)
Browse files Browse the repository at this point in the history
Signed-off-by: songtao98 <[email protected]>
  • Loading branch information
songtao98 authored Jul 22, 2024
1 parent beac34f commit 02e680e
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 61 deletions.
3 changes: 2 additions & 1 deletion pkg/descheduler/apis/config/types_pluginargs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ type MigrationControllerArgs struct {
// MigrationLimitObjectType names the kind of object that a migration object
// limiter is keyed by.
type MigrationLimitObjectType string

const (
	// MigrationLimitObjectWorkload selects limiting keyed by workload.
	MigrationLimitObjectWorkload MigrationLimitObjectType = "workload"
	// MigrationLimitObjectNamespace selects limiting keyed by namespace.
	MigrationLimitObjectNamespace MigrationLimitObjectType = "namespace"
)

// ObjectLimiterMap maps each MigrationLimitObjectType to its MigrationObjectLimiter.
type ObjectLimiterMap map[MigrationLimitObjectType]MigrationObjectLimiter
Expand Down
1 change: 1 addition & 0 deletions pkg/descheduler/apis/config/v1alpha2/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
MigrationLimitObjectWorkload: {
Duration: metav1.Duration{Duration: 5 * time.Minute},
},
// namespace object limiter is disabled by default
}

defaultLoadAnomalyCondition = &LoadAnomalyCondition{
Expand Down
4 changes: 2 additions & 2 deletions pkg/descheduler/controllers/migration/arbitrator/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@ func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, ha
if err != nil {
return err
}
retriablePodFilters := podutil.WrapFilterFuncs(
retryablePodFilters := podutil.WrapFilterFuncs(
f.filterMaxMigratingPerNode,
f.filterMaxMigratingPerNamespace,
f.filterMaxMigratingOrUnavailablePerWorkload,
)
f.retryablePodFilter = func(pod *corev1.Pod) bool {
return evictionsutil.HaveEvictAnnotation(pod) || retriablePodFilters(pod)
return evictionsutil.HaveEvictAnnotation(pod) || retryablePodFilters(pod)
}
f.nonRetryablePodFilter = func(pod *corev1.Pod) bool {
// any annotated as evictable pod pass non-retryable filter
Expand Down
167 changes: 111 additions & 56 deletions pkg/descheduler/controllers/migration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ type Reconciler struct {
assumedCache *assumedCache
clock clock.Clock

arbitrator arbitrator.Arbitrator
objectLimiters map[types.UID]*rate.Limiter
limiterCache *gocache.Cache
limiterLock sync.Mutex
arbitrator arbitrator.Arbitrator

limiterMap map[deschedulerconfig.MigrationLimitObjectType]map[string]*rate.Limiter
limiterCacheMap map[deschedulerconfig.MigrationLimitObjectType]*gocache.Cache
limiterLock sync.Mutex
}

func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
Expand Down Expand Up @@ -452,27 +453,67 @@ func (r *Reconciler) preparePodRef(ctx context.Context, job *sev1alpha1.PodMigra
}

func (r *Reconciler) checkPodExceedObjectLimiter(pod *corev1.Pod) bool {
if r.objectLimiters == nil || r.limiterCache == nil {
if r.limiterMap == nil || len(r.limiterMap) == 0 || r.limiterCacheMap == nil || len(r.limiterCacheMap) == 0 {
return false
}
objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Duration == 0 {
for limiterType, objectLimiterArgs := range r.args.ObjectLimiters {
if objectLimiterArgs.Duration.Duration == 0 {
continue
}
limiterKey, processScope := getLimiterKeyAndProcessScope(pod, limiterType)
if limiterKey == "" {
continue
}
logInfo := getLogInfo(pod, limiterType, processScope)
if r.exceeded(limiterKey, limiterType) {
klog.V(4).InfoS("Pod fails the following checks", logInfo...)
return true
}
}
return false
}

func (r *Reconciler) exceeded(limiterKey string, limiterType deschedulerconfig.MigrationLimitObjectType) bool {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
limiters, ok := r.limiterMap[limiterType]
if !ok {
return false
}
if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
if limiter := r.objectLimiters[ownerRef.UID]; limiter != nil {
if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject",
"owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion))
return true
}
limiter := limiters[limiterKey]
if limiter != nil {
if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
return true
}
}
return false
}

// getLimiterKeyAndProcessScope derives, for the given limiter type, the key
// under which pod is rate limited and a human-readable scope for logging.
//
// For the workload type the key is the UID of the pod's controller owner and
// the scope is "name/kind/apiVersion" of that owner; a pod without a
// controller yields empty strings. For the namespace type both values are the
// pod's namespace. Any other type yields empty strings.
func getLimiterKeyAndProcessScope(pod *corev1.Pod, limiterType deschedulerconfig.MigrationLimitObjectType) (limiterKey, processScope string) {
	switch limiterType {
	case deschedulerconfig.MigrationLimitObjectWorkload:
		ownerRef := metav1.GetControllerOf(pod)
		if ownerRef == nil {
			return "", ""
		}
		return string(ownerRef.UID), fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
	case deschedulerconfig.MigrationLimitObjectNamespace:
		// The namespace is already a string; no formatting is required.
		return pod.Namespace, pod.Namespace
	default:
		return "", ""
	}
}

// getLogInfo assembles the structured-logging key/value pairs describing which
// limiter check a pod failed. The scope is appended under "owner" for the
// workload type and under "namespace" for the namespace type; for any other
// type only the pod and check name are included.
func getLogInfo(pod *corev1.Pod, limiterType deschedulerconfig.MigrationLimitObjectType, processScope string) []interface{} {
	var scopeKey string
	switch limiterType {
	case deschedulerconfig.MigrationLimitObjectWorkload:
		scopeKey = "owner"
	case deschedulerconfig.MigrationLimitObjectNamespace:
		scopeKey = "namespace"
	}

	keysAndValues := []interface{}{
		"pod", klog.KObj(pod),
		"checks", fmt.Sprintf("limitedObject: %s", limiterType),
	}
	if scopeKey == "" {
		return keysAndValues
	}
	return append(keysAndValues, scopeKey, processScope)
}

func (r *Reconciler) requeueJobIfObjectLimiterFailed(ctx context.Context, job *sev1alpha1.PodMigrationJob) bool {
if evictionsutil.HaveEvictAnnotation(job) {
return false
Expand Down Expand Up @@ -819,48 +860,59 @@ func (r *Reconciler) prepareJobWithReservationScheduleSuccess(ctx context.Contex
}

func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) {
if r.objectLimiters == nil || r.limiterCache == nil {
return
}
ownerRef := metav1.GetControllerOf(pod)
if ownerRef == nil {
return
}

objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Seconds() == 0 {
if r.limiterMap == nil || len(r.limiterMap) == 0 || r.limiterCacheMap == nil || len(r.limiterCacheMap) == 0 {
return
}

var maxMigratingReplicas int
if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
maxMigrating := objectLimiterArgs.MaxMigrating
if maxMigrating == nil {
maxMigrating = r.args.MaxMigratingPerWorkload
for limiterType, objectLimiterArgs := range r.args.ObjectLimiters {
if objectLimiterArgs.Duration.Seconds() == 0 {
continue
}
maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
}
if maxMigratingReplicas == 0 {
return
limiterKey, processScope := getLimiterKeyAndProcessScope(pod, limiterType)
if limiterKey == "" {
continue
}
var maxMigratingReplicas int
if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
maxMigrating := objectLimiterArgs.MaxMigrating
if maxMigrating == nil {
maxMigrating = r.args.MaxMigratingPerWorkload
}
maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
}
if maxMigratingReplicas == 0 {
return
}
limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())

r.track(limit, limiterKey, processScope, limiterType, maxMigratingReplicas)
}
}

// track charges one eviction to the rate limiter identified by limiterType and
// limiterKey and refreshes that key in the type's expiration cache.
//
// The limiter is created on first use with rate limit and burst
// maxMigratingReplicas; an existing limiter has its rate updated when limit
// differs. processScope is only used for logging. If either the limiter map or
// the cache for limiterType is missing, an error is logged and nothing is
// tracked. The whole operation runs under r.limiterLock.
func (r *Reconciler) track(limit rate.Limit, limiterKey, processScope string, limiterType deschedulerconfig.MigrationLimitObjectType, maxMigratingReplicas int) {
	r.limiterLock.Lock()
	defer r.limiterLock.Unlock()

	limiters, ok := r.limiterMap[limiterType]
	if !ok {
		klog.Errorf("failed to find limiters for type %s", limiterType)
		return
	}
	// Resolve the cache before touching any limiter: calling Set on a missing
	// (nil) cache would panic, and a limiter without a cache entry would never
	// be cleaned up by the eviction callback.
	limiterCache, ok := r.limiterCacheMap[limiterType]
	if !ok || limiterCache == nil {
		klog.Errorf("failed to find limiterCache for type %s", limiterType)
		return
	}

	limiter := limiters[limiterKey]
	if limiter == nil {
		limiter = rate.NewLimiter(limit, maxMigratingReplicas)
		limiters[limiterKey] = limiter
	} else if limiter.Limit() != limit {
		limiter.SetLimit(limit)
	}

	if !limiter.AllowN(r.clock.Now(), 1) {
		klog.Infof("The %s %s has been frequently descheduled recently and needs to be limited for a period of time", limiterType, processScope)
	}
	limiterCache.Set(limiterKey, 0, gocache.DefaultExpiration)
}

func (r *Reconciler) deleteReservation(ctx context.Context, job *sev1alpha1.PodMigrationJob) error {
Expand Down Expand Up @@ -1040,20 +1092,23 @@ func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool {
}

func (r *Reconciler) initObjectLimiters() {
var trackExpiration time.Duration
for _, v := range r.args.ObjectLimiters {
if v.Duration.Duration > trackExpiration {
trackExpiration = v.Duration.Duration
r.limiterMap = make(map[deschedulerconfig.MigrationLimitObjectType]map[string]*rate.Limiter)
r.limiterCacheMap = make(map[deschedulerconfig.MigrationLimitObjectType]*gocache.Cache)

for limiterType, limiterConfig := range r.args.ObjectLimiters {
var trackExpiration time.Duration
if limiterConfig.Duration.Duration > trackExpiration {
trackExpiration = limiterConfig.Duration.Duration
}
if trackExpiration > 0 {
r.limiterMap[limiterType] = make(map[string]*rate.Limiter)
limiterExpiration := trackExpiration + trackExpiration/2
r.limiterCacheMap[limiterType] = gocache.New(limiterExpiration, limiterExpiration)
r.limiterCacheMap[limiterType].OnEvicted(func(s string, _ interface{}) {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
delete(r.limiterMap[limiterType], s)
})
}
}
if trackExpiration > 0 {
r.objectLimiters = make(map[types.UID]*rate.Limiter)
limiterExpiration := trackExpiration + trackExpiration/2
r.limiterCache = gocache.New(limiterExpiration, limiterExpiration)
r.limiterCache.OnEvicted(func(s string, _ interface{}) {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
delete(r.objectLimiters, types.UID(s))
})
}
}
Loading

0 comments on commit 02e680e

Please sign in to comment.