diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go index 72ab456a..723d593b 100644 --- a/api/v1beta1/flinkcluster_types.go +++ b/api/v1beta1/flinkcluster_types.go @@ -458,6 +458,24 @@ type JobSpec struct { Mode *JobMode `json:"mode,omitempty"` } +type BatchSchedulerSpec struct { + Name string `json:"name"` + + // Queue defines the queue in which resources will be allocates; if queue is + // not specified, resources will be allocated in the schedulers default queue. + // +optional + Queue string `json:"queue,omitempty"` + + // If specified, indicates the PodGroup's priority. "system-node-critical" and + // "system-cluster-critical" are two special keywords which indicate the + // highest priorities with the former being the highest priority. Any other + // name must be defined by creating a PriorityClass object with that name. + // If not specified, the priority will be default or zero if there is no + // default. + // +optional + PriorityClassName string `json:"priorityClassName,omitempty"` +} + // FlinkClusterSpec defines the desired state of FlinkCluster type FlinkClusterSpec struct { // The version of Flink to be managed. This version must match the version in the image. @@ -469,10 +487,14 @@ type FlinkClusterSpec struct { // The service account assigned to JobManager, TaskManager and Job submitter Pods. If empty, the default service account in the namespace will be used. ServiceAccountName *string `json:"serviceAccountName,omitempty"` - // BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager. + // Deprecated: BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager. // If empty, no batch scheduling is enabled. BatchSchedulerName *string `json:"batchSchedulerName,omitempty"` + // BatchScheduler specifies the batch scheduler for JobManager, TaskManager. + // If empty, no batch scheduling is enabled. + BatchScheduler *BatchSchedulerSpec `json:"batchScheduler,omitempty"` + // Flink JobManager spec. JobManager JobManagerSpec `json:"jobManager"` diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 5c76f6aa..b4ecd0dd 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -26,6 +26,21 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BatchSchedulerSpec) DeepCopyInto(out *BatchSchedulerSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BatchSchedulerSpec. +func (in *BatchSchedulerSpec) DeepCopy() *BatchSchedulerSpec { + if in == nil { + return nil + } + out := new(BatchSchedulerSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CleanupPolicy) DeepCopyInto(out *CleanupPolicy) { *out = *in @@ -180,6 +195,11 @@ func (in *FlinkClusterSpec) DeepCopyInto(out *FlinkClusterSpec) { *out = new(string) **out = **in } + if in.BatchScheduler != nil { + in, out := &in.BatchScheduler, &out.BatchScheduler + *out = new(BatchSchedulerSpec) + **out = **in + } in.JobManager.DeepCopyInto(&out.JobManager) in.TaskManager.DeepCopyInto(&out.TaskManager) if in.Job != nil { diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index 4334d804..0a2a1dc9 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -38,6 +38,17 @@ spec: type: object spec: properties: + batchScheduler: + properties: + name: + type: string + priorityClassName: + type: string + queue: + type: string + required: + - name + type: object batchSchedulerName: type: string envFrom: diff --git a/controllers/batchscheduler/volcano/volcano.go b/controllers/batchscheduler/volcano/volcano.go index 8cf42662..e6d339a8 100644 --- a/controllers/batchscheduler/volcano/volcano.go +++ b/controllers/batchscheduler/volcano/volcano.go @@ -33,7 +33,8 @@ import ( ) const ( - schedulerName = "volcano" + schedulerName = "volcano" + podGroupNameFormat = "flink-%s" ) // volcano scheduler implements the BatchScheduler interface. @@ -77,6 +78,7 @@ func (v *VolcanoBatchScheduler) setSchedulerMeta(pg *scheduling.PodGroup, state setMeta := func(podTemplateSpec *corev1.PodTemplateSpec) { if podTemplateSpec != nil { podTemplateSpec.Spec.SchedulerName = v.Name() + podTemplateSpec.Spec.PriorityClassName = pg.Spec.PriorityClassName if podTemplateSpec.Annotations == nil { podTemplateSpec.Annotations = make(map[string]string) } @@ -96,7 +98,7 @@ func (v *VolcanoBatchScheduler) setSchedulerMeta(pg *scheduling.PodGroup, state } func (v *VolcanoBatchScheduler) getPodGroupName(cluster *v1beta1.FlinkCluster) string { - return fmt.Sprintf("flink-%s", cluster.Name) + return fmt.Sprintf(podGroupNameFormat, cluster.Name) } // Converts the FlinkCluster as owner reference for its child resources. @@ -111,40 +113,54 @@ func newOwnerReference(flinkCluster *v1beta1.FlinkCluster) metav1.OwnerReference } } -func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) { +func (v *VolcanoBatchScheduler) getPodGroup(cluster *v1beta1.FlinkCluster) (*scheduling.PodGroup, error) { podGroupName := v.getPodGroupName(cluster) - pg, err := v.volcanoClient. + return v.volcanoClient. SchedulingV1beta1(). PodGroups(cluster.Namespace). Get(context.TODO(), podGroupName, metav1.GetOptions{}) +} + +func (v *VolcanoBatchScheduler) createPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) { + pg := scheduling.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: cluster.Namespace, + Name: v.getPodGroupName(cluster), + OwnerReferences: []metav1.OwnerReference{newOwnerReference(cluster)}, + }, + Spec: scheduling.PodGroupSpec{ + MinMember: size, + MinResources: &minResource, + Queue: cluster.Spec.BatchScheduler.Queue, + PriorityClassName: cluster.Spec.BatchScheduler.PriorityClassName, + }, + } + + return v.volcanoClient. + SchedulingV1beta1(). + PodGroups(pg.Namespace). + Create(context.TODO(), &pg, metav1.CreateOptions{}) +} + +func (v *VolcanoBatchScheduler) updatePodGroup(pg *scheduling.PodGroup) (*scheduling.PodGroup, error) { + return v.volcanoClient. + SchedulingV1beta1(). + PodGroups(pg.Namespace). + Update(context.TODO(), pg, metav1.UpdateOptions{}) +} + +func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) { + pg, err := v.getPodGroup(cluster) if err != nil { if !errors.IsNotFound(err) { return nil, err } - pg := scheduling.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: cluster.Namespace, - Name: podGroupName, - OwnerReferences: []metav1.OwnerReference{newOwnerReference(cluster)}, - }, - Spec: scheduling.PodGroupSpec{ - MinMember: size, - MinResources: &minResource, - }, - } - - return v.volcanoClient. - SchedulingV1beta1(). - PodGroups(pg.Namespace). - Create(context.TODO(), &pg, metav1.CreateOptions{}) + return v.createPodGroup(cluster, size, minResource) } if pg.Spec.MinMember != size { pg.Spec.MinMember = size - return v.volcanoClient. - SchedulingV1beta1(). - PodGroups(pg.Namespace). - Update(context.TODO(), pg, metav1.UpdateOptions{}) + return v.updatePodGroup(pg) } return pg, nil diff --git a/controllers/flinkcluster_reconciler.go b/controllers/flinkcluster_reconciler.go index 1edcc9f7..012cc851 100644 --- a/controllers/flinkcluster_reconciler.go +++ b/controllers/flinkcluster_reconciler.go @@ -72,18 +72,10 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) { if shouldUpdateCluster(&reconciler.observed) { reconciler.log.Info("The cluster update is in progress") } - // If batch-scheduling enabled - if reconciler.observed.cluster.Spec.BatchSchedulerName != nil && - *reconciler.observed.cluster.Spec.BatchSchedulerName != "" { - scheduler, err := batchscheduler.GetScheduler(*reconciler.observed.cluster.Spec.BatchSchedulerName) - if err != nil { - return ctrl.Result{}, err - } - err = scheduler.Schedule(reconciler.observed.cluster, &reconciler.desired) - if err != nil { - return ctrl.Result{}, err - } + err = reconciler.reconcileBatchScheduler() + if err != nil { + return ctrl.Result{}, err } err = reconciler.reconcileConfigMap() @@ -124,6 +116,31 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) { return result, nil } +func (reconciler *ClusterReconciler) reconcileBatchScheduler() error { + var batchSchedulerName string + if reconciler.observed.cluster.Spec.BatchSchedulerName != nil { + batchSchedulerName = *reconciler.observed.cluster.Spec.BatchSchedulerName + } + if reconciler.observed.cluster.Spec.BatchScheduler != nil { + batchSchedulerName = reconciler.observed.cluster.Spec.BatchScheduler.Name + } + if batchSchedulerName == "" { + return nil + } + + scheduler, err := batchscheduler.GetScheduler(batchSchedulerName) + if err != nil { + return err + } + + err = scheduler.Schedule(reconciler.observed.cluster, &reconciler.desired) + if err != nil { + return err + } + + return nil +} + func (reconciler *ClusterReconciler) reconcileJobManagerStatefulSet() error { return reconciler.reconcileStatefulSet( "JobManager", diff --git a/docs/crd.md b/docs/crd.md index b64dbc3f..bc0b4bed 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -17,6 +17,10 @@ FlinkCluster |__ pullPolicy |__ pullSecrets |__ batchSchedulerName + |__ batchScheduler + |__ name + |__ queue + |__ priorityClassName |__ serviceAccountName |__ jobManager |__ accessScope @@ -161,8 +165,16 @@ FlinkCluster - **name** (required): Image name. - **pullPolicy** (optional): Image pull policy. - **pullSecrets** (optional): Secrets for image pull. - - **batchSchedulerName** (optional): BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager. + - **batchSchedulerName (deprecated)** (optional): BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager. + If empty, no batch scheduling is enabled. + - **batchScheduler** (optional): BatchScheduler specifies the batch scheduler properties for Job, JobManager, TaskManager. If empty, no batch scheduling is enabled. + - **name** (required): Scheduler name. + - **queue** (optional): Queue name in which resources will be allocates. + - **priorityClassName** (optional): If specified, indicates the PodGroup's priority. + "system-node-critical" and "system-cluster-critical" are two special keywords which indicate the highest priorities with the former being the highest priority. + Any other name must be defined by creating a PriorityClass object with that name. + If not specified, the priority will be default or zero if there is no default. - **serviceAccountName** (optional): the name of the service account(which must already exist in the namespace). If empty, the default service account in the namespace will be used. - **jobManager** (required): JobManager spec.