Skip to content

Commit

Permalink
scheduler: extend schedulerMonitor (#2314)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <[email protected]>
Co-authored-by: shenxin <[email protected]>
  • Loading branch information
saintube and shenxin authored Jan 8, 2025
1 parent b3ea5b6 commit 1e0be0b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (ext *frameworkExtenderImpl) runPreBindExtensionPlugins(ctx context.Context

func (ext *frameworkExtenderImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
if ext.monitor != nil {
defer ext.monitor.Complete(pod)
defer ext.monitor.Complete(pod, nil)
}
ext.Framework.RunPostBindPlugins(ctx, state, pod, nodeName)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/frameworkext/framework_extender_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (f *FrameworkExtenderFactory) InitScheduler(sched Scheduler) {
}
// Deep copy podInfo to allow pod modification during scheduling
podInfo = podInfo.DeepCopy()
f.monitor.RecordNextPod(podInfo)
return podInfo, nil
}
}
Expand Down Expand Up @@ -192,7 +193,7 @@ func (f *FrameworkExtenderFactory) InterceptSchedulerError(sched *scheduler.Sche
f.errorHandlerDispatcher.setDefaultHandler(sched.FailureHandler)
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
f.errorHandlerDispatcher.Error(ctx, fwk, podInfo, status, nominatingInfo, start)
f.monitor.Complete(podInfo.Pod)
f.monitor.Complete(podInfo.Pod, status)
}
}

Expand Down
80 changes: 65 additions & 15 deletions pkg/scheduler/frameworkext/scheduler_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/metrics"
)
Expand All @@ -41,17 +42,29 @@ func init() {
pflag.DurationVar(&schedulingTimeout, "scheduling-timeout", schedulingTimeout, "The maximum acceptable scheduling time interval. After timeout, the metric will be updated and the log will be printed.")
}

var (
StartMonitor = defaultStartMonitor
CompleteMonitor = defaultCompleteMonitor
RecordQueuePodInfo = func(podInfo *framework.QueuedPodInfo, state *podScheduleState) {}
)

type SchedulerMonitor struct {
timeout time.Duration
lock sync.Mutex
schedulingPods map[types.UID]podScheduleState
}

type podScheduleState struct {
start time.Time
namespace string
name string
schedulerName string
// scheduling info
start time.Time
// queue info
dequeued time.Time
lastEnqueued time.Time
attempts int
initialEnqueued *time.Time
}

func NewSchedulerMonitor(period time.Duration, timeout time.Duration) *SchedulerMonitor {
Expand All @@ -73,33 +86,70 @@ func (m *SchedulerMonitor) monitor() {
}
}

func (m *SchedulerMonitor) StartMonitoring(pod *corev1.Pod) {
klog.Infof("start monitoring pod %v(%s)", klog.KObj(pod), pod.UID)
func (m *SchedulerMonitor) RecordNextPod(podInfo *framework.QueuedPodInfo) {
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
now := time.Now()
scheduleState := podScheduleState{
namespace: pod.Namespace,
name: pod.Name,
schedulerName: pod.Spec.SchedulerName,
dequeued: now,
lastEnqueued: podInfo.Timestamp,
attempts: podInfo.Attempts,
initialEnqueued: podInfo.InitialAttemptTimestamp,
}
RecordQueuePodInfo(podInfo, &scheduleState)

m.lock.Lock()
defer m.lock.Unlock()
m.schedulingPods[pod.UID] = podScheduleState{
start: time.Now(),
namespace: pod.Namespace,
name: pod.Name,
schedulerName: pod.Spec.SchedulerName,
}
m.schedulingPods[pod.UID] = scheduleState
m.lock.Unlock()
}

func (m *SchedulerMonitor) Complete(pod *corev1.Pod) {
klog.Infof("pod %v(%s) scheduled complete", klog.KObj(pod), pod.UID)
func (m *SchedulerMonitor) StartMonitoring(pod *corev1.Pod) {
now := time.Now()

m.lock.Lock()
defer m.lock.Unlock()
scheduleState, exists := m.schedulingPods[pod.UID]
if !exists {
scheduleState = podScheduleState{
start: now,
namespace: pod.Namespace,
name: pod.Name,
schedulerName: pod.Spec.SchedulerName,
}
} else {
scheduleState.start = now
}
m.schedulingPods[pod.UID] = scheduleState
m.lock.Unlock()

StartMonitor(pod, &scheduleState)
}

func (m *SchedulerMonitor) Complete(pod *corev1.Pod, status *framework.Status) {
m.lock.Lock()
state, ok := m.schedulingPods[pod.UID]
delete(m.schedulingPods, pod.UID)
m.lock.Unlock()

if ok {
now := time.Now()
recordIfSchedulingTimeout(pod.UID, &state, now, m.timeout)
delete(m.schedulingPods, pod.UID)
CompleteMonitor(pod, &state, now, m.timeout, status)
}
}

func defaultStartMonitor(pod *corev1.Pod, state *podScheduleState) {
klog.Infof("start monitoring pod %v(%s)", klog.KObj(pod), pod.UID)
}

func defaultCompleteMonitor(pod *corev1.Pod, state *podScheduleState, end time.Time, timeout time.Duration, status *framework.Status) {
klog.Infof("pod %v(%s) scheduled complete", klog.KObj(pod), pod.UID)
recordIfSchedulingTimeout(pod.UID, state, end, timeout)
}

func recordIfSchedulingTimeout(uid types.UID, state *podScheduleState, now time.Time, timeout time.Duration) {
if interval := now.Sub(state.start); interval > timeout {
logWarningF("!!!CRITICAL TIMEOUT!!! scheduling pod %s/%s(%s) took longer (%s) than the timeout %v", state.namespace, state.name, uid, interval, timeout)
Expand Down
16 changes: 13 additions & 3 deletions pkg/scheduler/frameworkext/scheduler_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

func TestSchedulerMonitor_Timeout(t *testing.T) {
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestSchedulerMonitor_Timeout(t *testing.T) {
if len(capturedLog) == 0 || !strings.Contains(capturedLog, "!!!CRITICAL TIMEOUT!!!") {
t.Errorf("Expected a timeout log to be recorded, but got: %s", capturedLog)
}
monitor.Complete(pod)
monitor.Complete(pod, nil)
}

func TestSchedulerMonitor_NoTimeout(t *testing.T) {
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestSchedulerMonitor_NoTimeout(t *testing.T) {
if len(capturedLog) > 0 {
t.Errorf("Expected no timeout log to be recorded, but got: %s", capturedLog)
}
monitor.Complete(pod)
monitor.Complete(pod, nil)
}

func TestSchedulerMonitor_StartAndCompleteMonitoring(t *testing.T) {
Expand All @@ -101,6 +102,15 @@ func TestSchedulerMonitor_StartAndCompleteMonitoring(t *testing.T) {
UID: types.UID("test-uid"),
},
}
podInfo, err := framework.NewPodInfo(pod)
if err != nil {
t.Errorf("Failed to create podInfo for pod, err: %s", err)
}
queuePodInfo := &framework.QueuedPodInfo{
Timestamp: time.Now(),
PodInfo: podInfo,
}
monitor.RecordNextPod(queuePodInfo)

monitor.StartMonitoring(pod)
state, ok := monitor.schedulingPods[pod.UID]
Expand All @@ -111,7 +121,7 @@ func TestSchedulerMonitor_StartAndCompleteMonitoring(t *testing.T) {
t.Errorf("Start time should not be zero after StartMonitoring")
}
time.Sleep(2 * timeout)
monitor.Complete(pod)
monitor.Complete(pod, framework.NewStatus(framework.Unschedulable, "node(s) is unschedulable"))
if _, exists := monitor.schedulingPods[pod.UID]; exists {
t.Errorf("Pod should be removed from schedulingPods after Complete")
}
Expand Down

0 comments on commit 1e0be0b

Please sign in to comment.