diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go
index 16c661b0a..70fad37f4 100644
--- a/pkg/scheduler/frameworkext/framework_extender.go
+++ b/pkg/scheduler/frameworkext/framework_extender.go
@@ -344,7 +344,7 @@ func (ext *frameworkExtenderImpl) runPreBindExtensionPlugins(ctx context.Context
 
 func (ext *frameworkExtenderImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
     if ext.monitor != nil {
-        defer ext.monitor.Complete(pod)
+        defer ext.monitor.Complete(pod, nil)
     }
     ext.Framework.RunPostBindPlugins(ctx, state, pod, nodeName)
 }
diff --git a/pkg/scheduler/frameworkext/framework_extender_factory.go b/pkg/scheduler/frameworkext/framework_extender_factory.go
index 830b58609..e164fa87e 100644
--- a/pkg/scheduler/frameworkext/framework_extender_factory.go
+++ b/pkg/scheduler/frameworkext/framework_extender_factory.go
@@ -151,6 +151,7 @@ func (f *FrameworkExtenderFactory) InitScheduler(sched Scheduler) {
         }
         // Deep copy podInfo to allow pod modification during scheduling
         podInfo = podInfo.DeepCopy()
+        f.monitor.RecordNextPod(podInfo)
         return podInfo, nil
     }
 }
@@ -192,7 +193,7 @@ func (f *FrameworkExtenderFactory) InterceptSchedulerError(sched *scheduler.Sche
     f.errorHandlerDispatcher.setDefaultHandler(sched.FailureHandler)
     sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
         f.errorHandlerDispatcher.Error(ctx, fwk, podInfo, status, nominatingInfo, start)
-        f.monitor.Complete(podInfo.Pod)
+        f.monitor.Complete(podInfo.Pod, status)
     }
 }
diff --git a/pkg/scheduler/frameworkext/scheduler_monitor.go b/pkg/scheduler/frameworkext/scheduler_monitor.go
index 8bff747b2..3a7628d81 100644
--- a/pkg/scheduler/frameworkext/scheduler_monitor.go
+++ b/pkg/scheduler/frameworkext/scheduler_monitor.go
@@ -25,6 +25,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/scheduler/framework"
 
     "github.com/koordinator-sh/koordinator/pkg/scheduler/metrics"
 )
@@ -41,6 +42,12 @@ func init() {
     pflag.DurationVar(&schedulingTimeout, "scheduling-timeout", schedulingTimeout, "The maximum acceptable scheduling time interval. After timeout, the metric will be updated and the log will be printed.")
 }
 
+var (
+    StartMonitor       = defaultStartMonitor
+    CompleteMonitor    = defaultCompleteMonitor
+    RecordQueuePodInfo = func(podInfo *framework.QueuedPodInfo, state *podScheduleState) {}
+)
+
 type SchedulerMonitor struct {
     timeout time.Duration
     lock    sync.Mutex
@@ -48,10 +55,16 @@
 }
 
 type podScheduleState struct {
-    start         time.Time
     namespace     string
     name          string
     schedulerName string
+    // scheduling info
+    start time.Time
+    // queue info
+    dequeued        time.Time
+    lastEnqueued    time.Time
+    attempts        int
+    initialEnqueued *time.Time
 }
 
 func NewSchedulerMonitor(period time.Duration, timeout time.Duration) *SchedulerMonitor {
@@ -73,33 +86,70 @@ func (m *SchedulerMonitor) monitor() {
     }
 }
 
-func (m *SchedulerMonitor) StartMonitoring(pod *corev1.Pod) {
-    klog.Infof("start monitoring pod %v(%s)", klog.KObj(pod), pod.UID)
+func (m *SchedulerMonitor) RecordNextPod(podInfo *framework.QueuedPodInfo) {
+    if podInfo == nil || podInfo.Pod == nil {
+        return
+    }
+    pod := podInfo.Pod
+    now := time.Now()
+    scheduleState := podScheduleState{
+        namespace:       pod.Namespace,
+        name:            pod.Name,
+        schedulerName:   pod.Spec.SchedulerName,
+        dequeued:        now,
+        lastEnqueued:    podInfo.Timestamp,
+        attempts:        podInfo.Attempts,
+        initialEnqueued: podInfo.InitialAttemptTimestamp,
+    }
+    RecordQueuePodInfo(podInfo, &scheduleState)
     m.lock.Lock()
-    defer m.lock.Unlock()
-    m.schedulingPods[pod.UID] = podScheduleState{
-        start:         time.Now(),
-        namespace:     pod.Namespace,
-        name:          pod.Name,
-        schedulerName: pod.Spec.SchedulerName,
-    }
+    m.schedulingPods[pod.UID] = scheduleState
+    m.lock.Unlock()
 }
 
-func (m *SchedulerMonitor) Complete(pod *corev1.Pod) {
-    klog.Infof("pod %v(%s) scheduled complete", klog.KObj(pod), pod.UID)
+func (m *SchedulerMonitor) StartMonitoring(pod *corev1.Pod) {
+    now := time.Now()
     m.lock.Lock()
-    defer m.lock.Unlock()
+    scheduleState, exists := m.schedulingPods[pod.UID]
+    if !exists {
+        scheduleState = podScheduleState{
+            start:         now,
+            namespace:     pod.Namespace,
+            name:          pod.Name,
+            schedulerName: pod.Spec.SchedulerName,
+        }
+    } else {
+        scheduleState.start = now
+    }
+    m.schedulingPods[pod.UID] = scheduleState
+    m.lock.Unlock()
+
+    StartMonitor(pod, &scheduleState)
+}
 
+func (m *SchedulerMonitor) Complete(pod *corev1.Pod, status *framework.Status) {
+    m.lock.Lock()
     state, ok := m.schedulingPods[pod.UID]
+    delete(m.schedulingPods, pod.UID)
+    m.lock.Unlock()
+
     if ok {
         now := time.Now()
-        recordIfSchedulingTimeout(pod.UID, &state, now, m.timeout)
-        delete(m.schedulingPods, pod.UID)
+        CompleteMonitor(pod, &state, now, m.timeout, status)
     }
 }
 
+func defaultStartMonitor(pod *corev1.Pod, state *podScheduleState) {
+    klog.Infof("start monitoring pod %v(%s)", klog.KObj(pod), pod.UID)
+}
+
+func defaultCompleteMonitor(pod *corev1.Pod, state *podScheduleState, end time.Time, timeout time.Duration, status *framework.Status) {
+    klog.Infof("pod %v(%s) scheduled complete", klog.KObj(pod), pod.UID)
+    recordIfSchedulingTimeout(pod.UID, state, end, timeout)
+}
+
 func recordIfSchedulingTimeout(uid types.UID, state *podScheduleState, now time.Time, timeout time.Duration) {
     if interval := now.Sub(state.start); interval > timeout {
         logWarningF("!!!CRITICAL TIMEOUT!!! scheduling pod %s/%s(%s) took longer (%s) than the timeout %v", state.namespace, state.name, uid, interval, timeout)
diff --git a/pkg/scheduler/frameworkext/scheduler_monitor_test.go b/pkg/scheduler/frameworkext/scheduler_monitor_test.go
index 867798cbb..e73305b4f 100644
--- a/pkg/scheduler/frameworkext/scheduler_monitor_test.go
+++ b/pkg/scheduler/frameworkext/scheduler_monitor_test.go
@@ -26,6 +26,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
 func TestSchedulerMonitor_Timeout(t *testing.T) {
@@ -56,7 +57,7 @@ func TestSchedulerMonitor_Timeout(t *testing.T) {
     if len(capturedLog) == 0 || !strings.Contains(capturedLog, "!!!CRITICAL TIMEOUT!!!") {
         t.Errorf("Expected a timeout log to be recorded, but got: %s", capturedLog)
     }
-    monitor.Complete(pod)
+    monitor.Complete(pod, nil)
 }
 
 func TestSchedulerMonitor_NoTimeout(t *testing.T) {
@@ -86,7 +87,7 @@ func TestSchedulerMonitor_NoTimeout(t *testing.T) {
     if len(capturedLog) > 0 {
         t.Errorf("Expected no timeout log to be recorded, but got: %s", capturedLog)
     }
-    monitor.Complete(pod)
+    monitor.Complete(pod, nil)
 }
 
 func TestSchedulerMonitor_StartAndCompleteMonitoring(t *testing.T) {
@@ -101,6 +102,15 @@ func TestSchedulerMonitor_StartAndCompleteMonitoring(t *testing.T) {
             UID: types.UID("test-uid"),
         },
     }
+    podInfo, err := framework.NewPodInfo(pod)
+    if err != nil {
+        t.Errorf("Failed to create podInfo for pod, err: %s", err)
+    }
+    queuePodInfo := &framework.QueuedPodInfo{
+        Timestamp: time.Now(),
+        PodInfo:   podInfo,
+    }
+    monitor.RecordNextPod(queuePodInfo)
     monitor.StartMonitoring(pod)
 
     state, ok := monitor.schedulingPods[pod.UID]
@@ -111,7 +121,7 @@
         t.Errorf("Start time should not be zero after StartMonitoring")
     }
     time.Sleep(2 * timeout)
-    monitor.Complete(pod)
+    monitor.Complete(pod, framework.NewStatus(framework.Unschedulable, "node(s) is unschedulable"))
     if _, exists := monitor.schedulingPods[pod.UID]; exists {
         t.Errorf("Pod should be removed from schedulingPods after Complete")
     }
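The diff above replaces the monitor's fixed log-and-record behavior with three overridable package-level hooks (StartMonitor, CompleteMonitor, RecordQueuePodInfo) and splits pod tracking into RecordNextPod (when the pod is popped from the queue), StartMonitoring (when a scheduling cycle starts), and Complete (from PostBind or the failure handler, now with a *framework.Status). Below is a minimal sketch of how custom hooks could be exercised from a test in the same package; the test name, the hook bodies, and the klog messages are illustrative assumptions, not part of this patch.

```go
package frameworkext

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// TestMonitorHooks_Sketch is a hypothetical test showing the hook flow.
func TestMonitorHooks_Sketch(t *testing.T) {
	// Override the package-level hooks introduced by the patch; restore the
	// defaults afterwards so other tests are unaffected.
	oldStart, oldComplete := StartMonitor, CompleteMonitor
	defer func() { StartMonitor, CompleteMonitor = oldStart, oldComplete }()

	StartMonitor = func(pod *corev1.Pod, state *podScheduleState) {
		klog.Infof("pod %v dequeued at %v after %d attempts", klog.KObj(pod), state.dequeued, state.attempts)
	}
	CompleteMonitor = func(pod *corev1.Pod, state *podScheduleState, end time.Time, timeout time.Duration, status *framework.Status) {
		klog.Infof("pod %v finished in %v, status: %v", klog.KObj(pod), end.Sub(state.start), status)
	}

	monitor := NewSchedulerMonitor(time.Second, 5*time.Second)
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default", UID: types.UID("uid")}}

	podInfo, err := framework.NewPodInfo(pod)
	if err != nil {
		t.Fatal(err)
	}
	queuedPodInfo := &framework.QueuedPodInfo{PodInfo: podInfo, Timestamp: time.Now(), Attempts: 1}

	// Mirrors the wiring in the diff: the factory's NextPod wrapper calls
	// RecordNextPod right after the pod is popped, StartMonitoring runs when a
	// scheduling cycle begins, and Complete runs at PostBind or on failure.
	monitor.RecordNextPod(queuedPodInfo)
	monitor.StartMonitoring(pod)
	monitor.Complete(pod, framework.NewStatus(framework.Success))
}
```

Since only the hook variables are swapped, the default logging and timeout recording behavior stays in defaultStartMonitor and defaultCompleteMonitor, which the sketch restores via the deferred assignment.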