Skip to content

Commit

Permalink
Avoid spawning flink cluster on user control annotation (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Apr 20, 2022
1 parent cf74e89 commit dff92c5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
7 changes: 3 additions & 4 deletions controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste
jobStatus := cluster.Status.Components.Job

keepJobState := (shouldStopJob(cluster) || jobStatus.IsStopped()) &&
(!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec))
(!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) &&
shouldCleanup(cluster, "Job")

if !keepJobState {
state.Job = newJob(cluster)
Expand Down Expand Up @@ -933,10 +934,8 @@ func getJobManagerIngressHost(ingressHostFormat string, clusterName string) stri

// Checks whether the component should be deleted according to the cleanup
// policy. Always return false for session cluster.
func shouldCleanup(
cluster *v1beta1.FlinkCluster, component string) bool {
func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool {
var jobStatus = cluster.Status.Components.Job

// Session cluster.
if jobStatus == nil {
return false
Expand Down
30 changes: 22 additions & 8 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,12 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
var newControlStatus *v1beta1.FlinkClusterControlStatus
defer reconciler.updateStatus(&newSavepointStatus, &newControlStatus)

observedSubmitter := observed.flinkJobSubmitter.job

if desiredJob != nil && job.IsTerminated(jobSpec) {
return ctrl.Result{}, nil
}

// Create new Flink job submitter when starting new job, updating job or restarting job in failure.
if desiredJob != nil && !job.IsActive() {
log.Info("Deploying Flink job")
Expand All @@ -604,7 +610,6 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
log.Info("Failed to update the job status for job submission")
return requeueResult, err
}
var observedSubmitter = observed.flinkJobSubmitter.job
if observedSubmitter != nil {
log.Info("Found old job submitter")
err = reconciler.deleteJob(observedSubmitter)
Expand Down Expand Up @@ -655,17 +660,26 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
}

// Job cancel requested. Stop Flink job.
if desiredJob == nil && job.IsActive() {
log.Info("Stopping job", "jobID", jobID)
if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil {
if desiredJob == nil {
if job.IsActive() {
userControl := getNewControlRequest(observed.cluster)
if userControl == v1beta1.ControlNameJobCancel {
newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
}

log.Info("Stopping job", "jobID", jobID)
if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil {
return requeueResult, err
}

return requeueResult, err
}

var userControl = getNewControlRequest(observed.cluster)
if userControl == v1beta1.ControlNameJobCancel {
newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
if job.IsStopped() && observedSubmitter != nil {
if err := reconciler.deleteJob(observedSubmitter); err != nil {
return requeueResult, err
}
}
return requeueResult, err
}

if job.IsStopped() {
Expand Down
7 changes: 5 additions & 2 deletions controllers/flinkcluster/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
newJobState = v1beta1.JobStateUpdating
case oldJob.ShouldRestart(jobSpec):
newJobState = v1beta1.JobStateRestarting
case oldJob.IsPending() && oldJob.DeployTime != "":
newJobState = v1beta1.JobStateDeploying
case oldJob.IsStopped():
newJobState = oldJob.State
case oldJob.IsPending() && oldJob.DeployTime != "":
newJobState = v1beta1.JobStateDeploying
// Derive the job state from the observed Flink job, if it exists.
case observedFlinkJob != nil:
newJobState = getFlinkJobDeploymentState(observedFlinkJob.State)
Expand All @@ -590,6 +590,8 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
} else {
newJobState = oldJob.State
}
case shouldStopJob(observedCluster):
newJobState = v1beta1.JobStateCancelled
// When Flink job not found in JobManager or JobManager is unavailable
case isFlinkAPIReady(observed.flinkJob.list):
if oldJob.State == v1beta1.JobStateRunning {
Expand All @@ -615,6 +617,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
newJobState = v1beta1.JobStateDeployFailed
break
}

newJobState = oldJob.State
}
// Update State
Expand Down

0 comments on commit dff92c5

Please sign in to comment.