diff --git a/internal/batchscheduler/volcano/volcano.go b/internal/batchscheduler/volcano/volcano.go index 01649fc9..31ec8d74 100644 --- a/internal/batchscheduler/volcano/volcano.go +++ b/internal/batchscheduler/volcano/volcano.go @@ -241,6 +241,12 @@ func getDeploymentResources(spec *appsv1.DeploymentSpec) *corev1.ResourceRequire for i := int32(0); i < *spec.Replicas; i++ { tmResource := getPodResource(&spec.Template.Spec) addResourceRequirements(reqs, tmResource) + + for _, volume := range spec.Template.Spec.Volumes { + if volume.Ephemeral != nil && volume.Ephemeral.VolumeClaimTemplate != nil { + addResourceRequirements(reqs, &volume.Ephemeral.VolumeClaimTemplate.Spec.Resources) + } + } } return reqs } diff --git a/internal/batchscheduler/volcano/volcano_test.go b/internal/batchscheduler/volcano/volcano_test.go index 7623d757..0a2b1304 100644 --- a/internal/batchscheduler/volcano/volcano_test.go +++ b/internal/batchscheduler/volcano/volcano_test.go @@ -28,10 +28,13 @@ import ( "github.com/spotify/flink-on-k8s-operator/internal/model" ) -func TestGetClusterResource(t *testing.T) { - jmRep := int32(1) - replicas := int32(4) - desiredState := &model.DesiredClusterState{ +var ( + jmRep = int32(1) + replicas = int32(4) +) + +func getDesiredState() *model.DesiredClusterState { + return &model.DesiredClusterState{ JmStatefulSet: &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: "flinkjobcluster-sample-jobmanager", @@ -142,149 +145,319 @@ func TestGetClusterResource(t *testing.T) { }, }, }, - TmStatefulSet: &appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "flinkjobcluster-sample-taskmanager", - Namespace: "default", - Labels: map[string]string{ + } +} + +func TestGetClusterResource(t *testing.T) { + desiredState := getDesiredState() + desiredState.TmStatefulSet = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "flinkjobcluster-sample-taskmanager", + Namespace: "default", + Labels: map[string]string{ + "app": "flink", + "cluster": "flinkjobcluster-sample", + "component": "taskmanager", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + ServiceName: "flinkjobcluster-sample-taskmanager", + PodManagementPolicy: "Parallel", + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ "app": "flink", "cluster": "flinkjobcluster-sample", "component": "taskmanager", }, }, - Spec: appsv1.StatefulSetSpec{ - Replicas: &replicas, - ServiceName: "flinkjobcluster-sample-taskmanager", - PodManagementPolicy: "Parallel", - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ + VolumeClaimTemplates: []v1.PersistentVolumeClaim{{ + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "tm-claim", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse("100Gi"), + }, + }, + }, + }}, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ "app": "flink", "cluster": "flinkjobcluster-sample", "component": "taskmanager", }, + Annotations: map[string]string{ + "example.com": "example", + }, }, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{{ - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: "tm-claim", - Resources: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceStorage: resource.MustParse("100Gi"), + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Name: "taskmanager", + Image: "flink:1.8.1", + Args: []string{"taskmanager"}, + Ports: []v1.ContainerPort{ + {Name: "data", ContainerPort: 6121}, + {Name: "rpc", ContainerPort: 6122}, + {Name: "query", ContainerPort: 6125}, + }, + Env: []v1.EnvVar{ + { + Name: "TASK_MANAGER_CPU_LIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "taskmanager", + Resource: "limits.cpu", + Divisor: resource.MustParse("1m"), + }, + }, + }, + { + Name: "TASK_MANAGER_MEMORY_LIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "taskmanager", + Resource: "limits.memory", + Divisor: resource.MustParse("1Mi"), + }, + }, + }, + {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, + { + Name: "GOOGLE_APPLICATION_CREDENTIALS", + Value: "/etc/gcp_service_account/gcp_service_account_key.json", + }, + { + Name: "FOO", + Value: "abc", + }, + }, + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("512Mi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("500m"), + v1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + VolumeMounts: []v1.VolumeMount{ + {Name: "cache-volume", MountPath: "/cache"}, + {Name: "flink-config-volume", MountPath: "/opt/flink/conf"}, + { + Name: "hadoop-config-volume", + MountPath: "/etc/hadoop/conf", + ReadOnly: true, + }, + { + Name: "gcp-service-account-volume", + MountPath: "/etc/gcp_service_account/", + ReadOnly: true, + }, }, }, + v1.Container{Name: "sidecar", Image: "alpine"}, }, - }}, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "flink", - "cluster": "flinkjobcluster-sample", - "component": "taskmanager", + Volumes: []v1.Volume{ + { + Name: "cache-volume", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, }, - Annotations: map[string]string{ - "example.com": "example", + { + Name: "flink-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "flinkjobcluster-sample-configmap", + }, + }, + }, }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Name: "taskmanager", - Image: "flink:1.8.1", - Args: []string{"taskmanager"}, - Ports: []v1.ContainerPort{ - {Name: "data", ContainerPort: 6121}, - {Name: "rpc", ContainerPort: 6122}, - {Name: "query", ContainerPort: 6125}, + { + Name: "hadoop-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "hadoop-configmap", + }, }, - Env: []v1.EnvVar{ - { - Name: "TASK_MANAGER_CPU_LIMIT", - ValueFrom: &v1.EnvVarSource{ - ResourceFieldRef: &v1.ResourceFieldSelector{ - ContainerName: "taskmanager", - Resource: "limits.cpu", - Divisor: resource.MustParse("1m"), - }, + }, + }, + { + Name: "gcp-service-account-volume", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: "gcp-service-account-secret", + }, + }, + }, + }, + }, + }, + }, + } + + res, size := getClusterResource(desiredState) + assert.Assert(t, size == 5) + assert.Assert(t, res.Requests.Memory().String() == "2304Mi") + assert.Assert(t, res.Requests.Cpu().MilliValue() == 900) + assert.Assert(t, res.Requests.Storage().String() == "400Gi") +} + +func TestGetClusterResourceForDeployment(t *testing.T) { + desiredState := getDesiredState() + desiredState.TmDeployment = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "flinkjobcluster-sample-taskmanager", + Namespace: "default", + Labels: map[string]string{ + "app": "flink", + "cluster": "flinkjobcluster-sample", + "component": "taskmanager", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "flink", + "cluster": "flinkjobcluster-sample", + "component": "taskmanager", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "flink", + "cluster": "flinkjobcluster-sample", + "component": "taskmanager", + }, + Annotations: map[string]string{ + "example.com": "example", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Name: "taskmanager", + Image: "flink:1.8.1", + Args: []string{"taskmanager"}, + Ports: []v1.ContainerPort{ + {Name: "data", ContainerPort: 6121}, + {Name: "rpc", ContainerPort: 6122}, + {Name: "query", ContainerPort: 6125}, + }, + Env: []v1.EnvVar{ + { + Name: "TASK_MANAGER_CPU_LIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "taskmanager", + Resource: "limits.cpu", + Divisor: resource.MustParse("1m"), }, }, - { - Name: "TASK_MANAGER_MEMORY_LIMIT", - ValueFrom: &v1.EnvVarSource{ - ResourceFieldRef: &v1.ResourceFieldSelector{ - ContainerName: "taskmanager", - Resource: "limits.memory", - Divisor: resource.MustParse("1Mi"), - }, + }, + { + Name: "TASK_MANAGER_MEMORY_LIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + ContainerName: "taskmanager", + Resource: "limits.memory", + Divisor: resource.MustParse("1Mi"), }, }, - {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, - { - Name: "GOOGLE_APPLICATION_CREDENTIALS", - Value: "/etc/gcp_service_account/gcp_service_account_key.json", - }, - { - Name: "FOO", - Value: "abc", - }, }, - Resources: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("200m"), - v1.ResourceMemory: resource.MustParse("512Mi"), - }, - Limits: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("500m"), - v1.ResourceMemory: resource.MustParse("1Gi"), - }, + {Name: "HADOOP_CONF_DIR", Value: "/etc/hadoop/conf"}, + { + Name: "GOOGLE_APPLICATION_CREDENTIALS", + Value: "/etc/gcp_service_account/gcp_service_account_key.json", }, - VolumeMounts: []v1.VolumeMount{ - {Name: "cache-volume", MountPath: "/cache"}, - {Name: "flink-config-volume", MountPath: "/opt/flink/conf"}, - { - Name: "hadoop-config-volume", - MountPath: "/etc/hadoop/conf", - ReadOnly: true, - }, - { - Name: "gcp-service-account-volume", - MountPath: "/etc/gcp_service_account/", - ReadOnly: true, - }, + { + Name: "FOO", + Value: "abc", }, }, - v1.Container{Name: "sidecar", Image: "alpine"}, - }, - Volumes: []v1.Volume{ - { - Name: "cache-volume", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("200m"), + v1.ResourceMemory: resource.MustParse("512Mi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("500m"), + v1.ResourceMemory: resource.MustParse("1Gi"), }, }, - { - Name: "flink-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "flinkjobcluster-sample-configmap", - }, + VolumeMounts: []v1.VolumeMount{ + {Name: "cache-volume", MountPath: "/cache"}, + {Name: "flink-config-volume", MountPath: "/opt/flink/conf"}, + { + Name: "hadoop-config-volume", + MountPath: "/etc/hadoop/conf", + ReadOnly: true, + }, + { + Name: "gcp-service-account-volume", + MountPath: "/etc/gcp_service_account/", + ReadOnly: true, + }, + }, + }, + v1.Container{Name: "sidecar", Image: "alpine"}, + }, + Volumes: []v1.Volume{ + { + Name: "cache-volume", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "flink-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "flinkjobcluster-sample-configmap", }, }, }, - { - Name: "hadoop-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "hadoop-configmap", - }, + }, + { + Name: "hadoop-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "hadoop-configmap", }, }, }, - { - Name: "gcp-service-account-volume", - VolumeSource: v1.VolumeSource{ - Secret: &v1.SecretVolumeSource{ - SecretName: "gcp-service-account-secret", + }, + { + Name: "gcp-service-account-volume", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: "gcp-service-account-secret", + }, + }, + }, + { + Name: "tm-claim", + VolumeSource: v1.VolumeSource{ + Ephemeral: &v1.EphemeralVolumeSource{ + VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{ + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse("100Gi"), + }, + }, + }, }, }, }, @@ -294,7 +467,6 @@ func TestGetClusterResource(t *testing.T) { }, }, } - res, size := getClusterResource(desiredState) assert.Assert(t, size == 5) assert.Assert(t, res.Requests.Memory().String() == "2304Mi")