diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index 28323cc5..cc8695db 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -112,7 +112,8 @@ func getDesiredClusterState(observed *ObservedClusterState) *model.DesiredCluste jobStatus := cluster.Status.Components.Job keepJobState := (shouldStopJob(cluster) || jobStatus.IsStopped()) && - (!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) + (!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) && + shouldCleanup(cluster, "Job") if !keepJobState { state.Job = newJob(cluster) @@ -933,10 +934,8 @@ func getJobManagerIngressHost(ingressHostFormat string, clusterName string) stri // Checks whether the component should be deleted according to the cleanup // policy. Always return false for session cluster. -func shouldCleanup( - cluster *v1beta1.FlinkCluster, component string) bool { +func shouldCleanup(cluster *v1beta1.FlinkCluster, component string) bool { var jobStatus = cluster.Status.Components.Job - // Session cluster. if jobStatus == nil { return false diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 470ed0a9..74c46e74 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -579,6 +579,12 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { var newControlStatus *v1beta1.FlinkClusterControlStatus defer reconciler.updateStatus(&newSavepointStatus, &newControlStatus) + observedSubmitter := observed.flinkJobSubmitter.job + + if desiredJob != nil && job.IsTerminated(jobSpec) { + return ctrl.Result{}, nil + } + // Create new Flink job submitter when starting new job, updating job or restarting job in failure. if desiredJob != nil && !job.IsActive() { log.Info("Deploying Flink job") @@ -604,7 +610,6 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { log.Info("Failed to update the job status for job submission") return requeueResult, err } - var observedSubmitter = observed.flinkJobSubmitter.job if observedSubmitter != nil { log.Info("Found old job submitter") err = reconciler.deleteJob(observedSubmitter) @@ -655,17 +660,26 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { } // Job cancel requested. Stop Flink job. - if desiredJob == nil && job.IsActive() { - log.Info("Stopping job", "jobID", jobID) - if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { + if desiredJob == nil { + if job.IsActive() { + userControl := getNewControlRequest(observed.cluster) + if userControl == v1beta1.ControlNameJobCancel { + newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) + } + + log.Info("Stopping job", "jobID", jobID) + if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { + return requeueResult, err + } + return requeueResult, err } - var userControl = getNewControlRequest(observed.cluster) - if userControl == v1beta1.ControlNameJobCancel { - newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) + if job.IsStopped() && observedSubmitter != nil { + if err := reconciler.deleteJob(observedSubmitter); err != nil { + return requeueResult, err + } } - return requeueResult, err } if job.IsStopped() { diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 6124696e..924cb6ff 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -569,10 +569,10 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { newJobState = v1beta1.JobStateUpdating case oldJob.ShouldRestart(jobSpec): newJobState = v1beta1.JobStateRestarting - case oldJob.IsPending() && oldJob.DeployTime != "": - newJobState = v1beta1.JobStateDeploying case oldJob.IsStopped(): newJobState = oldJob.State + case oldJob.IsPending() && oldJob.DeployTime != "": + newJobState = v1beta1.JobStateDeploying // Derive the job state from the observed Flink job, if it exists. case observedFlinkJob != nil: newJobState = getFlinkJobDeploymentState(observedFlinkJob.State) @@ -590,6 +590,8 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { } else { newJobState = oldJob.State } + case shouldStopJob(observedCluster): + newJobState = v1beta1.JobStateCancelled // When Flink job not found in JobManager or JobManager is unavailable case isFlinkAPIReady(observed.flinkJob.list): if oldJob.State == v1beta1.JobStateRunning { @@ -615,6 +617,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { newJobState = v1beta1.JobStateDeployFailed break } + newJobState = oldJob.State } // Update State