Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] Nested pipelines fail to run #10039

Closed
JosepSampe opened this issue Sep 27, 2023 · 23 comments
Closed

[bug] Nested pipelines fail to run #10039

JosepSampe opened this issue Sep 27, 2023 · 23 comments
Assignees
Labels

Comments

@JosepSampe
Copy link
Member

I'm trying to run the "Pipelines as Components" example from the documentation, but it seems I'm not able to run it successfully. it always compile, but it always produces an error after the subpipeline is executed properly. Looking at the logs of the pod that produces the error, I can see:

driver.DAG(pipelineName=superpipeline, runID=525fd190-491a-42c1-9558-65772a931637, task="square-and-multiply", component="comp-square-and-multiply", dagExecutionID=237, componentSpec) failed: failed to resolve inputs: resolving input parameter a with spec task_output_parameter:{producer_task:"square-and-sum" output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "square-and-sum"

Is there a different way to access the output of a "pipeline as component"?

Environment

Using a kind cluster created with: kind create cluster --name kfp

  • How do you deploy Kubeflow Pipelines (KFP)?
export PIPELINE_VERSION=2.0.1
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
  • KFP version: 2.0.1
  • KFP SDK version:
    kfp 2.2.0
    kfp-pipeline-spec 0.2.2
    kfp-server-api 2.0.1

Steps to reproduce

from kfp import dsl
from kfp.client import Client

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output

@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output


if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=pythagorean,
        experiment_name="pythagorean-2",
        enable_caching=False,
        arguments={
            'a': 3.5,
            'b': 4.5,
        },
        run_name="pythagorean-run-3"
    )

I also tried to explicitly access the output with

return square_root(x=sq_and_sum_task.outputs['Output']).output

but I get the same error

Expected result

Successfull execution

Materials and reference

Labels


Impacted by this bug? Give it a 👍.

@zijianjoy
Copy link
Collaborator

/assign @chensun

@chensun
Copy link
Member

chensun commented Oct 12, 2023

I can reproduce this, looks like a backend bug.

Copy link

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Jan 11, 2024
@github-actions github-actions bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Jan 26, 2024
Copy link

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Mar 26, 2024
Copy link

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

@KyleKaminky
Copy link
Contributor

I'm still seeing this issue on kfp 2.7, anyone else? @JosepSampe @chensun

@KyleKaminky
Copy link
Contributor

/reopen

Copy link

@KyleKaminky: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@JosepSampe
Copy link
Member Author

/reopen

Copy link

@JosepSampe: Reopened this issue.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@google-oss-prow google-oss-prow bot reopened this May 16, 2024
@github-actions github-actions bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label May 17, 2024
Copy link

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Jul 17, 2024
Copy link

github-actions bot commented Aug 8, 2024

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

@github-actions github-actions bot closed this as completed Aug 8, 2024
@droctothorpe
Copy link
Contributor

/reopen

Copy link

@droctothorpe: Reopened this issue.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@google-oss-prow google-oss-prow bot reopened this Sep 3, 2024
@github-project-automation github-project-automation bot moved this from Closed to Needs triage in KFP Runtime Triage Sep 3, 2024
@droctothorpe
Copy link
Contributor

I plan to investigate / debug this issue this sprint.

@github-actions github-actions bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Sep 4, 2024
@droctothorpe
Copy link
Contributor

I'm keeping a log in case anyone else wants to follow along or has any thoughts / suggestion. I'll keep it updated as I make progress.

2024-09-03

I'm going to start by validating that I can recreate the failure scenario. I'm
going to use a simplified example though.

from kfp import dsl
from kfp.client import Client


@dsl.component
def inner_comp() -> str:
    return "foobar"


@dsl.component
def outer_comp(input: str):
    print("input: ", input)


@dsl.pipeline
def inner_pipeline() -> str:
    inner_comp_task = inner_comp()
    inner_comp_task.set_caching_options(False)
    return inner_comp_task.output


@dsl.pipeline
def outer_pipeline():
    inner_pipeline_task = inner_pipeline()
    outer_comp_task = outer_comp(input=inner_pipeline_task.output)
    outer_comp_task.set_caching_options(False)


if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=outer_pipeline,
        enable_caching=False,
    )

Confirming that this failed.

Let's simplify and remove variables. I'm going to only run the inner pipeline:

from kfp import dsl
from kfp.client import Client


@dsl.component
def inner_comp() -> str:
    return "inner"

@dsl.pipeline
def inner_pipeline() -> str:
    inner_comp_task = inner_comp()
    inner_comp_task.set_caching_options(False)
    return inner_comp_task.output


if __name__ == "__main__":
    client = Client()

    run = client.create_run_from_pipeline_func(
        pipeline_func=inner_pipeline,
        enable_caching=False,
    )

That ran without any issues.

What if we modify it slightly such that we have a sub-DAG but we never reference
its output? I'm just going to change the outer_pipeline to look like this:

@dsl.pipeline
def outer_pipeline():
    inner_pipeline()
    outer_comp_task = outer_comp(input="foo") # Note, we never reference the output of inner_pipeline.
    outer_comp_task.set_caching_options(False)

That worked!

Let's copy the successful and failed AWF manifests to a file and diff /
interrogate them.

- indicates failed manifest.
+ indicates succeeded manifest.

metadata:
    annotations: 
        - pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"dependentTasks":["inner-pipeline"],"inputs":{"parameters":{"input":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"inner-pipeline"}}}},"taskInfo":{"name":"outer-comp"}}}}}'
        + pipelines.kubeflow.org/components-root: '{"dag":{"tasks":{"inner-pipeline":{"cachingOptions":{},"componentRef":{"name":"comp-inner-pipeline"},"taskInfo":{"name":"inner-pipeline"}},"outer-comp":{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"inputs":{"parameters":{"input":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"outer-comp"}}}}}'
spec:
    templates:
        dag:
            tasks:
                arguments:
                    name: task
                    value:
                        - '{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"dependentTasks":["inner-pipeline"],"inputs":{"parameters":{"input":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"inner-pipeline"}}}},"taskInfo":{"name":"outer-comp"}}'
                        + '{"cachingOptions":{},"componentRef":{"name":"comp-outer-comp"},"inputs":{"parameters":{"input":{"runtimeValue":{"constant":"foo"}}}},"taskInfo":{"name":"outer-comp"}}'

I wonder if the compiler is just misconfiguring the workflow manifest somehow.
Potentially relevant information from stackoverflow:
https://stackoverflow.com/a/64996549.

Let's take a minute to parse and grok the failed manifest.

Under spec/templates, we have 3 containers and 4 dags. Here's a simplified
view of the tasks:

- container
  - name: system-container-driver
  - command: driver
- dag
  - name: system-container-executor
  - tasks:
    - template: system-container-impl
- container
  - name: system-container-impl
- dag
  - name: comp-inner-pipeline
  - tasks:
    1.
      - template: system-container-driver
      - name: inner-comp-driver
    1. 
       - template: system-container-executor
       - name: inner-comp
       - depends: inner-comp-driver.Succeeded
- container
  - name: system-dag-driver
- dag
  - name: root
- dag
  - name: entrypoint

The root.outer-comp-driver task is what fails.

Here are the corresponding logs:

Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (main)
init time="2024-09-03T19:35:07.623Z" level=info msg="Starting Workflow Executor" version=v3.4.16
init time="2024-09-03T19:35:07.685Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5
init time="2024-09-03T19:35:07.691Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=kubeflow podName=outer-pipeline-sq9vj-system-container-driver-3067012875 template="{\"name\":\"system-container-driver\",\"inputs\":{\"parameters\":[{\"name\":\"component\",\"value\":\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\"},{\"name\":\"task\",\"value\":\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\"},{\"name\":\"container\",\"value\":\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\"},{\"name\":\"parent-dag-id\",\"value\":\"234\"},{\"name\":\"iteration-index\",\"default\":\"-1\",\"value\":\"-1\"},{\"name\":\"kubernetes-config\",\"default\":\"\",\"value\":\"\"}]},\"outputs\":{\"parameters\":[{\"name\":\"pod-spec-patch\",\"valueFrom\":{\"path\":\"/tmp/outputs/pod-spec-patch\",\"default\":\"\"}},{\"name\":\"cached-decision\",\"default\":\"false\",\"valueFrom\":{\"path\":\"/tmp/outputs/cached-decision\",\"default\":\"false\"}},{\"name\":\"condition\",\"valueFrom\":{\"path\":\"/tmp/outputs/condition\",\"default\":\"true\"}}]},\"metadata\":{\"annotations\":{\"sidecar.istio.io/inject\":\"false\"}},\"container\":{\"name\":\"\",\"image\":\"gcr.io/ml-pipeline/kfp-driver@sha256:3c0665cd36aa87e4359a4c8b6271dcba5bdd817815cd0496ed12eb5dde5fd2ec\",\"command\":[\"driver\"],\"args\":[\"--type\",\"CONTAINER\",\"--pipeline_name\",\"outer-pipeline\",\"--run_id\",\"2fda42a6-a492-4f81-8a7f-f08118a34bcb\",\"--dag_execution_id\",\"234\",\"--component\",\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\",\"--task\",\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\",\"--container\",\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\",\"--iteration_index\",\"-1\",\"--cached_decision_path\",\"/tmp/outputs/cached-decision\",\"--pod_spec_patch_path\",\"/tmp/outputs/pod-spec-patch\",\"--condition_path\",\"/tmp/outputs/condition\",\"--kubernetes_config\",\"\"],\"resources\":{\"limits\":{\"cpu\":\"500m\",\"memory\":\"512Mi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"64Mi\"}}},\"archiveLocation\":{\"archiveLogs\":true,\"s3\":{\"endpoint\":\"minio-service.kubeflow:9000\",\"bucket\":\"mlpipeline\",\"insecure\":true,\"accessKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"accesskey\"},\"secretKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"secretkey\"},\"key\":\"artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875\"}}}" version="&Version{Version:v3.4.16,BuildDate:2024-01-14T05:29:17Z,GitCommit:910a9aabce5de6568b54350c181a431f8263605a,GitTag:v3.4.16,GitTreeState:clean,GoVersion:go1.20.13,Compiler:gc,Platform:linux/amd64,}"
init time="2024-09-03T19:35:07.829Z" level=info msg="Start loading input artifacts..."
init time="2024-09-03T19:35:07.830Z" level=info msg="Alloc=6975 TotalAlloc=12544 Sys=27773 NumGC=4 Goroutines=4"
wait time="2024-09-03T19:35:08.854Z" level=info msg="Starting Workflow Executor" version=v3.4.16
wait time="2024-09-03T19:35:08.897Z" level=info msg="Using executor retry strategy" Duration=1s Factor=1.6 Jitter=0.5 Steps=5
wait time="2024-09-03T19:35:08.900Z" level=info msg="Executor initialized" deadline="0001-01-01 00:00:00 +0000 UTC" includeScriptOutput=false namespace=kubeflow podName=outer-pipeline-sq9vj-system-container-driver-3067012875 template="{\"name\":\"system-container-driver\",\"inputs\":{\"parameters\":[{\"name\":\"component\",\"value\":\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\"},{\"name\":\"task\",\"value\":\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\"},{\"name\":\"container\",\"value\":\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\"},{\"name\":\"parent-dag-id\",\"value\":\"234\"},{\"name\":\"iteration-index\",\"default\":\"-1\",\"value\":\"-1\"},{\"name\":\"kubernetes-config\",\"default\":\"\",\"value\":\"\"}]},\"outputs\":{\"parameters\":[{\"name\":\"pod-spec-patch\",\"valueFrom\":{\"path\":\"/tmp/outputs/pod-spec-patch\",\"default\":\"\"}},{\"name\":\"cached-decision\",\"default\":\"false\",\"valueFrom\":{\"path\":\"/tmp/outputs/cached-decision\",\"default\":\"false\"}},{\"name\":\"condition\",\"valueFrom\":{\"path\":\"/tmp/outputs/condition\",\"default\":\"true\"}}]},\"metadata\":{\"annotations\":{\"sidecar.istio.io/inject\":\"false\"}},\"container\":{\"name\":\"\",\"image\":\"gcr.io/ml-pipeline/kfp-driver@sha256:3c0665cd36aa87e4359a4c8b6271dcba5bdd817815cd0496ed12eb5dde5fd2ec\",\"command\":[\"driver\"],\"args\":[\"--type\",\"CONTAINER\",\"--pipeline_name\",\"outer-pipeline\",\"--run_id\",\"2fda42a6-a492-4f81-8a7f-f08118a34bcb\",\"--dag_execution_id\",\"234\",\"--component\",\"{\\\"executorLabel\\\":\\\"exec-outer-comp\\\",\\\"inputDefinitions\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"parameterType\\\":\\\"STRING\\\"}}}}\",\"--task\",\"{\\\"cachingOptions\\\":{},\\\"componentRef\\\":{\\\"name\\\":\\\"comp-outer-comp\\\"},\\\"dependentTasks\\\":[\\\"inner-pipeline\\\"],\\\"inputs\\\":{\\\"parameters\\\":{\\\"input\\\":{\\\"taskOutputParameter\\\":{\\\"outputParameterKey\\\":\\\"Output\\\",\\\"producerTask\\\":\\\"inner-pipeline\\\"}}}},\\\"taskInfo\\\":{\\\"name\\\":\\\"outer-comp\\\"}}\",\"--container\",\"{\\\"args\\\":[\\\"--executor_input\\\",\\\"{{$}}\\\",\\\"--function_to_execute\\\",\\\"outer_comp\\\"],\\\"command\\\":[\\\"sh\\\",\\\"-c\\\",\\\"\\\\nif ! [ -x \\\\\\\"$(command -v pip)\\\\\\\" ]; then\\\\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\\\\nfi\\\\n\\\\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\\\\u003e=3.7.4,\\\\u003c5; python_version\\\\u003c\\\\\\\"3.9\\\\\\\"' \\\\u0026\\\\u0026 \\\\\\\"$0\\\\\\\" \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"sh\\\",\\\"-ec\\\",\\\"program_path=$(mktemp -d)\\\\n\\\\nprintf \\\\\\\"%s\\\\\\\" \\\\\\\"$0\\\\\\\" \\\\u003e \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"\\\\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \\\\\\\"$program_path/ephemeral_component.py\\\\\\\"                         \\\\\\\"$@\\\\\\\"\\\\n\\\",\\\"\\\\nimport kfp\\\\nfrom kfp import dsl\\\\nfrom kfp.dsl import *\\\\nfrom typing import *\\\\n\\\\ndef outer_comp(input: str):\\\\n    print(\\\\\\\"input: \\\\\\\", input)\\\\n\\\\n\\\"],\\\"image\\\":\\\"python:3.8\\\"}\",\"--iteration_index\",\"-1\",\"--cached_decision_path\",\"/tmp/outputs/cached-decision\",\"--pod_spec_patch_path\",\"/tmp/outputs/pod-spec-patch\",\"--condition_path\",\"/tmp/outputs/condition\",\"--kubernetes_config\",\"\"],\"resources\":{\"limits\":{\"cpu\":\"500m\",\"memory\":\"512Mi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"64Mi\"}}},\"archiveLocation\":{\"archiveLogs\":true,\"s3\":{\"endpoint\":\"minio-service.kubeflow:9000\",\"bucket\":\"mlpipeline\",\"insecure\":true,\"accessKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"accesskey\"},\"secretKeySecret\":{\"name\":\"mlpipeline-minio-artifact\",\"key\":\"secretkey\"},\"key\":\"artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875\"}}}" version="&Version{Version:v3.4.16,BuildDate:2024-01-14T05:29:17Z,GitCommit:910a9aabce5de6568b54350c181a431f8263605a,GitTag:v3.4.16,GitTreeState:clean,GoVersion:go1.20.13,Compiler:gc,Platform:linux/amd64,}"
wait time="2024-09-03T19:35:08.904Z" level=info msg="Starting deadline monitor"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Main container completed" error="<nil>"
wait time="2024-09-03T19:35:11.908Z" level=info msg="No Script output reference in workflow. Capturing script output ignored"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Saving output parameters"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Saving path output parameter: pod-spec-patch"
wait time="2024-09-03T19:35:11.908Z" level=info msg="Copying /tmp/outputs/pod-spec-patch from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: pod-spec-patch"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Saving path output parameter: cached-decision"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Copying /tmp/outputs/cached-decision from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: cached-decision"
main time="2024-09-03T19:35:09.694Z" level=info msg="capturing logs" argo=true
main I0903 19:35:10.683550      18 main.go:108] input ComponentSpec:{
main   "executorLabel": "exec-outer-comp",
main   "inputDefinitions": {
main     "parameters": {
main       "input": {
main         "parameterType": "STRING"
main       }
main     }
main   }
main }
main I0903 19:35:10.696924      18 main.go:115] input TaskSpec:{
main   "cachingOptions": {},
main   "componentRef": {
main     "name": "comp-outer-comp"
main   },
main   "dependentTasks": [
main     "inner-pipeline"
main   ],
main   "inputs": {
main     "parameters": {
main       "input": {
main         "taskOutputParameter": {
main           "outputParameterKey": "Output",
main           "producerTask": "inner-pipeline"
main         }
main       }
main     }
main   },
main   "taskInfo": {
main     "name": "outer-comp"
main   }
main }
main I0903 19:35:10.700383      18 main.go:121] input ContainerSpec:{
main   "args": [
main     "--executor_input",
main     "{{$}}",
main     "--function_to_execute",
main     "outer_comp"
main   ],
main   "command": [
main     "sh",
main     "-c",
main     "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.8.0' '--no-deps' 'typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"' \u0026\u0026 \"$0\" \"$@\"\n",
main     "sh",
main     "-ec",
main     "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
main     "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef outer_comp(input: str):\n    print(\"input: \", input)\n\n"
main   ],
main   "image": "python:3.8"
main }
main I0903 19:35:10.712435      18 cache.go:116] Connecting to cache endpoint 10.43.52.53:8887
main I0903 19:35:10.863280      18 client.go:302] Pipeline Context: id:72  name:"outer-pipeline"  type_id:11  type:"system.Pipeline"  create_time_since_epoch:1725384213787  last_update_time_since_epoch:1725384213787
main I0903 19:35:10.904456      18 client.go:311] Pipeline Run Context: id:82  name:"2fda42a6-a492-4f81-8a7f-f08118a34bcb"  type_id:12  type:"system.PipelineRun"  custom_properties:{key:"namespace"  value:{string_value:"kubeflow"}}  custom_properties:{key:"pipeline_root"  value:{string_value:"minio://mlpipeline/v2/artifacts/outer-pipeline/2fda42a6-a492-4f81-8a7f-f08118a34bcb"}}  custom_properties:{key:"resource_name"  value:{string_value:"run-resource"}}  custom_properties:{key:"store_session_info"  value:{string_value:"{\"Provider\":\"minio\",\"Params\":{\"accessKeyKey\":\"accesskey\",\"disableSSL\":\"true\",\"endpoint\":\"10.43.150.231:9000\",\"fromEnv\":\"false\",\"region\":\"minio\",\"secretKeyKey\":\"secretkey\",\"secretName\":\"mlpipeline-minio-artifact\"}}"}}  create_time_since_epoch:1725392061240  last_update_time_since_epoch:1725392061240
wait time="2024-09-03T19:35:11.909Z" level=info msg="Saving path output parameter: condition"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Copying /tmp/outputs/condition from base image layer"
wait time="2024-09-03T19:35:11.909Z" level=info msg="Successfully saved output parameter: condition"
wait time="2024-09-03T19:35:11.909Z" level=info msg="No output artifacts"
main I0903 19:35:11.089839      18 driver.go:252] parent DAG: id:234  name:"run/2fda42a6-a492-4f81-8a7f-f08118a34bcb"  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:""}}  custom_properties:{key:"task_name"  value:{string_value:""}}  create_time_since_epoch:1725392061428  last_update_time_since_epoch:1725392061428
main I0903 19:35:11.201532      18 driver.go:926] parent DAG input parameters: map], artifacts: map]
main F0903 19:35:11.286410      18 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=2fda42a6-a492-4f81-8a7f-f08118a34bcb, task="outer-comp", component="comp-outer-comp", dagExecutionID=234, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"
main time="2024-09-03T19:35:11.709Z" level=info msg="sub-process exited" argo=true error="<nil>"
main time="2024-09-03T19:35:11.709Z" level=error msg="cannot save parameter /tmp/outputs/pod-spec-patch" argo=true error="open /tmp/outputs/pod-spec-patch: no such file or directory"
main time="2024-09-03T19:35:11.710Z" level=error msg="cannot save parameter /tmp/outputs/cached-decision" argo=true error="open /tmp/outputs/cached-decision: no such file or directory"
main time="2024-09-03T19:35:11.710Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
main Error: exit status 1
wait time="2024-09-03T19:35:11.912Z" level=info msg="S3 Save path: /tmp/argo/outputs/logs/main.log, key: artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log"
wait time="2024-09-03T19:35:11.912Z" level=info msg="Creating minio client using static credentials" endpoint="minio-service.kubeflow:9000"
wait time="2024-09-03T19:35:11.914Z" level=info msg="Saving file to s3" bucket=mlpipeline endpoint="minio-service.kubeflow:9000" key=artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log path=/tmp/argo/outputs/logs/main.log
wait time="2024-09-03T19:35:11.965Z" level=info msg="Save artifact" artifactName=main-logs duration=53.683795ms error="<nil>" key=artifacts/outer-pipeline-sq9vj/2024/09/03/outer-pipeline-sq9vj-system-container-driver-3067012875/main.log
wait time="2024-09-03T19:35:11.966Z" level=info msg="not deleting local artifact" localArtPath=/tmp/argo/outputs/logs/main.log
wait time="2024-09-03T19:35:11.966Z" level=info msg="Successfully saved file: /tmp/argo/outputs/logs/main.log"
wait time="2024-09-03T19:35:12.016Z" level=warning msg="failed to patch task set, falling back to legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/release-3.4/workflow-rbac/" error="workflowtaskresults.argoproj.io is forbidden: User \"system:serviceaccount:kubeflow:pipeline-runner\" cannot create resource \"workflowtaskresults\" in API group \"argoproj.io\" in the namespace \"kubeflow\""
wait time="2024-09-03T19:35:12.045Z" level=info msg="Alloc=8334 TotalAlloc=13898 Sys=28285 NumGC=4 Goroutines=10"
wait time="2024-09-03T19:35:12.047Z" level=info msg="Deadline monitor stopped"
wait time="2024-09-03T19:35:12.047Z" level=info msg="stopping progress monitor (context done)" error="context canceled"
Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (init)
Stream closed EOF for kubeflow/outer-pipeline-sq9vj-system-container-driver-3067012875 (wait)

Here's the key line:

main F0903 19:35:11.286410      18 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=2fda42a6-a492-4f81-8a7f-f08118a34bcb, task="outer-comp", component="comp-outer-comp", dagExecutionID=234, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"

Let's confirm that the write is happening. It looks like it is, judging by this log from the inner executor:

main I0903 19:34:54.738676      32 launcher_v2.go:705] ExecutorOutput: {
main   "parameterValues": {
main     "Output": "inner"
main   }
main }
main I0903 19:34:54.814778      32 launcher_v2.go:151] publish success.

I spent some time combing through the mysql databases and tables then asked
where the MLMD value parameters are saved in the CNCF slack /
#kubeflow-pipelines channel.

2024-09-4

@HumairAK generously ran the failing pipeline and confirmed that the output
parameter is saved to the metadb database in the ExecutionProperty table and
that the value is likely serialized.

We've confirmed it's not a write problem. It's likely (1) a read problem (the
driver is attempting to read from the wrong location), or (2) a compiler problem
(the workflow manifest is misconfigured).

Next steps to follow.

@droctothorpe
Copy link
Contributor

droctothorpe commented Sep 4, 2024

Added some debug logging to the driver.

main I0904 19:30:29.527880      19 driver.go:1130] Beginning to iterate through task.GetInputs().GetParameters().
main I0904 19:30:29.527970      19 driver.go:1132] name: input
main I0904 19:30:29.528053      19 driver.go:1133] paramSpec: task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}
main I0904 19:30:29.528290      19 driver.go:1152] taskOutput: producer_task:"inner-pipeline"  output_parameter_key:"Output"
main I0904 19:31:29.582117      19 driver.go:1170] producer: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680
main I0904 19:31:29.585325      19 driver.go:1178] execution: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680
main I0904 19:31:29.585871      19 driver.go:1180] properties: map[display_name:string_value:"inner-pipeline" inputs:struct_value:{} parent_dag_id:int_value:251 task_name:string_value:"inner-pipeline"]
main I0904 19:31:29.587862      19 driver.go:1187] outputs: map]
main I0904 19:31:29.588125      19 driver.go:1189] param: <nil>
main F0904 19:31:29.589903      19 main.go:79] KFP driver: driver.Container(pipelineName=outer-pipeline, runID=225f0001-9e6b-4a69-9b90-cf2ecfebddbc, task="outer-comp", component="comp-outer-comp", dagExecutionID=251, componentSpec) failed: failed to resolve inputs: resolving input parameter input with spec task_output_parameter:{producer_task:"inner-pipeline"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "inner-pipeline"
main time="2024-09-04T19:31:29.854Z" level=info msg="sub-process exited" argo=true error="<nil>"

Okay, here's the exact line of code that returns the error:
https://github.com/kubeflow/pipelines/blob/master/backend/src/v2/driver/driver.go#L1159

This case handles inputs that are outputs from previous tasks. When the previous task is a container, the resulting producer looks like this:

main I0904 19:36:40.607523      21 driver.go:1170] producer: id:255  type_id:14  type:"system.ContainerExecution"  last_known_state:COMPLETE  custom_properties:{key:"display_name"  value:{string_value:"one"}}  custom_properties:{key:"image"  value:{string_value:""}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"namespace"  value:{string_value:"kubeflow"}}  custom_properties:{key:"outputs"  value:{struct_value:{fields:{key:"Output"  value:{string_value:"one"}}}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:254}}  custom_properties:{key:"pod_name"  value:{string_value:"pipeline-jlhqs-system-container-impl-1061929419"}}  custom_properties:{key:"pod_uid"  value:{string_value:"734bcf94-36f2-4643-a955-f38f737263ef"}}  custom_properties:{key:"task_name"  value:{string_value:"one"}}  create_time_since_epoch:1725478520142  last_update_time_since_epoch:1725478533166

In particular, note the following in the custom_properties map:

custom_properties:{key:"outputs"  value:{struct_value:{fields:{key:"Output"  value:{string_value:"one"}

That's our previous / producer task output.

Now look at what producer looks like when the task is a sub-DAG rather than a container:

main I0904 19:31:29.582117      19 driver.go:1170] producer: id:252  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:"inner-pipeline"}}  custom_properties:{key:"inputs"  value:{struct_value:{}}}  custom_properties:{key:"parent_dag_id"  value:{int_value:251}}  custom_properties:{key:"task_name"  value:{string_value:"inner-pipeline"}}  create_time_since_epoch:1725478188680  last_update_time_since_epoch:1725478188680

In particular, take note of the fact that there is no custom property with a key of outputs!

We know that the output is in the database, as @HumairAK demonstrated, but for
some reason, the response to the call to GetProducerTask() here doesn't include it.

Why not?

This might actually be a write problem after all, i.e. it's possible that the
relationship between DAG tasks / executions and their corresponding outputs is
not being set properly at write time.

Will pick up where I left off tomorrow.

@droctothorpe
Copy link
Contributor

Apologies for not updating more consistently. We really got into the weeds with this. The abstractions are complex enough that even just communicating about them is quite difficult. The good news is that we have a working POC.

All of the updates are restricted to driver.go. We updated the DAG function to add information about the producer subtask in the execution write to MLMD. That fixed the "write." We then updated resolveInputs (the "read") to handle DAGExecution type tasks by gathering their tasks and essentially flattening the tasks list (shoutout to @zazulam for this brilliant suggestion). We then redirected resolveInputs to lookup the producer subtask's output (which is now in the tasks list thanks to the aforementioned flattening), which it already handles since component output > component input works fine.

It requires a lot more polish, validation, test file updates, recursive flattening (for when sub-DAGs have sub-DAGs), and we need to test against NamedTuple updates, but having a functional POC is a great milestone. Adding this topic to the community call agenda today for a possible informal design review of sorts, even though it is still a WIP.

cc @chensun.

@ianbenlolo
Copy link

This actually seems related to my discussion started here: #11181.
Out of curiosity, will this fix what I'm trying to do there? Namely, using retries with nested pipelines.

Thanks! Looking forward to this fix.

@zazulam
Copy link
Contributor

zazulam commented Sep 18, 2024

This actually seems related to my discussion started here: #11181. Out of curiosity, will this fix what I'm trying to do there? Namely, using retries with nested pipelines.

Thanks! Looking forward to this fix.

Thanks for bringing that up @ianbenlolo! Right now we're aiming to support PipelineParameters & Artifacts to move between nested DAGs. The retry piece may be a bit out of scope but could definitely be tested against our solution.

@droctothorpe
Copy link
Contributor

Closed by #11196.

@droctothorpe
Copy link
Contributor

/close

Copy link

@droctothorpe: Closing this issue.

In response to this:

/close

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@github-project-automation github-project-automation bot moved this from Needs triage to Closed in KFP Runtime Triage Oct 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: Closed
Development

No branches or pull requests

7 participants