Skip to content

Commit

Permalink
Add an empty Semaphore ConfigMap manifest
Browse files Browse the repository at this point in the history
Signed-off-by: ddalvi <[email protected]>
  • Loading branch information
DharmitD committed Dec 13, 2024
1 parent 89b19cf commit 85d4fe3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 32 deletions.
37 changes: 22 additions & 15 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,34 @@ var (
Launcher = ""
)

func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) (*pipelinespec.SinglePlatformSpec, *argocompiler.Options) {
func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) *pipelinespec.SinglePlatformSpec {
var kubernetesSpec *pipelinespec.SinglePlatformSpec
var pipelineOptions *argocompiler.Options

// Check for "kubernetes" key in the platformSpec map
if platformSpec != nil {
if platform, ok := platformSpec["kubernetes"]; ok && platform != nil {
kubernetesSpec = platform
if platform.PipelineConfig != nil {
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey
}
if platform.PipelineConfig.MutexName != "" {
pipelineOptions.MutexName = platform.PipelineConfig.MutexName
}
}
}
}
return kubernetesSpec, pipelineOptions
return kubernetesSpec
}

func getPipelineOptions(platform *pipelinespec.SinglePlatformSpec) *argocompiler.Options {
var pipelineOptions *argocompiler.Options

if platform != nil && platform.PipelineConfig != nil {

Check failure on line 63 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 63 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {

Check failure on line 65 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 65 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey

Check failure on line 66 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 66 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
if platform.PipelineConfig.MutexName != "" {

Check failure on line 68 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 68 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.MutexName = platform.PipelineConfig.MutexName

Check failure on line 69 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 69 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
}
return pipelineOptions
}


// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
job := &pipelinespec.PipelineJob{}
Expand All @@ -91,7 +97,8 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms)
kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
Expand Down Expand Up @@ -309,8 +316,8 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms)

kubernetesSpec := getKubernetesSpec(t.platformSpec.Platforms)
pipelineOptions := getPipelineOptions(kubernetesSpec)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
Expand Down
37 changes: 20 additions & 17 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,13 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Uncomment during development for better debugging in KFP UI
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
PodMetadata: &wfapi.Metadata{
Expand All @@ -133,22 +128,30 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
Synchronization: &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
Key: semaphoreKey,
},
}

if semaphoreKey != "" {
wf.Spec.Synchronization = &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
},
Mutex: &wfapi.Mutex{
Name: mutexName,
Key: semaphoreKey,
},
},
},
}
}


if mutexName != "" {
wf.Spec.Synchronization = &wfapi.Synchronization{
Mutex: &wfapi.Mutex{
Name: mutexName,
},
}
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: ConfigMap
apiVersion: v1
metadata:
name: semaphore-config
data: {}

0 comments on commit 85d4fe3

Please sign in to comment.