diff --git a/controllers/flinkcluster/flinkcluster_converter.go b/controllers/flinkcluster/flinkcluster_converter.go index cb87fe09..0f882fea 100644 --- a/controllers/flinkcluster/flinkcluster_converter.go +++ b/controllers/flinkcluster/flinkcluster_converter.go @@ -29,6 +29,7 @@ import ( v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/flink" "github.com/spotify/flink-on-k8s-operator/internal/model" + "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" @@ -775,12 +776,12 @@ func convertFromSavepoint(jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus switch { // Creating for the first time case jobStatus == nil: - if !IsBlank(jobSpec.FromSavepoint) { + if !util.IsBlank(jobSpec.FromSavepoint) { return jobSpec.FromSavepoint } return nil // Updating with FromSavepoint provided - case revision.IsUpdateTriggered() && !IsBlank(jobSpec.FromSavepoint): + case revision.IsUpdateTriggered() && !util.IsBlank(jobSpec.FromSavepoint): return jobSpec.FromSavepoint // Latest savepoint case jobStatus.SavepointLocation != "": diff --git a/controllers/flinkcluster/flinkcluster_observer.go b/controllers/flinkcluster/flinkcluster_observer.go index 4ad5ed23..310a4623 100644 --- a/controllers/flinkcluster/flinkcluster_observer.go +++ b/controllers/flinkcluster/flinkcluster_observer.go @@ -26,6 +26,7 @@ import ( v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/controllers/history" flink "github.com/spotify/flink-on-k8s-operator/internal/flink" + "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -604,7 +605,7 @@ func (observer *ClusterStateObserver) syncRevisionStatus(observed *ObservedClust } // create a new revision from the current cluster - nextRevision, err := newRevision(cluster, getNextRevisionNumber(revisions), &collisionCount) + nextRevision, err := newRevision(cluster, util.GetNextRevisionNumber(revisions), &collisionCount) if err != nil { return err } @@ -675,7 +676,7 @@ func (observer *ClusterStateObserver) truncateHistory(observed *ObservedClusterS historyLimit = 10 } - nonLiveHistory := getNonLiveHistory(revisions, historyLimit) + nonLiveHistory := util.GetNonLiveHistory(revisions, historyLimit) // delete any non-live history to maintain the revision limit. for i := 0; i < len(nonLiveHistory); i++ { diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index 0d93caeb..5976b32f 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -34,6 +34,7 @@ import ( schedulerTypes "github.com/spotify/flink-on-k8s-operator/internal/batchscheduler/types" "github.com/spotify/flink-on-k8s-operator/internal/flink" "github.com/spotify/flink-on-k8s-operator/internal/model" + "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" @@ -572,7 +573,7 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { if recorded.Revision.IsUpdateTriggered() { log.Info("Preparing job update") var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate - var shouldSuspend = takeSavepoint && IsBlank(jobSpec.FromSavepoint) + var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint) if shouldSuspend { newSavepointStatus, err = reconciler.trySuspendJob() } else if shouldUpdateJob(&observed) { @@ -826,7 +827,7 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReas case jobSpec.AutoSavepointSeconds != nil: // When previous try was failed, check retry interval. if savepoint.IsFailed() && savepoint.TriggerReason == v1beta1.SavepointReasonScheduled { - var nextRetryTime = GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second) + var nextRetryTime = util.GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second) if time.Now().After(nextRetryTime) { return v1beta1.SavepointReasonScheduled } else { @@ -931,7 +932,7 @@ func (reconciler *ClusterReconciler) updateStatus( if controlStatus != nil { newStatus.Control = controlStatus } - SetTimestamp(&newStatus.LastUpdateTime) + util.SetTimestamp(&newStatus.LastUpdateTime) log.Info("Updating cluster status", "clusterClone", clusterClone, "newStatus", newStatus) statusUpdateErr = reconciler.k8sClient.Status().Update(reconciler.context, clusterClone) if statusUpdateErr == nil { @@ -966,8 +967,8 @@ func (reconciler *ClusterReconciler) updateJobDeployStatus() error { newJob.CompletionTime = nil // Mark as job submitter is deployed. - SetTimestamp(&newJob.DeployTime) - SetTimestamp(&clusterClone.Status.LastUpdateTime) + util.SetTimestamp(&newJob.DeployTime) + util.SetTimestamp(&clusterClone.Status.LastUpdateTime) // Latest savepoint location should be fromSavepoint. var fromSavepoint = getFromSavepoint(desiredJobSubmitter.Spec) @@ -992,7 +993,7 @@ func (reconciler *ClusterReconciler) getNewSavepointStatus(triggerID string, tri var jobID = reconciler.getFlinkJobID() var savepointState string var now string - SetTimestamp(&now) + util.SetTimestamp(&now) if triggerSuccess { savepointState = v1beta1.SavepointStateInProgress @@ -1014,7 +1015,7 @@ func (reconciler *ClusterReconciler) getNewSavepointStatus(triggerID string, tri // Convert raw time to object and add `addedSeconds` to it, // getting a time object for the parsed `rawTime` with `addedSeconds` added to it. func getTimeAfterAddedSeconds(rawTime string, addedSeconds int64) time.Time { - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var lastTriggerTime = time.Time{} if len(rawTime) != 0 { lastTriggerTime = tc.FromString(rawTime) diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 964621e8..6124696e 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -31,6 +31,7 @@ import ( "github.com/go-logr/logr" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" + "github.com/spotify/flink-on-k8s-operator/internal/util" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" @@ -76,7 +77,7 @@ func (updater *ClusterStatusUpdater) updateStatusIfChanged() ( updater.observed.cluster.Status, "new", newStatus) updater.createStatusChangeEvents(oldStatus, newStatus) - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} newStatus.LastUpdateTime = tc.ToString(time.Now()) return true, updater.updateClusterStatus(newStatus) } @@ -632,7 +633,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { newJob.RestartCount++ } case newJob.State == v1beta1.JobStateRunning: - SetTimestamp(&newJob.StartTime) + util.SetTimestamp(&newJob.StartTime) newJob.CompletionTime = nil // When job started, the savepoint is not the final state of the job any more. if oldJob.FinalSavepoint { @@ -675,7 +676,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus { // Currently savepoint complete timestamp is not included in savepoints API response. // Whereas checkpoint API returns the timestamp latest_ack_timestamp. // Note: https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-checkpoints-details-checkpointid - SetTimestamp(&newJob.SavepointTime) + util.SetTimestamp(&newJob.SavepointTime) } return newJob @@ -953,7 +954,7 @@ func deriveControlStatus( } // Update time when state changed. if c.State != v1beta1.ControlStateInProgress { - SetTimestamp(&c.UpdateTime) + util.SetTimestamp(&c.UpdateTime) } return c } @@ -981,10 +982,10 @@ func deriveRevisionStatus( } // Update revision status. - r.NextRevision = getRevisionWithNameNumber(observedRevision.nextRevision) + r.NextRevision = util.GetRevisionWithNameNumber(observedRevision.nextRevision) if r.CurrentRevision == "" { if recordedRevision.CurrentRevision == "" { - r.CurrentRevision = getRevisionWithNameNumber(observedRevision.currentRevision) + r.CurrentRevision = util.GetRevisionWithNameNumber(observedRevision.currentRevision) } else { r.CurrentRevision = recordedRevision.CurrentRevision } diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index 9d34d1aa..7b5b471e 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -18,10 +18,8 @@ package flinkcluster import ( "bytes" - "context" "encoding/json" "fmt" - "io" "os" "regexp" "strconv" @@ -29,6 +27,7 @@ import ( "time" "github.com/spotify/flink-on-k8s-operator/internal/flink" + "github.com/spotify/flink-on-k8s-operator/internal/util" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" "github.com/spotify/flink-on-k8s-operator/internal/controllers/history" @@ -125,40 +124,6 @@ func getSubmitterJobName(clusterName string) string { return clusterName + "-job-submitter" } -// TimeConverter converts between time.Time and string. -type TimeConverter struct{} - -// FromString converts string to time.Time. -func (tc *TimeConverter) FromString(timeStr string) time.Time { - timestamp, err := time.Parse( - time.RFC3339, timeStr) - if err != nil { - panic(fmt.Sprintf("Failed to parse time string: %s", timeStr)) - } - return timestamp -} - -// ToString converts time.Time to string. -func (tc *TimeConverter) ToString(timestamp time.Time) string { - return timestamp.Format(time.RFC3339) -} - -// SetTimestamp sets the current timestamp to the target. -func SetTimestamp(target *string) { - var tc = &TimeConverter{} - var now = time.Now() - *target = tc.ToString(now) -} - -func GetTime(timeStr string) time.Time { - var tc TimeConverter - return tc.FromString(timeStr) -} - -func IsBlank(s *string) bool { - return s == nil || strings.TrimSpace(*s) == "" -} - // Checks whether it is possible to take savepoint. func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool { var jobSpec = cluster.Spec.Job @@ -242,14 +207,6 @@ func getPatch(cluster *v1beta1.FlinkCluster) ([]byte, error) { return patch, err } -func getNextRevisionNumber(revisions []*appsv1.ControllerRevision) int64 { - count := len(revisions) - if count <= 0 { - return 1 - } - return revisions[count-1].Revision + 1 -} - func getCurrentRevisionName(r *v1beta1.RevisionStatus) string { return r.CurrentRevision[:strings.LastIndex(r.CurrentRevision, "-")] } @@ -258,11 +215,6 @@ func getNextRevisionName(r *v1beta1.RevisionStatus) string { return r.NextRevision[:strings.LastIndex(r.NextRevision, "-")] } -// Compose revision in FlinkClusterStatus with name and number of ControllerRevision -func getRevisionWithNameNumber(cr *appsv1.ControllerRevision) string { - return fmt.Sprintf("%v-%v", cr.Name, cr.Revision) -} - func getRetryCount(data map[string]string) (string, error) { var err error var retries, ok = data["retries"] @@ -292,7 +244,7 @@ func getControlStatus(controlName string, state string) *v1beta1.FlinkClusterCon var controlStatus = new(v1beta1.FlinkClusterControlStatus) controlStatus.Name = controlName controlStatus.State = state - SetTimestamp(&controlStatus.UpdateTime) + util.SetTimestamp(&controlStatus.UpdateTime) return controlStatus } @@ -359,7 +311,7 @@ func isUserControlFinished(controlStatus *v1beta1.FlinkClusterControlStatus) boo // Check time has passed func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool { - tc := &TimeConverter{} + tc := &util.TimeConverter{} timeToCheck := tc.FromString(timeToCheckStr) intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second))) return now.After(intervalPassedTime) @@ -489,20 +441,6 @@ func shouldUpdateCluster(observed *ObservedClusterState) bool { return !job.IsActive() && observed.updateState == UpdateStateInProgress } -func getNonLiveHistory(revisions []*appsv1.ControllerRevision, historyLimit int) []*appsv1.ControllerRevision { - - history := append([]*appsv1.ControllerRevision{}, revisions...) - nonLiveHistory := make([]*appsv1.ControllerRevision, 0) - - historyLen := len(history) - if historyLen <= historyLimit { - return nonLiveHistory - } - - nonLiveHistory = append(nonLiveHistory, history[:(historyLen-historyLimit)]...) - return nonLiveHistory -} - func getFlinkJobDeploymentState(flinkJobState string) string { switch flinkJobState { case "INITIALIZING", "CREATED", "RUNNING", "FAILING", "CANCELLING", "RESTARTING", "RECONCILING", "SUSPENDED": @@ -518,32 +456,9 @@ func getFlinkJobDeploymentState(flinkJobState string) string { } } -func getPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) { - if pod == nil { - return "", fmt.Errorf("no job pod found, even though submission completed") - } - pods := clientset.CoreV1().Pods(pod.Namespace) - - req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{}) - podLogs, err := req.Stream(context.TODO()) - if err != nil { - return "", fmt.Errorf("failed to get logs for pod %s: %v", pod.Name, err) - } - defer podLogs.Close() - - buf := new(bytes.Buffer) - _, err = io.Copy(buf, podLogs) - if err != nil { - return "", fmt.Errorf("error in copy information from pod logs to buf") - } - str := buf.String() - - return str, nil -} - // getFlinkJobSubmitLog extract logs from the job submitter pod. func getFlinkJobSubmitLog(clientset *kubernetes.Clientset, observedPod *corev1.Pod) (*SubmitterLog, error) { - log, err := getPodLogs(clientset, observedPod) + log, err := util.GetPodLogs(clientset, observedPod) if err != nil { return nil, err } diff --git a/controllers/flinkcluster/flinkcluster_util_test.go b/controllers/flinkcluster/flinkcluster_util_test.go index 87997aa2..ae341483 100644 --- a/controllers/flinkcluster/flinkcluster_util_test.go +++ b/controllers/flinkcluster/flinkcluster_util_test.go @@ -29,11 +29,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" + "github.com/spotify/flink-on-k8s-operator/internal/util" "gotest.tools/v3/assert" ) func TestTimeConverter(t *testing.T) { - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var str1 = "2019-10-23T05:10:36Z" var tm1 = tc.FromString(str1) @@ -199,11 +200,11 @@ func TestCanTakeSavepoint(t *testing.T) { func TestGetNextRevisionNumber(t *testing.T) { var revisions []*appsv1.ControllerRevision - var nextRevision = getNextRevisionNumber(revisions) + var nextRevision = util.GetNextRevisionNumber(revisions) assert.Equal(t, nextRevision, int64(1)) revisions = []*appsv1.ControllerRevision{{Revision: 1}, {Revision: 2}} - nextRevision = getNextRevisionNumber(revisions) + nextRevision = util.GetNextRevisionNumber(revisions) assert.Equal(t, nextRevision, int64(3)) } @@ -308,7 +309,7 @@ func TestGetUpdateState(t *testing.T) { } func TestHasTimeElapsed(t *testing.T) { - var tc = &TimeConverter{} + var tc = &util.TimeConverter{} var timeToCheckStr = "2020-01-01T00:00:00+00:00" var timeToCompare = tc.FromString("2020-01-01T00:00:20+00:00") var elapsed = hasTimeElapsed(timeToCheckStr, timeToCompare, 10) @@ -348,12 +349,12 @@ func TestGetNonLiveHistory(t *testing.T) { revisions := []*appsv1.ControllerRevision{&revison0, &revison1} historyLimit := 1 - nonLiveHistory := getNonLiveHistory(revisions, historyLimit) + nonLiveHistory := util.GetNonLiveHistory(revisions, historyLimit) assert.Equal(t, len(nonLiveHistory), 1) assert.Equal(t, nonLiveHistory[0].Revision, int64(0)) historyLimit = 3 - nonLiveHistory = getNonLiveHistory(revisions, historyLimit) + nonLiveHistory = util.GetNonLiveHistory(revisions, historyLimit) assert.Equal(t, len(nonLiveHistory), 0) } diff --git a/internal/util/kube.go b/internal/util/kube.go new file mode 100644 index 00000000..fb5f2acb --- /dev/null +++ b/internal/util/kube.go @@ -0,0 +1,62 @@ +package util + +import ( + "bytes" + "context" + "fmt" + "io" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" +) + +func GetPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) { + if pod == nil { + return "", fmt.Errorf("no job pod found, even though submission completed") + } + pods := clientset.CoreV1().Pods(pod.Namespace) + + req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{}) + podLogs, err := req.Stream(context.TODO()) + if err != nil { + return "", fmt.Errorf("failed to get logs for pod %s: %v", pod.Name, err) + } + defer podLogs.Close() + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return "", fmt.Errorf("error in copy information from pod logs to buf") + } + str := buf.String() + + return str, nil +} + +func GetNextRevisionNumber(revisions []*appsv1.ControllerRevision) int64 { + count := len(revisions) + if count <= 0 { + return 1 + } + return revisions[count-1].Revision + 1 +} + +// Compose revision in FlinkClusterStatus with name and number of ControllerRevision +func GetRevisionWithNameNumber(cr *appsv1.ControllerRevision) string { + return fmt.Sprintf("%v-%v", cr.Name, cr.Revision) +} + +func GetNonLiveHistory(revisions []*appsv1.ControllerRevision, historyLimit int) []*appsv1.ControllerRevision { + + history := append([]*appsv1.ControllerRevision{}, revisions...) + nonLiveHistory := make([]*appsv1.ControllerRevision, 0) + + historyLen := len(history) + if historyLen <= historyLimit { + return nonLiveHistory + } + + nonLiveHistory = append(nonLiveHistory, history[:(historyLen-historyLimit)]...) + return nonLiveHistory +} diff --git a/internal/util/strings.go b/internal/util/strings.go new file mode 100644 index 00000000..37ea5c30 --- /dev/null +++ b/internal/util/strings.go @@ -0,0 +1,7 @@ +package util + +import "strings" + +func IsBlank(s *string) bool { + return s == nil || strings.TrimSpace(*s) == "" +} diff --git a/internal/util/time.go b/internal/util/time.go new file mode 100644 index 00000000..64e25a3b --- /dev/null +++ b/internal/util/time.go @@ -0,0 +1,36 @@ +package util + +import ( + "fmt" + "time" +) + +// TimeConverter converts between time.Time and string. +type TimeConverter struct{} + +// FromString converts string to time.Time. +func (tc *TimeConverter) FromString(timeStr string) time.Time { + timestamp, err := time.Parse( + time.RFC3339, timeStr) + if err != nil { + panic(fmt.Sprintf("Failed to parse time string: %s", timeStr)) + } + return timestamp +} + +// ToString converts time.Time to string. +func (tc *TimeConverter) ToString(timestamp time.Time) string { + return timestamp.Format(time.RFC3339) +} + +// SetTimestamp sets the current timestamp to the target. +func SetTimestamp(target *string) { + var tc = &TimeConverter{} + var now = time.Now() + *target = tc.ToString(now) +} + +func GetTime(timeStr string) time.Time { + var tc TimeConverter + return tc.FromString(timeStr) +}