Skip to content

Commit

Permalink
issue-11296 - WIP - Cleaned unnecessary changes in resource_manager.go
Browse files Browse the repository at this point in the history
Signed-off-by: Helber Belmiro <[email protected]>
  • Loading branch information
hbelmiro committed Dec 17, 2024
1 parent b3ef996 commit 8b85284
Showing 1 changed file with 97 additions and 142 deletions.
239 changes: 97 additions & 142 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"io"
"net"
"strconv"
Expand Down Expand Up @@ -471,16 +470,11 @@ func (r *ResourceManager) GetPipelineLatestTemplate(pipelineId string) ([]byte,
func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model.Run, error) {
// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
// Update the run.PipelineSpec if an existing pipeline version is used.
manifest, err := r.fetchManifestFromPipelineSpec(&run.PipelineSpec)
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&run.PipelineSpec)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a run due to error fetching manifest")
}

tmpl, err := r.fetchTemplateFromPipelineSpec(manifest)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a run due to error fetching template")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Proposed flow:
// 1. Create an entry and assign creation timestamp and uuid.
Expand Down Expand Up @@ -573,6 +567,82 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return newRun, nil
}

// RefreshSwf rebuilds the workflow spec of every job listed under the
// "kubeflow" namespace reference and patches it onto the job's
// ScheduledWorkflow Kubernetes resource.
//
// A job whose patch fails with a not-found error is skipped. Any other
// failure stops the refresh and is returned wrapped with a common message.
func (r *ResourceManager) RefreshSwf(ctx context.Context) error {
	namespaceFilter := &model.FilterContext{
		ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: "kubeflow"},
	}

	jobs, _, _, err := r.jobStore.ListJobs(namespaceFilter, list.EmptyOptions())
	if err != nil {
		return failedToRefreshSwfCrError(err)
	}

	for i := range jobs {
		// Only the template is needed here; the manifest string is discarded.
		tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
		if err != nil {
			return failedToRefreshSwfCrError(err)
		}

		swf, err := tmpl.ScheduledWorkflow(jobs[i])
		if err != nil {
			return failedToRefreshSwfCrError(err)
		}

		// A missing Kubernetes resource is tolerated: there is nothing to patch for that job.
		patchErr := r.patchSwfKubernetesResource(ctx, jobs[i].Namespace, jobs[i].K8SName, swf.Spec.Workflow.Spec)
		if patchErr != nil && !util.IsNotFound(errors.Cause(patchErr)) {
			return failedToRefreshSwfCrError(patchErr)
		}
	}

	return nil
}

// failedToRefreshSwfCrError wraps err with the message shared by all
// failures of the ScheduledWorkflow refresh.
func failedToRefreshSwfCrError(err error) error {
	const message = "Failed to refresh ScheduledWorkflow Kubernetes resources"
	return util.Wrap(err, message)
}

// patchSwfKubernetesResource sends a merge patch to the ScheduledWorkflow
// Kubernetes resource named crdName, carrying newSpec under
// spec.workflow.spec.
//
// When k8sNamespace is empty it falls back to common.GetPodNamespace(); if
// that is empty as well, an error is returned before the client is called.
// Failures to marshal the payload or to apply the patch are returned as
// internal server errors.
func (r *ResourceManager) patchSwfKubernetesResource(ctx context.Context, k8sNamespace string, crdName string, newSpec interface{}) error {
	if k8sNamespace == "" {
		k8sNamespace = common.GetPodNamespace()
	}
	if k8sNamespace == "" {
		// The message previously said "deleting" (copied from the delete helper), which misreported the failing operation.
		return errors.New("Namespace cannot be empty when patching a ScheduledWorkflow Kubernetes resource.")
	}

	// Only spec.workflow.spec is present in the payload.
	patchPayload := map[string]interface{}{
		"spec": map[string]interface{}{
			"workflow": map[string]interface{}{
				"spec": newSpec,
			},
		},
	}

	patchBytes, err := json.Marshal(patchPayload)
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to marshal patch spec")
	}

	_, err = r.getScheduledWorkflowClient(k8sNamespace).Patch(
		ctx,
		crdName,
		types.MergePatchType,
		patchBytes,
	)
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to patch ScheduledWorkflow")
	}

	return nil
}

// Fetches a run with a given id.
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
run, err := r.runStore.GetRun(runId)
Expand Down Expand Up @@ -970,151 +1040,39 @@ func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec mode
return nil, nil
}

// RefreshSwf rebuilds the workflow spec of every job listed under the
// "kubeflow" namespace reference and patches it onto the job's
// ScheduledWorkflow Kubernetes resource.
//
// A job whose patch fails with a not-found error is skipped. Any other
// failure stops the refresh and is returned wrapped with a common message.
func (r *ResourceManager) RefreshSwf(ctx context.Context) error {
	namespaceFilter := &model.FilterContext{
		ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: "kubeflow"},
	}

	jobs, _, _, err := r.jobStore.ListJobs(namespaceFilter, list.EmptyOptions())
	if err != nil {
		return failedToRefreshSwfCrError(err)
	}

	for i := range jobs {
		// Step 1: resolve the manifest for the job's pipeline spec.
		manifest, err := r.fetchManifestFromPipelineSpec(&jobs[i].PipelineSpec)
		if err != nil {
			return failedToRefreshSwfCrError(err)
		}

		// Step 2: build the template from that manifest.
		tmpl, err := r.fetchTemplateFromPipelineSpec(manifest)
		if err != nil {
			return failedToRefreshSwfCrError(err)
		}

		swf, err := tmpl.ScheduledWorkflow(jobs[i])
		if err != nil {
			return failedToRefreshSwfCrError(err)
		}

		// A missing Kubernetes resource is tolerated: there is nothing to patch for that job.
		patchErr := r.patchSwfKubernetesResource(ctx, jobs[i].Namespace, jobs[i].K8SName, swf.Spec.Workflow.Spec)
		if patchErr != nil && !util.IsNotFound(errors.Cause(patchErr)) {
			return failedToRefreshSwfCrError(patchErr)
		}
	}

	return nil
}

// failedToRefreshSwfCrError wraps err with the message shared by all
// failures of the ScheduledWorkflow refresh.
func failedToRefreshSwfCrError(err error) error {
	const message = "Failed to refresh ScheduledWorkflow Kubernetes resources"
	return util.Wrap(err, message)
}

// Creates a recurring run.
// Manifest's namespace gets overwritten with the job.Namespace if the latter is non-empty.
// Otherwise, job.Namespace gets overwritten by the manifest.
func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model.Job, error) {
	// Create the ScheduledWorkflow resource first; its result is used to fill in the job below.
	tmpl, manifest, newScheduledWorkflow, err := r.createSwfKubernetesResource(ctx, job)
	if err != nil {
		// A network timeout is reported as "unavailable" and carries a retry hint in its message.
		netErr, isNetErr := err.(net.Error)
		if isNetErr && netErr.Timeout() {
			return nil, util.NewUnavailableServerError(netErr, "Failed to create a recurring run during scheduling a workflow - try again later")
		}
		return nil, util.Wrap(err, "Failed to create a recurring run during scheduling a workflow")
	}

	// Complete the job from the created resource, then hand it to the job store.
	templateType := tmpl.GetTemplateType()
	r.completeModelJob(job, newScheduledWorkflow, manifest, templateType)
	return r.jobStore.CreateJob(job)
}

// deleteSwfKubernetesResource deletes the ScheduledWorkflow Kubernetes
// resource named crdName.
//
// When k8sNamespace is empty it falls back to common.GetPodNamespace(); if
// that is empty as well, an error is returned before the client is called.
// A failed delete is returned wrapped with the resource name.
func (r *ResourceManager) deleteSwfKubernetesResource(ctx context.Context, k8sNamespace string, crdName string) error {
	namespace := k8sNamespace
	if namespace == "" {
		namespace = common.GetPodNamespace()
	}
	if namespace == "" {
		return errors.New("Namespace cannot be empty when deleting a ScheduledWorkflow Kubernetes resource.")
	}

	swfClient := r.getScheduledWorkflowClient(namespace)
	if err := swfClient.Delete(ctx, crdName, &v1.DeleteOptions{}); err != nil {
		return util.Wrap(err, "Error while deleting the ScheduledWorkflow Kubernetes resource named "+crdName)
	}

	return nil
}

// patchSwfKubernetesResource sends a merge patch to the ScheduledWorkflow
// Kubernetes resource named crdName, carrying newSpec under
// spec.workflow.spec.
//
// When k8sNamespace is empty it falls back to common.GetPodNamespace(); if
// that is empty as well, an error is returned before the client is called.
// Failures to marshal the payload or to apply the patch are returned as
// internal server errors.
func (r *ResourceManager) patchSwfKubernetesResource(ctx context.Context, k8sNamespace string, crdName string, newSpec interface{}) error {
	if k8sNamespace == "" {
		k8sNamespace = common.GetPodNamespace()
	}
	if k8sNamespace == "" {
		// The message previously said "deleting" (copied from the delete helper), which misreported the failing operation.
		return errors.New("Namespace cannot be empty when patching a ScheduledWorkflow Kubernetes resource.")
	}

	// Only spec.workflow.spec is present in the payload.
	patchPayload := map[string]interface{}{
		"spec": map[string]interface{}{
			"workflow": map[string]interface{}{
				"spec": newSpec,
			},
		},
	}

	patchBytes, err := json.Marshal(patchPayload)
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to marshal patch spec")
	}

	_, err = r.getScheduledWorkflowClient(k8sNamespace).Patch(
		ctx,
		crdName,
		types.MergePatchType,
		patchBytes,
	)
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to patch ScheduledWorkflow")
	}

	return nil
}

func (r *ResourceManager) createSwfKubernetesResource(ctx context.Context, job *model.Job) (template.Template, string, *swfapi.ScheduledWorkflow, error) {
// Create a template based on the manifest of an existing pipeline version or user-provided manifest.
// Update the job.PipelineSpec if an existing pipeline version is used.
manifest, err := r.fetchManifestFromPipelineSpec(&job.PipelineSpec)
if err != nil {
return nil, "", nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

tmpl, err := r.fetchTemplateFromPipelineSpec(manifest)
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
if err != nil {
return nil, "", nil, util.NewInternalServerError(err, "Failed to create a ScheduledWorkflow Kubernetes resource")
return nil, util.NewInternalServerError(err, "Failed to create a recurring run with an invalid pipeline spec manifest")
}

// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
// Convert modelJob into scheduledWorkflow.
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
if err != nil {
return nil, "", nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
}
// Create a new ScheduledWorkflow at the ScheduledWorkflow client.
k8sNamespace := job.Namespace
if k8sNamespace == "" {
k8sNamespace = common.GetPodNamespace()
}
if k8sNamespace == "" {
return nil, "", nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace")
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo scheduled workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a recurring run due to empty namespace")
}
newScheduledWorkflow, err := r.getScheduledWorkflowClient(k8sNamespace).Create(ctx, scheduledWorkflow)
return tmpl, manifest, newScheduledWorkflow, err
}

// Complete modelJob with info coming back from ScheduledWorkflow client.
func (r *ResourceManager) completeModelJob(job *model.Job, newScheduledWorkflow *swfapi.ScheduledWorkflow, manifest string, templateType template.TemplateType) {
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
return nil, util.NewUnavailableServerError(err, "Failed to create a recurring run during scheduling a workflow - try again later")
}
return nil, util.Wrap(err, "Failed to create a recurring run during scheduling a workflow")
}
// Complete modelJob with info coming back from ScheduledWorkflow client.
swf := util.NewScheduledWorkflow(newScheduledWorkflow)
job.UUID = string(swf.UID)
job.K8SName = swf.Name
Expand All @@ -1132,11 +1090,12 @@ func (r *ResourceManager) completeModelJob(job *model.Job, newScheduledWorkflow
}
}
job.ServiceAccount = serviceAccount
if templateType == template.V1 {
if tmpl.GetTemplateType() == template.V1 {
job.PipelineSpec.WorkflowSpecManifest = manifest
} else {
job.PipelineSpec.PipelineSpecManifest = manifest
}
return r.jobStore.CreateJob(job)
}

// Enables or disables a recurring run with given id.
Expand Down Expand Up @@ -1432,15 +1391,15 @@ func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWor
return r.jobStore.UpdateJob(swf)
}

// Returns the manifest in the following priority:
// Returns a workflow template based on the manifest in the following priority:
// 1. Pipeline spec manifest from an existing pipeline version,
// 2. Pipeline spec manifest or workflow spec manifest provided by a user.
// If an existing pipeline version is found, the referenced pipeline and pipeline version are updated.
func (r *ResourceManager) fetchManifestFromPipelineSpec(pipelineSpec *model.PipelineSpec) (string, error) {
func (r *ResourceManager) fetchTemplateFromPipelineSpec(pipelineSpec *model.PipelineSpec) (template.Template, string, error) {
manifest := ""
pipelineVersion, err := r.fetchPipelineVersionFromPipelineSpec(*pipelineSpec)
if err != nil {
return "", util.Wrapf(err, "Failed to fetch a template due to error retrieving pipeline version")
return nil, "", util.Wrapf(err, "Failed to fetch a template due to error retrieving pipeline version")
} else if pipelineVersion != nil {
// Update references to the existing pipeline version
pipelineSpec.PipelineId = pipelineVersion.PipelineId
Expand All @@ -1449,7 +1408,7 @@ func (r *ResourceManager) fetchManifestFromPipelineSpec(pipelineSpec *model.Pipe
// Fetch the template from PipelineSpec field or the corresponding YAML file
tempBytes, _, err := r.fetchTemplateFromPipelineVersion(pipelineVersion)
if err != nil {
return "", util.Wrapf(err, "Failed to fetch a template due invalid manifest in pipeline version %v", pipelineSpec.PipelineVersionId)
return nil, "", util.Wrapf(err, "Failed to fetch a template due invalid manifest in pipeline version %v", pipelineSpec.PipelineVersionId)
}
manifest = string(tempBytes)
} else {
Expand All @@ -1459,18 +1418,14 @@ func (r *ResourceManager) fetchManifestFromPipelineSpec(pipelineSpec *model.Pipe
manifest = pipelineSpec.WorkflowSpecManifest
}
if manifest == "" {
return "", util.NewInvalidInputError("Failed to fetch a template with an empty pipeline spec manifest")
return nil, "", util.NewInvalidInputError("Failed to fetch a template with an empty pipeline spec manifest")
}
}
return manifest, nil
}

func (r *ResourceManager) fetchTemplateFromPipelineSpec(manifest string) (template.Template, error) {
tmpl, err := template.New([]byte(manifest))
if err != nil {
return nil, util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest")
return nil, "", util.Wrap(err, "Failed to fetch a template with an invalid pipeline spec manifest")
}
return tmpl, nil
return tmpl, manifest, nil
}

// Fetches PipelineSpec as []byte array and a new URI of PipelineSpec.
Expand Down

0 comments on commit 8b85284

Please sign in to comment.