Skip to content

Commit

Permalink
koordlet: add taskids in statesinformer (#2057)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <[email protected]>
  • Loading branch information
kangclzjc authored Jun 20, 2024
1 parent 0db9540 commit ab5b62b
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 2 deletions.
27 changes: 25 additions & 2 deletions pkg/koordlet/statesinformer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,37 @@ import (
)

type PodMeta struct {
Pod *corev1.Pod
CgroupDir string
Pod *corev1.Pod
CgroupDir string
ContainerTaskIds map[string][]int32
}

// DeepCopyContainerTaskIds creates a deep copy of ContainerTaskIds
func DeepCopyContainerTaskIds(in map[string][]int32) map[string][]int32 {
if in == nil {
return nil
}

out := make(map[string][]int32, len(in))

for key, value := range in {
if value == nil {
out[key] = nil
} else {
copyValue := make([]int32, len(value))
copy(copyValue, value)
out[key] = copyValue
}
}

return out
}

func (in *PodMeta) DeepCopy() *PodMeta {
out := new(PodMeta)
out.Pod = in.Pod.DeepCopy()
out.CgroupDir = in.CgroupDir
out.ContainerTaskIds = DeepCopyContainerTaskIds(in.ContainerTaskIds)
return out
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/koordlet/statesinformer/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
DisableQueryKubeletConfig bool
EnableNodeMetricReport bool
MetricReportInterval time.Duration // Deprecated
EnablePodTaskIds bool
}

func NewDefaultConfig() *Config {
Expand All @@ -45,6 +46,7 @@ func NewDefaultConfig() *Config {
NodeTopologySyncInterval: 3 * time.Second,
DisableQueryKubeletConfig: false,
EnableNodeMetricReport: true,
EnablePodTaskIds: false,
}
}

Expand All @@ -58,4 +60,5 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.BoolVar(&c.DisableQueryKubeletConfig, "disable-query-kubelet-config", c.DisableQueryKubeletConfig, "Disables querying the kubelet configuration from kubelet. Flag must be set to true if kubelet-insecure-tls=true is configured")
fs.DurationVar(&c.MetricReportInterval, "report-interval", c.MetricReportInterval, "Deprecated since v1.1, use ColocationStrategy.MetricReportIntervalSeconds in config map of slo-controller")
fs.BoolVar(&c.EnableNodeMetricReport, "enable-node-metric-report", c.EnableNodeMetricReport, "Enable status update of node metric crd.")
fs.BoolVar(&c.EnablePodTaskIds, "enable-pod-taskids", c.EnablePodTaskIds, "Enable pod taskids in statesinformer.")
}
5 changes: 5 additions & 0 deletions pkg/koordlet/statesinformer/impl/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestNewDefaultConfig(t *testing.T) {
DisableQueryKubeletConfig: false,
EnableNodeMetricReport: true,
MetricReportInterval: 0,
EnablePodTaskIds: false,
},
},
}
Expand All @@ -64,6 +65,7 @@ func TestConfig_InitFlags(t *testing.T) {
"--node-topology-sync-interval=10s",
"--disable-query-kubelet-config=true",
"--enable-node-metric-report=false",
"--enable-pod-taskids=true",
}
fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError)

Expand All @@ -76,6 +78,7 @@ func TestConfig_InitFlags(t *testing.T) {
NodeTopologySyncInterval time.Duration
DisableQueryKubeletConfig bool
EnableNodeMetricReport bool
EnablePodTaskIds bool
}
type args struct {
fs *flag.FlagSet
Expand All @@ -96,6 +99,7 @@ func TestConfig_InitFlags(t *testing.T) {
NodeTopologySyncInterval: 10 * time.Second,
DisableQueryKubeletConfig: true,
EnableNodeMetricReport: false,
EnablePodTaskIds: true,
},
args: args{fs: fs},
},
Expand All @@ -111,6 +115,7 @@ func TestConfig_InitFlags(t *testing.T) {
NodeTopologySyncInterval: tt.fields.NodeTopologySyncInterval,
DisableQueryKubeletConfig: tt.fields.DisableQueryKubeletConfig,
EnableNodeMetricReport: tt.fields.EnableNodeMetricReport,
EnablePodTaskIds: tt.fields.EnablePodTaskIds,
}
c := NewDefaultConfig()
c.InitFlags(tt.args.fs)
Expand Down
73 changes: 73 additions & 0 deletions pkg/koordlet/statesinformer/impl/states_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/pleg"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
Expand All @@ -57,6 +58,7 @@ type podsInformer struct {
nodeInformer *nodeInformer

callbackRunner *callbackRunner
cgroupReader resourceexecutor.CgroupReader
}

func NewPodsInformer() *podsInformer {
Expand Down Expand Up @@ -85,6 +87,8 @@ func (s *podsInformer) Setup(ctx *PluginOption, states *PluginState) {
s.nodeInformer = nodeInformer

s.callbackRunner = states.callbackRunner

s.cgroupReader = resourceexecutor.NewCgroupReader()
}

func (s *podsInformer) Start(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -141,6 +145,70 @@ func (s *podsInformer) GetAllPods() []*statesinformer.PodMeta {
return pods
}

func (s *podsInformer) getTaskIds(podMeta *statesinformer.PodMeta) {
pod := podMeta.Pod
containerMap := make(map[string]*corev1.Container, len(pod.Spec.Containers))
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
containerMap[container.Name] = container
}

for _, containerStat := range pod.Status.ContainerStatuses {
container, exist := containerMap[containerStat.Name]
if !exist {
klog.Warningf("container %s/%s/%s lost during reconcile resctrl group", pod.Namespace,
pod.Name, containerStat.Name)
continue
}

containerDir, err := koordletutil.GetContainerCgroupParentDir(podMeta.CgroupDir, &containerStat)
if err != nil {
klog.V(4).Infof("failed to get pod container cgroup path for container %s/%s/%s, err: %s",
pod.Namespace, pod.Name, container.Name, err)
continue
}
ids, err := s.cgroupReader.ReadCPUTasks(containerDir)
if err == nil && ids != nil && podMeta.ContainerTaskIds != nil {
podMeta.ContainerTaskIds[containerStat.ContainerID] = ids
}
if err != nil && resourceexecutor.IsCgroupDirErr(err) {
klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exists, err: %s",
containerDir, err)
continue
} else if err != nil {
klog.Warningf("failed to get pod container cgroup task ids for container %s/%s/%s, err: %s",
pod.Namespace, pod.Name, container.Name, err)
continue
}
}

sandboxID, err := koordletutil.GetPodSandboxContainerID(pod)
if err != nil {
klog.V(4).Infof("failed to get sandbox container ID for pod %s/%s, err: %s",
pod.Namespace, pod.Name, err)
return
}
sandboxContainerDir, err := koordletutil.GetContainerCgroupParentDirByID(podMeta.CgroupDir, sandboxID)
if err != nil {
klog.V(4).Infof("failed to get pod container cgroup path for sandbox container %s/%s/%s, err: %s",
pod.Namespace, pod.Name, sandboxID, err)
return
}
ids, err := s.cgroupReader.ReadCPUTasks(sandboxContainerDir)
if err == nil && ids != nil && podMeta.ContainerTaskIds != nil {
podMeta.ContainerTaskIds[sandboxID] = ids
}
if err != nil && resourceexecutor.IsCgroupDirErr(err) {
klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exists, err: %s",
sandboxContainerDir, err)
return
} else if err != nil {
klog.Warningf("failed to get pod container cgroup task ids for sandbox container %s/%s/%s, err: %s",
pod.Namespace, pod.Name, sandboxID, err)
return
}
}

func (s *podsInformer) syncPods() error {
podList, err := s.kubelet.GetAllPods()

Expand All @@ -159,6 +227,11 @@ func (s *podsInformer) syncPods() error {
CgroupDir: genPodCgroupParentDir(pod),
}
newPodMap[string(pod.UID)] = podMeta
if s.config.EnablePodTaskIds && !util.IsPodTerminated(pod) {
podMeta.ContainerTaskIds = make(map[string][]int32)
// record pod's containers taskids
s.getTaskIds(podMeta)
}
// record pod container metrics
recordPodResourceMetrics(podMeta)
}
Expand Down
Loading

0 comments on commit ab5b62b

Please sign in to comment.