diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index 6b69bdad8dd..32536912182 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -45,28 +45,34 @@ var ( Launcher = "" ) -func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) (*pipelinespec.SinglePlatformSpec, *argocompiler.Options) { +func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec { var kubernetesSpec *pipelinespec.SinglePlatformSpec - var pipelineOptions *argocompiler.Options // Check for "kubernetes" key in the platformSpec map if platformSpec != nil { if platform, ok := platformSpec["kubernetes"]; ok && platform != nil { kubernetesSpec = platform - if platform.PipelineConfig != nil { - pipelineOptions = &argocompiler.Options{} - if platform.PipelineConfig.SemaphoreKey != "" { - pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey - } - if platform.PipelineConfig.MutexName != "" { - pipelineOptions.MutexName = platform.PipelineConfig.MutexName - } - } } } - return kubernetesSpec, pipelineOptions + return kubernetesSpec +} + +func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options { + var pipelineOptions *argocompiler.Options + + if platform != nil && platform.PipelineConfig != nil { + pipelineOptions = &argocompiler.Options{} + if platform.PipelineConfig.SemaphoreKey != "" { + pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey + } + if platform.PipelineConfig.MutexName != "" { + pipelineOptions.MutexName = platform.PipelineConfig.MutexName + } + } + return pipelineOptions } + // Converts modelJob to ScheduledWorkflow. func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) { job := &pipelinespec.PipelineJob{} @@ -91,7 +97,8 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche return nil, util.Wrap(err, "invalid pipeline job inputs") } - kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms) + kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms) + pipelineOptions := getPipelineOptions(kubernetesSpec) var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { @@ -309,8 +316,8 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u return nil, util.Wrap(err, "invalid pipeline job inputs") } - kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms) - + kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms) + pipelineOptions := getPipelineOptions(kubernetesSpec) var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c7c5ebc43fb..790af0fc9ee 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -106,7 +106,6 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } - // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ APIVersion: "argoproj.io/v1alpha1", @@ -114,10 +113,6 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ObjectMeta: k8smeta.ObjectMeta{ GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-", - // Uncomment during development for better debugging in KFP UI - // Annotations: map[string]string{ - // "pipelines.kubeflow.org/v2_pipeline": "true", - // }, }, Spec: wfapi.WorkflowSpec{ PodMetadata: &wfapi.Metadata{ @@ -133,22 +128,30 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, - Synchronization: &wfapi.Synchronization{ - Semaphore: &wfapi.SemaphoreRef{ - ConfigMapKeyRef: &v1.ConfigMapKeySelector{ - LocalObjectReference: v1.LocalObjectReference{ - Name: getSemaphoreConfigMapName(), - }, - Key: semaphoreKey, + }, + } + + if semaphoreKey != "" { + wf.Spec.Synchronization = &wfapi.Synchronization{ + Semaphore: &wfapi.SemaphoreRef{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: getSemaphoreConfigMapName(), }, - }, - Mutex: &wfapi.Mutex{ - Name: mutexName, + Key: semaphoreKey, }, }, - }, + } } - + + if mutexName != "" { + wf.Spec.Synchronization = &wfapi.Synchronization{ + Mutex: &wfapi.Mutex{ + Name: mutexName, + }, + } + } + c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), diff --git a/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml b/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml new file mode 100644 index 00000000000..5d7ed474dd3 --- /dev/null +++ b/manifests/kustomize/base/pipeline/ml-pipeline-semaphore-configmap.yaml @@ -0,0 +1,5 @@ +kind: ConfigMap +apiVersion: v1 +metadata: + name: semaphore-config +data: {}