Skip to content

Commit

Permalink
Merge pull request #935 from revit13/fix_kfpv2
Browse files Browse the repository at this point in the history
Fix path issues when running superworkflow pipeline sample for kfp v2
  • Loading branch information
revit13 authored Jan 14, 2025
2 parents 2b19b7f + 60dcaa1 commit ae2575f
Show file tree
Hide file tree
Showing 43 changed files with 284 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.
################################################################################

import os
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
Expand All @@ -19,7 +20,7 @@
# empty comment to trigger pre-commit
# Components
# For every sub workflow we need a separate component that knows about this subworkflow.
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", "../../../../../kfp/kfp_ray_components/")
run_code_to_parquet_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

import kfp.compiler as compiler
import kfp.components as comp
Expand All @@ -17,7 +18,7 @@

# Components
# path to kfp component specifications files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", "../../../../../kfp/kfp_ray_components/")
# For every sub workflow we need a separate component that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
2 changes: 1 addition & 1 deletion examples/kfp-pipelines/superworkflows/ray/kfp_v2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ worflow-clean:: .workflows.clean
.PHONY: workflow-build
workflow-build: workflow-venv
@for file in $(YAML_WF); do \
$(MAKE) $$file; \
$(MAKE) KFP_COMPONENT_SPEC_PATH=${REPOROOT}/kfp/kfp_ray_components/ $$file; \
done

workflow-test::
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from workflow_support.compile_utils.component import ONE_HOUR_SEC, ONE_DAY_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils.component import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

logger = get_logger(__name__)

# Default path for KFP component specification files
DEFAULT_KFP_COMPONENT_SPEC_PATH = "../../../../kfp/kfp_ray_components/"

ONE_HOUR_SEC = 60 * 60
ONE_DAY_SEC = ONE_HOUR_SEC * 24
ONE_WEEK_SEC = ONE_DAY_SEC * 7
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from workflow_support.compile_utils.component import ONE_HOUR_SEC, ONE_DAY_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils.component import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

RUN_NAME = "KFP_RUN_NAME"

# Default path for KFP component specification files
DEFAULT_KFP_COMPONENT_SPEC_PATH = "../../../../kfp/kfp_ray_components/"

ONE_HOUR_SEC = 60 * 60
ONE_DAY_SEC = ONE_HOUR_SEC * 24
ONE_WEEK_SEC = ONE_DAY_SEC * 7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pipeline_parameters:
multi_s3: False
compute_func_name: ""
compute_func_import: ""
component_spec_path: ""

pipeline_common_input_parameters_values:
kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
Expand Down
5 changes: 0 additions & 5 deletions kfp/pipeline_generator/single-pipeline/pipeline_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@
common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]
pipeline_transform_input_parameters = pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS]

component_spec_path = pipeline_parameters.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../kfp/kfp_ray_components/"

content = template.render(
transform_image=common_input_params_values["transform_image"],
script_name=pipeline_parameters["script_name"],
kfp_base_image=common_input_params_values["kfp_base_image"],
component_spec_path=component_spec_path,
pipeline_arguments=pipeline_transform_input_parameters["pipeline_arguments"],
pipeline_name=pipeline_parameters[NAME],
pipeline_description=pipeline_parameters["description"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.components as comp
import kfp.dsl as dsl

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


task_image = "{{ transform_image }}"
Expand All @@ -27,7 +32,7 @@
base_kfp_image = "{{ kfp_base_image }}"

# path to kfp component specifications files
component_spec_path = "{{ component_spec_path }}"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# NOTE: This file is auto generated by Pipeline Generator.
import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "../../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate component that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_ededup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
################################################################################

import yaml

import os

PRE_COMMIT = "../pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "template_superpipeline.py"
Expand Down Expand Up @@ -69,10 +69,6 @@ def get_generic_params(params, prefix="") -> str:
pipeline_tasks = pipeline_definitions[PIPELINE_TASKS]
common_input_params = pipeline_definitions[COMMON_INPUT_PARAMETERS]

component_spec_path = pipeline_metadata.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../../kfp/kfp_ray_components/"

for task in pipeline_tasks:
task_name = task["name"]
task_pipeline_name = task["pipeline_name"]
Expand Down Expand Up @@ -144,7 +140,6 @@ def get_generic_params(params, prefix="") -> str:
superpipeline_name=pipeline_metadata[NAME],
superpipeline_description=pipeline_metadata[DESCRIPTION],
sub_workflows_components=pipeline_definitions[PIPELINE_TASKS],
component_spec_path=component_spec_path,
p1_parameters=pipeline_definitions[PIPELINE_TASKS],
add_p2_parameters=common_input_params,
sub_workflows_parameters=sub_workflows_parameters,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# NOTE: This file is auto generated by Pipeline Generator.

import os
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "__component_spec_path__"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate component that knows about this subworkflow.
__sub_workflows_components__

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
# NOTE: This file is auto generated by Pipeline Generator.

import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# path to kfp component specifications files
component_spec_path = "{{ component_spec_path }}"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)
# For every sub workflow we need a separate component that knows about this subworkflow.
{%- for component in sub_workflows_components %}
run_{{ component.name }}_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -28,7 +33,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/license_select/kfp_ray/license_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -28,7 +33,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


# the name of the job script
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)



# compute execution parameters. Here different transforms might need different implementations. As
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
from workflow_support.compile_utils import (
DEFAULT_KFP_COMPONENT_SPEC_PATH,
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)


task_image = "quay.io/dataprep1/data-prep-kit/repo_level_order-ray:latest"
Expand All @@ -27,7 +32,8 @@
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"
component_spec_path = os.getenv("KFP_COMPONENT_SPEC_PATH", DEFAULT_KFP_COMPONENT_SPEC_PATH)


# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
Expand Down
Loading

0 comments on commit ae2575f

Please sign in to comment.