Skip to content

Commit

Permalink
fix(backend): Remove DAG Driver from Iterator abstraction template
Browse files Browse the repository at this point in the history
- Removes the Driver pod from the Iterator abstraction-layer template
  as it confuses MLMD and is purley an Argo implementation
- Drivers still used on the Component and Iteration-item templates

Signed-off-by: Giulio Frasca <[email protected]>
  • Loading branch information
gmfrasca committed Dec 3, 2024
1 parent 018db34 commit f13c0c2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 63 deletions.
30 changes: 3 additions & 27 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,29 +265,11 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline
}
}()
componentName := task.GetComponentRef().GetName()
componentSpecPlaceholder, err := c.useComponentSpec(componentName)
if err != nil {
return nil, err
}

// Set up Loop Control Template
loopDriverArgoName := name + "-loop-driver"
loopDriverInputs := dagDriverInputs{
component: componentSpecPlaceholder,
parentDagID: parentDagID,
task: taskJson, // TODO(Bobgy): avoid duplicating task JSON twice in the template.
}
loopDriver, loopDriverOutputs, err := c.dagDriverTask(loopDriverArgoName, loopDriverInputs)
if err != nil {
return nil, err
}
loopDriver.Depends = depends(task.GetDependentTasks())

iteratorTasks, err := c.iterationItemTask("iteration", task, taskJson, parentDagID)
if err != nil {
return nil, err
}

loopTmpl := &wfapi.Template{
Inputs: wfapi.Inputs{
Parameters: []wfapi.Parameter{
Expand All @@ -303,22 +285,16 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline
loopTmpl.Parallelism = &parallelismLimit
}

loopTmplName, err := c.addTemplate(loopTmpl, fmt.Sprintf("%s-loop-iterator", componentName))
loopTmplName, err := c.addTemplate(loopTmpl, fmt.Sprintf("%s-%s-iterator", componentName, name))
if err != nil {
return nil, err
}
when := ""
if task.GetTriggerPolicy().GetCondition() != "" {
when = loopDriverOutputs.condition + " != false"
}

tasks = []wfapi.DAGTask{
*loopDriver,
{
Name: name + "-loop",
Template: loopTmplName,
Depends: depends([]string{loopDriverArgoName}),
When: when,
Depends: depends(task.GetDependentTasks()),
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{
{
Expand Down Expand Up @@ -379,7 +355,7 @@ func (c *workflowCompiler) iterationItemTask(name string, task *pipelinespec.Pip
Tasks: iterationTasks,
},
}
iterationsTmplName, err := c.addTemplate(iterationsTmpl, componentName+"-iteration")
iterationsTmplName, err := c.addTemplate(iterationsTmpl, componentName+"-"+name)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ spec:
parameters:
- name: parent-dag-id
metadata: {}
name: comp-for-loop-2-loop-iterator
name: comp-for-loop-2-for-loop-2-iterator
outputs: {}
parallelism: 2
- dag:
Expand Down Expand Up @@ -500,55 +500,25 @@ spec:
parameters:
- name: parent-dag-id
metadata: {}
name: comp-for-loop-4-loop-iterator
name: comp-for-loop-4-for-loop-4-iterator
outputs: {}
parallelism: 4
- dag:
tasks:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp-for-loop-2}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
- name: task
value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\":
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}'
name: for-loop-2-loop-driver
template: system-dag-driver
- arguments:
parameters:
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
depends: for-loop-2-loop-driver.Succeeded
depends: ""
name: for-loop-2-loop
template: comp-for-loop-2-loop-iterator
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp-for-loop-4}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
- name: task
value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\":
\"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\":
\"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\":
\"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\":
\"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\":
\"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}'
name: for-loop-4-loop-driver
template: system-dag-driver
template: comp-for-loop-2-for-loop-2-iterator
- arguments:
parameters:
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
depends: for-loop-4-loop-driver.Succeeded
depends: ""
name: for-loop-4-loop
template: comp-for-loop-4-loop-iterator
template: comp-for-loop-4-for-loop-4-iterator
inputs:
parameters:
- name: parent-dag-id
Expand Down

0 comments on commit f13c0c2

Please sign in to comment.