Skip to content

Commit

Permalink
feat(backend)Add Semaphore and Mutex fields to Workflow Spec
Browse files Browse the repository at this point in the history
Signed-off-by: ddalvi <[email protected]>
  • Loading branch information
DharmitD committed Dec 12, 2024
1 parent 60a8865 commit ccc5fd0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 23 deletions.
44 changes: 28 additions & 16 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ var (
Launcher = ""
)

func getKubernetesSpec(platformSpec map[string]*pipelinespec.SinglePlatformSpec) (*pipelinespec.SinglePlatformSpec, *argocompiler.Options) {
var kubernetesSpec *pipelinespec.SinglePlatformSpec
var pipelineOptions *argocompiler.Options

// Check for "kubernetes" key in the platformSpec map
if platformSpec != nil {
if platform, ok := platformSpec["kubernetes"]; ok && platform != nil {
kubernetesSpec = platform
if platform.PipelineConfig != nil {

Check failure on line 56 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 56 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions = &argocompiler.Options{}
if platform.PipelineConfig.SemaphoreKey != "" {

Check failure on line 58 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 58 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.SemaphoreKey = platform.PipelineConfig.SemaphoreKey

Check failure on line 59 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 59 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
if platform.PipelineConfig.MutexName != "" {

Check failure on line 61 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 61 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
pipelineOptions.MutexName = platform.PipelineConfig.MutexName

Check failure on line 62 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)

Check failure on line 62 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platform.PipelineConfig undefined (type *pipelinespec.SinglePlatformSpec has no field or method PipelineConfig)
}
}
}
}
return kubernetesSpec, pipelineOptions
}

// Converts modelJob to ScheduledWorkflow.
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
job := &pipelinespec.PipelineJob{}
Expand All @@ -69,17 +91,11 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
return nil, util.Wrap(err, "invalid pipeline job inputs")
}

// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}
kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms)

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -292,17 +308,13 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
if err = t.validatePipelineJobInputs(job); err != nil {
return nil, util.Wrap(err, "invalid pipeline job inputs")
}
// Pick out Kubernetes platform configs
var kubernetesSpec *pipelinespec.SinglePlatformSpec
if t.platformSpec != nil {
if _, ok := t.platformSpec.Platforms["kubernetes"]; ok {
kubernetesSpec = t.platformSpec.Platforms["kubernetes"]
}
}

kubernetesSpec, pipelineOptions := getKubernetesSpec(t.platformSpec.Platforms)


var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipelineOptions)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down
42 changes: 36 additions & 6 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"strings"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand All @@ -28,6 +29,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8sres "k8s.io/apimachinery/pkg/api/resource"
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -40,6 +42,16 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
SemaphoreKey string
MutexName string
}

func getSemaphoreConfigMapName() string {
const defaultConfigMapName = "semaphore-config"
if name := os.Getenv("SEMAPHORE_CONFIGMAP_NAME"); name != "" {
return name
}
return defaultConfigMapName
}

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
Expand Down Expand Up @@ -76,6 +88,14 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

var semaphoreKey, mutexName string
if opts != nil && opts.SemaphoreKey != "" {
semaphoreKey = opts.SemaphoreKey
}
if opts != nil && opts.MutexName != "" {
mutexName = opts.MutexName
}

var kubernetesSpec *pipelinespec.SinglePlatformSpec
if kubernetesSpecArg != nil {
// clone kubernetesSpecArg, because we don't want to change it
Expand All @@ -94,13 +114,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ObjectMeta: k8smeta.ObjectMeta{
GenerateName: retrieveLastValidString(spec.GetPipelineInfo().GetName()) + "-",
// Note, uncomment the following during development to view argo inputs/outputs in KFP UI.
// TODO(Bobgy): figure out what annotations we should use for v2 engine.
// For now, comment this annotation, so that in KFP UI, it shows argo input/output params/artifacts
// suitable for debugging.
//
// Uncomment during development for better debugging in KFP UI
// Annotations: map[string]string{
// "pipelines.kubeflow.org/v2_pipeline": "true",
// "pipelines.kubeflow.org/v2_pipeline": "true",
// },
},
Spec: wfapi.WorkflowSpec{
Expand All @@ -117,8 +133,22 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
Synchronization: &wfapi.Synchronization{
Semaphore: &wfapi.SemaphoreRef{
ConfigMapKeyRef: &v1.ConfigMapKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: getSemaphoreConfigMapName(),
},
Key: semaphoreKey,
},
},
Mutex: &wfapi.Mutex{
Name: mutexName,
},
},
},
}

c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (state *pipelineDFS) dfs(name string, component *pipelinespec.ComponentSpec
}

// Add kubernetes spec to annotation
if state.kubernetesSpec != nil {
if state.kubernetesSpec != nil && state.kubernetesSpec.DeploymentSpec != nil {
kubernetesExecSpec, ok := state.kubernetesSpec.DeploymentSpec.Executors[executorLabel]
if ok {
state.visitor.AddKubernetesSpec(name, kubernetesExecSpec)
Expand Down

0 comments on commit ccc5fd0

Please sign in to comment.