Skip to content

Commit

Permalink
Allow setting batch scheduler queue and priority class (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Dec 14, 2021
1 parent 93f1f1c commit ef07906
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 37 deletions.
24 changes: 23 additions & 1 deletion api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,24 @@ type JobSpec struct {
Mode *JobMode `json:"mode,omitempty"`
}

type BatchSchedulerSpec struct {
Name string `json:"name"`

// Queue defines the queue in which resources will be allocates; if queue is
// not specified, resources will be allocated in the schedulers default queue.
// +optional
Queue string `json:"queue,omitempty"`

// If specified, indicates the PodGroup's priority. "system-node-critical" and
// "system-cluster-critical" are two special keywords which indicate the
// highest priorities with the former being the highest priority. Any other
// name must be defined by creating a PriorityClass object with that name.
// If not specified, the priority will be default or zero if there is no
// default.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty"`
}

// FlinkClusterSpec defines the desired state of FlinkCluster
type FlinkClusterSpec struct {
// The version of Flink to be managed. This version must match the version in the image.
Expand All @@ -469,10 +487,14 @@ type FlinkClusterSpec struct {
// The service account assigned to JobManager, TaskManager and Job submitter Pods. If empty, the default service account in the namespace will be used.
ServiceAccountName *string `json:"serviceAccountName,omitempty"`

// BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
// Deprecated: BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
// If empty, no batch scheduling is enabled.
BatchSchedulerName *string `json:"batchSchedulerName,omitempty"`

// BatchScheduler specifies the batch scheduler for JobManager, TaskManager.
// If empty, no batch scheduling is enabled.
BatchScheduler *BatchSchedulerSpec `json:"batchScheduler,omitempty"`

// Flink JobManager spec.
JobManager JobManagerSpec `json:"jobManager"`

Expand Down
20 changes: 20 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ spec:
type: object
spec:
properties:
batchScheduler:
properties:
name:
type: string
priorityClassName:
type: string
queue:
type: string
required:
- name
type: object
batchSchedulerName:
type: string
envFrom:
Expand Down
64 changes: 40 additions & 24 deletions controllers/batchscheduler/volcano/volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
)

const (
schedulerName = "volcano"
schedulerName = "volcano"
podGroupNameFormat = "flink-%s"
)

// volcano scheduler implements the BatchScheduler interface.
Expand Down Expand Up @@ -77,6 +78,7 @@ func (v *VolcanoBatchScheduler) setSchedulerMeta(pg *scheduling.PodGroup, state
setMeta := func(podTemplateSpec *corev1.PodTemplateSpec) {
if podTemplateSpec != nil {
podTemplateSpec.Spec.SchedulerName = v.Name()
podTemplateSpec.Spec.PriorityClassName = pg.Spec.PriorityClassName
if podTemplateSpec.Annotations == nil {
podTemplateSpec.Annotations = make(map[string]string)
}
Expand All @@ -96,7 +98,7 @@ func (v *VolcanoBatchScheduler) setSchedulerMeta(pg *scheduling.PodGroup, state
}

func (v *VolcanoBatchScheduler) getPodGroupName(cluster *v1beta1.FlinkCluster) string {
return fmt.Sprintf("flink-%s", cluster.Name)
return fmt.Sprintf(podGroupNameFormat, cluster.Name)
}

// Converts the FlinkCluster as owner reference for its child resources.
Expand All @@ -111,40 +113,54 @@ func newOwnerReference(flinkCluster *v1beta1.FlinkCluster) metav1.OwnerReference
}
}

func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) {
func (v *VolcanoBatchScheduler) getPodGroup(cluster *v1beta1.FlinkCluster) (*scheduling.PodGroup, error) {
podGroupName := v.getPodGroupName(cluster)
pg, err := v.volcanoClient.
return v.volcanoClient.
SchedulingV1beta1().
PodGroups(cluster.Namespace).
Get(context.TODO(), podGroupName, metav1.GetOptions{})
}

func (v *VolcanoBatchScheduler) createPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) {
pg := scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: v.getPodGroupName(cluster),
OwnerReferences: []metav1.OwnerReference{newOwnerReference(cluster)},
},
Spec: scheduling.PodGroupSpec{
MinMember: size,
MinResources: &minResource,
Queue: cluster.Spec.BatchScheduler.Queue,
PriorityClassName: cluster.Spec.BatchScheduler.PriorityClassName,
},
}

return v.volcanoClient.
SchedulingV1beta1().
PodGroups(pg.Namespace).
Create(context.TODO(), &pg, metav1.CreateOptions{})
}

func (v *VolcanoBatchScheduler) updatePodGroup(pg *scheduling.PodGroup) (*scheduling.PodGroup, error) {
return v.volcanoClient.
SchedulingV1beta1().
PodGroups(pg.Namespace).
Update(context.TODO(), pg, metav1.UpdateOptions{})
}

func (v *VolcanoBatchScheduler) syncPodGroup(cluster *v1beta1.FlinkCluster, size int32, minResource corev1.ResourceList) (*scheduling.PodGroup, error) {
pg, err := v.getPodGroup(cluster)
if err != nil {
if !errors.IsNotFound(err) {
return nil, err
}
pg := scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: podGroupName,
OwnerReferences: []metav1.OwnerReference{newOwnerReference(cluster)},
},
Spec: scheduling.PodGroupSpec{
MinMember: size,
MinResources: &minResource,
},
}

return v.volcanoClient.
SchedulingV1beta1().
PodGroups(pg.Namespace).
Create(context.TODO(), &pg, metav1.CreateOptions{})
return v.createPodGroup(cluster, size, minResource)
}

if pg.Spec.MinMember != size {
pg.Spec.MinMember = size
return v.volcanoClient.
SchedulingV1beta1().
PodGroups(pg.Namespace).
Update(context.TODO(), pg, metav1.UpdateOptions{})
return v.updatePodGroup(pg)
}

return pg, nil
Expand Down
39 changes: 28 additions & 11 deletions controllers/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,10 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
if shouldUpdateCluster(&reconciler.observed) {
reconciler.log.Info("The cluster update is in progress")
}
// If batch-scheduling enabled
if reconciler.observed.cluster.Spec.BatchSchedulerName != nil &&
*reconciler.observed.cluster.Spec.BatchSchedulerName != "" {
scheduler, err := batchscheduler.GetScheduler(*reconciler.observed.cluster.Spec.BatchSchedulerName)
if err != nil {
return ctrl.Result{}, err
}

err = scheduler.Schedule(reconciler.observed.cluster, &reconciler.desired)
if err != nil {
return ctrl.Result{}, err
}
err = reconciler.reconcileBatchScheduler()
if err != nil {
return ctrl.Result{}, err
}

err = reconciler.reconcileConfigMap()
Expand Down Expand Up @@ -124,6 +116,31 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
return result, nil
}

func (reconciler *ClusterReconciler) reconcileBatchScheduler() error {
var batchSchedulerName string
if reconciler.observed.cluster.Spec.BatchSchedulerName != nil {
batchSchedulerName = *reconciler.observed.cluster.Spec.BatchSchedulerName
}
if reconciler.observed.cluster.Spec.BatchScheduler != nil {
batchSchedulerName = reconciler.observed.cluster.Spec.BatchScheduler.Name
}
if batchSchedulerName == "" {
return nil
}

scheduler, err := batchscheduler.GetScheduler(batchSchedulerName)
if err != nil {
return err
}

err = scheduler.Schedule(reconciler.observed.cluster, &reconciler.desired)
if err != nil {
return err
}

return nil
}

func (reconciler *ClusterReconciler) reconcileJobManagerStatefulSet() error {
return reconciler.reconcileStatefulSet(
"JobManager",
Expand Down
14 changes: 13 additions & 1 deletion docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ FlinkCluster
|__ pullPolicy
|__ pullSecrets
|__ batchSchedulerName
|__ batchScheduler
|__ name
|__ queue
|__ priorityClassName
|__ serviceAccountName
|__ jobManager
|__ accessScope
Expand Down Expand Up @@ -161,8 +165,16 @@ FlinkCluster
- **name** (required): Image name.
- **pullPolicy** (optional): Image pull policy.
- **pullSecrets** (optional): Secrets for image pull.
- **batchSchedulerName** (optional): BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
- **batchSchedulerName (deprecated)** (optional): BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
If empty, no batch scheduling is enabled.
- **batchScheduler** (optional): BatchScheduler specifies the batch scheduler properties for Job, JobManager, TaskManager.
If empty, no batch scheduling is enabled.
- **name** (required): Scheduler name.
- **queue** (optional): Queue name in which resources will be allocates.
- **priorityClassName** (optional): If specified, indicates the PodGroup's priority.
"system-node-critical" and "system-cluster-critical" are two special keywords which indicate the highest priorities with the former being the highest priority.
Any other name must be defined by creating a PriorityClass object with that name.
If not specified, the priority will be default or zero if there is no default.
- **serviceAccountName** (optional): the name of the service account(which must already exist in the namespace).
If empty, the default service account in the namespace will be used.
- **jobManager** (required): JobManager spec.
Expand Down

0 comments on commit ef07906

Please sign in to comment.