[backend] Nested pipeline does not pass artifacts outputs #10041

Closed
omriel1 opened this issue Sep 28, 2023 · 18 comments

omriel1 commented Sep 28, 2023

Hi!
I was trying to run a nested pipeline in which the first inner pipeline outputs two artifacts (Dataset) and the second one consumes them. I was loosely following this example from the docs: https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/#using-output-artifacts

The pipeline compiles successfully and the graph looks right in the UI, but the run fails. The run is marked with a failure sign, yet in the graph the first inner pipeline still appears to be running even though its outputs are ready (screenshot below). The second inner pipeline gives no indication of why it failed.

If I look at the logs of the failing pod I see:

31 main.go:76] KFP driver: driver.DAG(pipelineName=parent-pipeline, runID=b11af0ad-3793-4d8a-a13a-43cce00898e4, task="second-inner-pipeline", component="comp-second-inner-pipeline", dagExecutionID=1, componentSpec) failed: failed to resolve inputs: failed to resolve input artifact table_1 with spec task_output_artifact:{producer_task:"first-inner-pipeline" output_artifact_key:"table_1"}: cannot find output artifact key "table_1" in producer task "first-inner-pipeline"
time="2023-09-28T17:18:05.473Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2023-09-28T17:18:05.534Z" level=error msg="cannot save parameter /tmp/outputs/execution-id" argo=true error="open /tmp/outputs/execution-id: no such file or directory"
time="2023-09-28T17:18:05.536Z" level=error msg="cannot save parameter /tmp/outputs/iteration-count" argo=true error="open /tmp/outputs/iteration-count: no such file or directory"
time="2023-09-28T17:18:05.536Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
Error: exit status 1
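
For triage, the useful part of the driver line above is which consumer task failed, which producer task the driver looked in, and which output key it could not find. Here is a minimal sketch that pulls those fields out with regular expressions. The patterns are an assumption based only on the log lines quoted in this issue; other backend versions may word the message differently. `UnresolvedInput` and `parse_driver_failure` are hypothetical helper names, not part of KFP.

```python
import re
from typing import NamedTuple, Optional


class UnresolvedInput(NamedTuple):
    task: str      # consumer task whose driver failed
    producer: str  # upstream task the driver searched
    kind: str      # "artifact" or "parameter"
    key: str       # output key that could not be found


# Assumption: patterns derived from the log lines quoted in this issue only.
_TASK = re.compile(r'task="([^"]+)"')
_SPEC = re.compile(
    r'producer_task:"([^"]+)"\s+output_(artifact|parameter)_key:"([^"]+)"'
)


def parse_driver_failure(line: str) -> Optional[UnresolvedInput]:
    """Return the unresolved input described by a KFP driver failure line,
    or None if the line does not look like one."""
    task = _TASK.search(line)
    spec = _SPEC.search(line)
    if not (task and spec):
        return None
    producer, kind, key = spec.groups()
    return UnresolvedInput(task.group(1), producer, kind, key)


# The driver line from the failing pod, copied from the log above.
SAMPLE = (
    '31 main.go:76] KFP driver: driver.DAG(pipelineName=parent-pipeline, '
    'runID=b11af0ad-3793-4d8a-a13a-43cce00898e4, task="second-inner-pipeline", '
    'component="comp-second-inner-pipeline", dagExecutionID=1, componentSpec) '
    'failed: failed to resolve inputs: failed to resolve input artifact table_1 '
    'with spec task_output_artifact:{producer_task:"first-inner-pipeline" '
    'output_artifact_key:"table_1"}: cannot find output artifact key "table_1" '
    'in producer task "first-inner-pipeline"'
)

if __name__ == "__main__":
    print(parse_driver_failure(SAMPLE))
```

For this line the consumer is `second-inner-pipeline`, the producer is `first-inner-pipeline`, and the missing key is `table_1`. In other words, the lookup fails on the output of the nested pipeline itself, not on one of the components inside it.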

The UI:

Screenshot 2023-09-28 at 20 23 54

Environment

I used a local deployment on kind:

kind create cluster --name kfp

export PIPELINE_VERSION=2.0.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"

I'm using kfp version 2.0.1.

Steps to reproduce

Here's the code I've used:

from typing import NamedTuple

from kfp import dsl, compiler, Client
# Dataset, Input and Output are all exported from the public kfp.dsl namespace.
from kfp.dsl import Dataset, Input, Output

# ----------------
# Components

@dsl.component(packages_to_install=['pandas==1.3.5'])
def generate_table_1(table_1_output: Output[Dataset]):
    import pandas as pd

    data = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
    data.to_csv(table_1_output.path, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5'])
def generate_table_2(table_2_output: Output[Dataset]):
    import pandas as pd

    data = pd.DataFrame({"a": [1,2,3], "c": [7,8,9]})
    data.to_csv(table_2_output.path, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5'])
def join_tables(table_1: Input[Dataset], table_2: Input[Dataset], joined_table: Output[Dataset]):
    import pandas as pd

    left = pd.read_csv(table_1.path)
    right = pd.read_csv(table_2.path)
    result = pd.merge(left=left, right=right, on="a", how="inner")
    result.to_csv(joined_table.path, index=False)

@dsl.component
def get_table_size(table: Input[Dataset]) -> int:
    import os
    return os.path.getsize(table.path)

# ----------------
# Pipelines

@dsl.pipeline
def first_inner_pipeline() -> NamedTuple('Outputs', table_1=Dataset, table_2=Dataset):
    Outputs = NamedTuple('Outputs', table_1=Dataset, table_2=Dataset)
    generate_table_1_task = generate_table_1()
    generate_table_2_task = generate_table_2()
    return Outputs(
        table_1=generate_table_1_task.outputs['table_1_output'],
        table_2=generate_table_2_task.outputs['table_2_output'])

@dsl.pipeline
def second_inner_pipeline(table_1: Input[Dataset], table_2: Input[Dataset]) -> int:
    join_tables_task = join_tables(table_1=table_1, table_2=table_2)
    get_table_size_task = get_table_size(table=join_tables_task.outputs['joined_table'])
    return get_table_size_task.output


@dsl.pipeline
def parent_pipeline() -> int:
    first_pipeline_task = first_inner_pipeline()
    second_pipeline_task = second_inner_pipeline(
        table_1=first_pipeline_task.outputs['table_1'],
        table_2=first_pipeline_task.outputs['table_2']
    )
    return second_pipeline_task.output

if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=parent_pipeline,
        package_path="./kfp_usecase_2/issue.yaml")

    client = Client()
    run = client.create_run_from_pipeline_func(
        pipeline_func=parent_pipeline,
        experiment_name="nested-pipeline-exp",
        enable_caching=False,
        run_name="nested-pipeline"
    )
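
Since compilation succeeds, one thing worth checking is whether the compiled spec itself wires `table_1` and `table_2` consistently. The sketch below walks a parsed pipeline spec and reports, for every root-DAG task input that comes from another task's output artifact, whether the producer's component declares that key. `SAMPLE_SPEC` is a hand-written, heavily abbreviated fragment, not the real compiler output for the code above. The camelCase field names (`taskOutputArtifact`, `producerTask`, `outputArtifactKey`, `outputDefinitions`, `componentRef`) are assumed to match the compiled YAML. `check_artifact_wiring` is a hypothetical helper name.

```python
from typing import Dict, List, Tuple


def check_artifact_wiring(spec: Dict) -> List[Tuple[str, str, str, bool]]:
    """For each root-DAG task input fed by another task's output artifact,
    return (consumer, producer, key, declared), where `declared` says whether
    the producer's component lists `key` under outputDefinitions.artifacts."""
    components = spec.get("components", {})
    tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    report = []
    for task_name, task in tasks.items():
        inputs = task.get("inputs", {}).get("artifacts", {})
        for source in inputs.values():
            ref = source.get("taskOutputArtifact")
            if ref is None:
                continue  # not fed by a sibling task's output
            producer = tasks.get(ref["producerTask"], {})
            comp_name = producer.get("componentRef", {}).get("name", "")
            declared = (
                components.get(comp_name, {})
                .get("outputDefinitions", {})
                .get("artifacts", {})
            )
            key = ref["outputArtifactKey"]
            report.append((task_name, ref["producerTask"], key, key in declared))
    return report


# Hypothetical, abbreviated fragment for illustration only. For a real check,
# load the compiled YAML file into a dict instead.
SAMPLE_SPEC = {
    "components": {
        "comp-first-inner-pipeline": {
            "outputDefinitions": {"artifacts": {"table_1": {}, "table_2": {}}},
        },
        "comp-second-inner-pipeline": {},
    },
    "root": {"dag": {"tasks": {
        "first-inner-pipeline": {
            "componentRef": {"name": "comp-first-inner-pipeline"},
        },
        "second-inner-pipeline": {
            "componentRef": {"name": "comp-second-inner-pipeline"},
            "inputs": {"artifacts": {
                "table_1": {"taskOutputArtifact": {
                    "producerTask": "first-inner-pipeline",
                    "outputArtifactKey": "table_1"}},
                "table_2": {"taskOutputArtifact": {
                    "producerTask": "first-inner-pipeline",
                    "outputArtifactKey": "table_2"}},
            }},
        },
    }}},
}

if __name__ == "__main__":
    for row in check_artifact_wiring(SAMPLE_SPEC):
        print(row)
```

If every row ends in `True` for the real compiled file, the wiring is consistent at the spec level. That would suggest the failure comes from how the backend driver looks up outputs of a nested pipeline at run time, not from the SDK compiler.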

Impacted by this bug? Give it a 👍.

@omriel1
Author

omriel1 commented Sep 28, 2023

Similar to #10039, but with artifacts.

@zijianjoy
Collaborator

/assign @chensun

github-actions bot commented Jan 4, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Jan 4, 2024

@HumairAK
Collaborator

/remove-lifecycle stale

google-oss-prow bot removed the lifecycle/stale label Jan 31, 2024

github-actions bot commented Apr 1, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Apr 1, 2024

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

@zetinaricky

I'm still facing the same issue. Was there a solution or workaround for this?

@thesuperzapper
Member

@zetinaricky what version of Kubeflow Pipeline (backend) are you using?

@zetinaricky

backend version is v1.8.0

@thesuperzapper
Member

thesuperzapper commented Jun 13, 2024

@zetinaricky that seems very unlikely, given this issue is about KFP v2.

I assume you are using Kubeflow 1.8, which includes Kubeflow Pipelines 2.0.3?

@zetinaricky

Kubeflow backend version: v1.8.0
Kubeflow Pipelines version: 2.2.0
Pipeline built and compiled with kfp version 2.7.0

Do you think the issue is unrelated to this thread then?
Thanks!
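
Three different version numbers come up in this exchange: the Kubeflow platform, the Kubeflow Pipelines backend, and the `kfp` SDK. A minimal sketch for confirming the SDK side from the environment that compiles the pipeline is shown here. It uses only the standard library and reports the installed `kfp` package version. It says nothing about the backend version, which has to be read from the deployment. `installed_version` is a hypothetical helper name.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str = "kfp") -> Optional[str]:
    """Return the installed version of `package`, or None if it is not
    installed in the current Python environment."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("kfp SDK version:", installed_version("kfp"))
```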

@boarder7395
Contributor

boarder7395 commented Aug 8, 2024

I'm running Kubeflow 1.9 with Kubeflow Pipelines 2.2.0 and this is still an issue. It's concerning that code provided in the documentation is broken.

Kubeflow Backend Version: v1.9.0
Kubeflow pipelines Version: 2.2.0
Kubeflow pipelines SDK: 2.8.0

Relevant logs of the issue:

I0808 00:48:51.435502      15 driver.go:252] parent DAG: id:96  name:"run/abb87731-e26c-4d9b-9d58-3be8d07fd5fe"  type_id:13  type:"system.DAGExecution"  last_known_state:RUNNING  custom_properties:{key:"display_name"  value:{string_value:""}}  custom_properties:{key:"inputs"  value:{struct_value:{fields:{key:"a"  value:{number_value:3.2}}  fields:{key:"b"  value:{number_value:9.2}}}}}  custom_properties:{key:"task_name"  value:{string_value:""}}  create_time_since_epoch:1723078026064  last_update_time_since_epoch:1723078026064
I0808 00:48:51.877802      15 driver.go:926] parent DAG input parameters: map[a:number_value:3.2 b:number_value:9.2], artifacts: map[]
F0808 00:48:52.134460      15 main.go:79] KFP driver: driver.Container(pipelineName=pythagorean, runID=abb87731-e26c-4d9b-9d58-3be8d07fd5fe, task="square-root", component="comp-square-root", dagExecutionID=96, componentSpec) failed: failed to resolve inputs: resolving input parameter x with spec task_output_parameter:{producer_task:"square-and-sum"  output_parameter_key:"Output"}: cannot find output parameter key "Output" in producer task "square-and-sum"
time="2024-08-08T00:48:52.832Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/pod-spec-patch" argo=true error="open /tmp/outputs/pod-spec-patch: no such file or directory"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/cached-decision" argo=true error="open /tmp/outputs/cached-decision: no such file or directory"
time="2024-08-08T00:48:52.832Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
Error: exit status 1

PipelineSpec:
pipeline_in_pipeline_spec.txt

@chensun Can this issue be reopened and exempted from auto-closing?

chensun reopened this Aug 8, 2024
github-actions bot removed the lifecycle/stale label Aug 9, 2024
github-project-automation bot moved this to Needs triage in KFP Runtime Triage Aug 29, 2024

@droctothorpe
Contributor

This issue appears to be redundant with #10039.

@boarder7395
Contributor

@droctothorpe Thoughts on closing this in favor of #10039?

@droctothorpe
Contributor

I think that's a good idea 👍 .

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Nov 12, 2024

@droctothorpe
Contributor

/close

@droctothorpe: Closing this issue.

In response to this:

/close

github-project-automation bot moved this from Needs triage to Closed in KFP Runtime Triage Nov 12, 2024
stale bot removed the lifecycle/stale label Nov 12, 2024