From 79e7e08f8c3e62fbd6afb1d807aa4fdf9a9d4dc5 Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Mon, 20 Jan 2025 09:47:47 +0200 Subject: [PATCH] Obtain the Ray cluster run ID from the user for KFP v2. Signed-off-by: Revital Sur --- kfp/doc/simple_transform_pipeline.md | 25 +++++++++++++++---- .../src/runtime_utils/kfp_utils.py | 7 +++--- .../templates/simple_pipeline.py | 25 ++++++++++--------- .../code2parquet/kfp_ray/code2parquet_wf.py | 24 +++++++++--------- .../code_quality/kfp_ray/code_quality_wf.py | 24 +++++++++--------- .../kfp_ray/header_cleanser_wf.py | 14 ++--------- .../kfp_ray/license_select_wf.py | 24 +++++++++--------- transforms/code/malware/kfp_ray/malware_wf.py | 24 +++++++++--------- .../kfp_ray/proglang_select_wf.py | 24 +++++++++--------- .../kfp_ray/repo_level_order_wf.py | 24 +++++++++--------- .../kfp_ray/doc_chunk_multiple_wf.py | 24 +++++++++--------- .../doc_chunk/kfp_ray/doc_chunk_wf.py | 24 +++++++++--------- .../kfp_ray/doc_quality_multiple_wf.py | 24 +++++++++--------- .../doc_quality/kfp_ray/doc_quality_wf.py | 25 ++++++++++--------- .../html2parquet/kfp_ray/html2parquet_wf.py | 24 +++++++++--------- .../lang_id/kfp_ray/lang_id_multiple_wf.py | 24 +++++++++--------- .../language/lang_id/kfp_ray/lang_id_wf.py | 24 +++++++++--------- .../kfp_ray/pdf2parquet_multiple_wf.py | 24 +++++++++--------- .../pdf2parquet/kfp_ray/pdf2parquet_wf.py | 24 +++++++++--------- .../pii_redactor/kfp_ray/pii_redactor_wf.py | 24 +++++++++--------- .../kfp_ray/text_encoder_multiple_wf.py | 24 +++++++++--------- .../text_encoder/kfp_ray/text_encoder_wf.py | 24 +++++++++--------- .../universal/doc_id/kfp_ray/doc_id_wf.py | 24 +++++++++--------- .../universal/ededup/kfp_ray/ededup_wf.py | 18 +++++++++---- .../universal/fdedup/kfp_ray/fdedup_wf.py | 18 +++++++++---- .../universal/filter/kfp_ray/filter_wf.py | 24 +++++++++--------- transforms/universal/hap/kfp_ray/hap_wf.py | 24 +++++++++--------- .../noop/kfp_ray/noop_multiple_wf.py | 24 +++++++++--------- 
transforms/universal/noop/kfp_ray/noop_wf.py | 24 +++++++++--------- .../universal/profiler/kfp_ray/profiler_wf.py | 12 +++++++++ .../universal/resize/kfp_ray/resize_wf.py | 24 +++++++++--------- .../tokenization/kfp_ray/tokenization_wf.py | 12 +++++++++ 32 files changed, 378 insertions(+), 330 deletions(-) diff --git a/kfp/doc/simple_transform_pipeline.md b/kfp/doc/simple_transform_pipeline.md index 10341c24b..e49eef625 100644 --- a/kfp/doc/simple_transform_pipeline.md +++ b/kfp/doc/simple_transform_pipeline.md @@ -57,11 +57,16 @@ Ray cluster. For each step we have to define a component that will execute them: ```python # components base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2" - # compute execution parameters. Here different transforms might need different implementations. As - # a result, instead of creating a component we are creating it in place here. - compute_exec_params_op = comp.func_to_container_op( - func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image - ) + # KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the + # `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path. + # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use + # this if/else statement and explicitly call the decorator. 
+ if os.getenv("KFPv2", "0") == "1": + compute_exec_params_op = dsl.component_decorator.component( + func=compute_exec_params_func, base_image=base_kfp_image + ) + else: + compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) # create Ray cluster create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml") # execute job @@ -148,6 +153,16 @@ Now, when all components and input parameters are defined, we can implement pipe component execution and parameters submitted to every component. ```python + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. + if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py index 7fa76453f..3a281e48a 100644 --- a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py +++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py @@ -81,9 +81,10 @@ def runtime_name(ray_name: str = "", run_id: str = "") -> str: # the return value plus namespace name will be the name of the Ray 
Route, # which length is restricted to 64 characters, # therefore we restrict the return name by 15 character. - if run_id != "": - return f"{ray_name[:9]}-{run_id[:5]}" - return ray_name[:15] + if run_id == "": + logger.error("Run ID must be provided") + sys.exit(1) + return f"{ray_name[:9]}-{run_id[:5]}" @staticmethod def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str: diff --git a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py index ce7657a5c..2022e8359 100644 --- a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py +++ b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py @@ -73,23 +73,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -111,9 +99,11 @@ def {{ pipeline_name }}( ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed {%- if image_pull_secret != "" %} + ray_id_KFPv2: str = "", ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image}, {%- else %} + ray_id_KFPv2: str = "", ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, {%- endif %} @@ -142,6 +132,7 @@ def {{ pipeline_name }}( """ Pipeline to execute {{ pipeline_name }} transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -177,6 +168,16 @@ def {{ pipeline_name }}( {%- endfor %} :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py index e506ab5b3..8afde87d4 100644 --- a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py +++ b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py @@ -77,23 +77,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster @@ -113,6 +101,7 @@ def compute_exec_params_func( ) def code2parquet( ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -139,6 +128,7 @@ def code2parquet( """ Pipeline to execute NOOP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -178,6 +168,16 @@ def code2parquet( (here we are assuming that select language info is in S3, but potentially in the different bucket) :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/code/code_quality/kfp_ray/code_quality_wf.py b/transforms/code/code_quality/kfp_ray/code_quality_wf.py index f37fb5870..ba2d8e53f 100644 --- a/transforms/code/code_quality/kfp_ray/code_quality_wf.py +++ b/transforms/code/code_quality/kfp_ray/code_quality_wf.py @@ -74,23 +74,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster @@ -112,6 +100,7 @@ def compute_exec_params_func( def code_quality( # Ray cluster ray_name: str = "code_quality-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -136,6 +125,7 @@ def code_quality( """ Pipeline to execute Code Quality transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -171,6 +161,16 @@ def code_quality( :param cq_hf_token - Huggingface auth token to download and use the tokenizer :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py index 6fdf1862a..107795463 100644 --- a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py +++ b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py @@ -82,23 +82,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster @@ -120,6 +108,7 @@ def compute_exec_params_func( def header_cleanser( # Ray cluster ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -148,6 +137,7 @@ def header_cleanser( """ Pipeline to execute Header Cleanser transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory diff --git a/transforms/code/license_select/kfp_ray/license_select_wf.py b/transforms/code/license_select/kfp_ray/license_select_wf.py index 7c10b1c34..b92cb6498 100644 --- a/transforms/code/license_select/kfp_ray/license_select_wf.py +++ b/transforms/code/license_select/kfp_ray/license_select_wf.py @@ -71,23 +71,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. 
- import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" - ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -106,6 +94,7 @@ def compute_exec_params_func( ) def license_select( ray_name: str = "license_select-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -135,6 +124,7 @@ def license_select( """ Pipeline to execute License Select transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -166,6 +156,16 @@ def license_select( :param lc_licenses_file - path to license list json file :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) ComponentUtils.add_settings_to_component(clean_up_task, 60) diff --git a/transforms/code/malware/kfp_ray/malware_wf.py b/transforms/code/malware/kfp_ray/malware_wf.py index 30525e870..ad1bf4aaf 100644 --- a/transforms/code/malware/kfp_ray/malware_wf.py +++ b/transforms/code/malware/kfp_ray/malware_wf.py @@ -70,23 +70,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -104,6 +92,7 @@ def compute_exec_params_func( ) def malware( ray_name: str = "malware-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -126,6 +115,7 @@ def malware( """ Pipeline to execute malware transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -160,6 +150,16 @@ def malware( :param malware_output_column - output column name :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py index f1b271d3c..3ba7d8926 100644 --- a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py +++ b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py @@ -71,23 +71,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -106,6 +94,7 @@ def compute_exec_params_func( ) def lang_select( ray_name: str = "proglang-match-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -129,6 +118,7 @@ def lang_select( """ Pipeline to execute NOOP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -165,6 +155,16 @@ def lang_select( (here we are assuming that select language info is in S3, but potentially in the different bucket) :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py b/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py index 47388f394..38099a192 100644 --- a/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py +++ b/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py @@ -87,23 +87,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -123,6 +111,7 @@ def compute_exec_params_func( def repo_level_order( # Ray cluster ray_name: str = "repo_level_order-kfp-ray", + ray_id_KFPv2: str = "", ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { "replicas": 2, @@ -159,6 +148,7 @@ def repo_level_order( """ Pipeline to execute repo_level_order transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -201,6 +191,16 @@ def repo_level_order( :param repo_lvl_combine_rows - # If specified, output rows per repo are combined to form a single repo :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py index 62161be6d..5518f0ba1 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py @@ -73,23 +73,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -108,6 +96,7 @@ def compute_exec_params_func( def doc_chunk( # Ray cluster ray_name: str = "doc-json-chunk-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -139,6 +128,7 @@ def doc_chunk( """ Pipeline to execute chunk documents transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -175,6 +165,16 @@ def doc_chunk( :param doc_chunk_dl_min_chunk_len - minimum chunk size :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py index 618c11d68..e671177a9 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py @@ -73,23 +73,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -109,6 +97,7 @@ def compute_exec_params_func( def doc_chunk( # Ray cluster ray_name: str = "doc-json-chunk-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -133,6 +122,7 @@ def doc_chunk( """ Pipeline to execute chunk documents transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -169,6 +159,16 @@ def doc_chunk( :param doc_chunk_dl_min_chunk_len - minimum chunk size :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py index 4a2d9de1d..2830ce32c 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py @@ -72,23 +72,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -107,6 +95,7 @@ def compute_exec_params_func( def doc_quality( # Ray cluster ray_name: str = "doc_quality-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image, "image_pull_policy": "Always"}, ray_worker_options: dict = { "replicas": 2, @@ -137,6 +126,7 @@ def doc_quality( """ Pipeline to execute Document Quality transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -172,6 +162,16 @@ def doc_quality( :param docq_bad_word_filepath - a path to bad word file :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time.
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py index e26efe832..c4d6c7d43 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py @@ -72,23 +72,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -107,6 +95,7 @@ def compute_exec_params_func( def doc_quality( # Ray cluster ray_name: str = "doc_quality-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", ray_head_options: dict = { "cpu": 1, "memory": 4, @@ -143,6 +132,7 @@ def doc_quality( """ Pipeline to execute Document Quality transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -178,6 +168,17 @@ def doc_quality( :param docq_bad_word_filepath - a path to bad word file :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time.
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER + # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py index b6f5dff19..b75064e79 100644 --- a/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py +++ b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py @@ -71,23 +71,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -107,6 +95,7 @@ def compute_exec_params_func( def html2parquet( # Ray cluster ray_name: str = "html2parquet-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -137,6 +126,7 @@ def html2parquet( """ Pipeline to execute html2parquet transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -171,6 +161,16 @@ def html2parquet( :param html2parquet_output_format - # Output format for the contents column. :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py index 941d32627..480f1a738 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py @@ -77,23 +77,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -112,6 +100,7 @@ def compute_exec_params_func( def lang_id( # Ray cluster ray_name: str = "lang_id-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -145,6 +134,7 @@ def lang_id( """ Pipeline to execute Language Identification transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -183,6 +173,16 @@ def lang_id( :param lang_id_output_score_column_name - name of the output column to hold score of prediction :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/lang_id/kfp_ray/lang_id_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_wf.py index fa4debbe3..b16243762 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_wf.py @@ -78,23 +78,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -113,6 +101,7 @@ def compute_exec_params_func( def lang_id( # Ray cluster ray_name: str = "lang_id-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -146,6 +135,7 @@ def lang_id( """ Pipeline to execute Language Identification transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -184,6 +174,16 @@ def lang_id( :param lang_id_output_score_column_name - name of the output column to hold score of prediction :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py index 91d40567e..f1796ee9f 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py @@ -75,23 +75,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -110,6 +98,7 @@ def compute_exec_params_func( def pdf2parquet( # Ray cluster ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -142,6 +131,7 @@ def pdf2parquet( """ Pipeline to execute PDF2PARQUET transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -179,6 +169,16 @@ def pdf2parquet( :param pdf2parquet_bitmap_area_threshold - threshold for bitmaps :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py index 4dab7d4af..a6f308ea7 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py @@ -77,23 +77,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -113,6 +101,7 @@ def compute_exec_params_func( def pdf2parquet( # Ray cluster ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -146,6 +135,7 @@ def pdf2parquet( """ Pipeline to execute PDF2PARQUET transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -183,6 +173,16 @@ def pdf2parquet( :param pdf2parquet_bitmap_area_threshold - threshold for bitmaps :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py b/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py index b05aecd69..fb70f789a 100644 --- a/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py +++ b/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py @@ -66,23 +66,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -102,6 +90,7 @@ def compute_exec_params_func( def pii_redactor( # Ray cluster ray_name: str = "pii-redactor-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -130,6 +119,7 @@ def pii_redactor( """ Pipeline to execute pii_redactor transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -163,6 +153,16 @@ def pii_redactor( :param pii_redactor_contents - column that has pii data and needs to be transformed by pii redactor transform :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py index 2005ee163..f746f4aef 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py @@ -71,23 +71,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -106,6 +94,7 @@ def compute_exec_params_func( def text_encoder( # Ray cluster ray_name: str = "text-encoder-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -129,6 +118,7 @@ def text_encoder( """ Pipeline to execute TextEncoder transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -164,6 +154,16 @@ def text_encoder( :param text_encoder_output_embeddings_column_name - name of the output column :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py index aa63e23f8..5e7421490 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py @@ -71,23 +71,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -107,6 +95,7 @@ def compute_exec_params_func( def text_encoder( # Ray cluster ray_name: str = "text-encoder-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -130,6 +119,7 @@ def text_encoder( """ Pipeline to execute TextEncoder transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -165,6 +155,16 @@ def text_encoder( :param text_encoder_output_embeddings_column_name - name of the output column :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py index c5d4cac6d..985139c92 100644 --- a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py +++ b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py @@ -80,23 +80,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -115,6 +103,7 @@ def compute_exec_params_func( def doc_id( # Ray cluster ray_name: str = "doc_id-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -149,6 +138,7 @@ def doc_id( """ Pipeline to execute NOOP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -185,6 +175,16 @@ def doc_id( :param doc_id_start_id - starting id :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/universal/ededup/kfp_ray/ededup_wf.py b/transforms/universal/ededup/kfp_ray/ededup_wf.py index 17c85b630..62db57fea 100644 --- a/transforms/universal/ededup/kfp_ray/ededup_wf.py +++ b/transforms/universal/ededup/kfp_ray/ededup_wf.py @@ -49,11 +49,7 @@ compute_exec_params_op = dsl.component_decorator.component( func=ededup_compute_execution_params, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" - ) - run_id = uuid.uuid4().hex + else: compute_exec_params_op = comp.create_component_from_func( func=ededup_compute_execution_params, base_image=base_kfp_image @@ -78,6 +74,7 @@ def ededup( # Ray cluster ray_name: str = "ededup-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -111,6 +108,7 @@ def ededup( """ Pipeline to execute EDEDUP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -148,6 +146,16 @@ def ededup( :param ededup_n_samples - number of samples for parameters computation :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 
2.5.0. On the other hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, in the meantime the user is requested to insert + # a unique string created at compilation time. + if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py index 51ead9c79..bf45ac197 100644 --- a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py +++ b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py @@ -64,11 +64,7 @@ compute_data_cleaning_exec_params_op = dsl.component_decorator.component( func=data_cleaning_compute_execution_params, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex + else: compute_common_params_op = comp.create_component_from_func(func=compute_common_params, base_image=base_kfp_image) compute_signature_calc_exec_params_op = comp.create_component_from_func( @@ -114,6 +110,7 @@ def fuzzydedup( # folders used # Ray cluster ray_name: str = "fuzzydedup-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = { "cpu": 8, @@ -164,6 +161,7 @@ def fuzzydedup( """ Pipeline to execute FDEDUP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -208,6 +206,16 @@ def fuzzydedup( :param fdedup_n_samples - number of samples for parameters computation :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/universal/filter/kfp_ray/filter_wf.py b/transforms/universal/filter/kfp_ray/filter_wf.py index a18d2796d..26ae44489 100644 --- a/transforms/universal/filter/kfp_ray/filter_wf.py +++ b/transforms/universal/filter/kfp_ray/filter_wf.py @@ -72,23 +72,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -107,6 +95,7 @@ def compute_exec_params_func( def filtering( # Ray cluster ray_name: str = "filter-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -130,6 +119,7 @@ def filtering( """ Pipeline to execute Filtering transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -165,6 +155,16 @@ def filtering( :param filter_columns_to_drop - list of columns to drop after filtering :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/universal/hap/kfp_ray/hap_wf.py b/transforms/universal/hap/kfp_ray/hap_wf.py index 64c80fe37..46d1dba1a 100644 --- a/transforms/universal/hap/kfp_ray/hap_wf.py +++ b/transforms/universal/hap/kfp_ray/hap_wf.py @@ -79,23 +79,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -115,6 +103,7 @@ def compute_exec_params_func( def hap( # Ray cluster ray_name: str = "hap-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -149,6 +138,7 @@ def hap( """ Pipeline to execute hap transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -187,6 +177,16 @@ def hap( :param batch_size - # batch size :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py index 9ed874f3d..dd535db5c 100644 --- a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py @@ -67,23 +67,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -102,6 +90,7 @@ def compute_exec_params_func( def noop( # Ray cluster ray_name: str = "noop-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -123,6 +112,7 @@ def noop( """ Pipeline to execute NOOP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -156,6 +146,16 @@ def noop( :param noop_sleep_sec - noop sleep time :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/universal/noop/kfp_ray/noop_wf.py b/transforms/universal/noop/kfp_ray/noop_wf.py index 5a1ce393a..0392e9ab5 100644 --- a/transforms/universal/noop/kfp_ray/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_wf.py @@ -67,23 +67,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -104,6 +92,7 @@ def noop( # Ray cluster ray_name: str = "noop-kfp-ray", # name of Ray cluster # Add image_pull_secret, image_pull_policy and tolerations to ray options if needed + ray_id_KFPv2: str = "", ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", @@ -125,6 +114,7 @@ def noop( """ Pipeline to execute noop transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -158,6 +148,16 @@ def noop( :param noop_sleep_sec - noop sleep time :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params diff --git a/transforms/universal/profiler/kfp_ray/profiler_wf.py b/transforms/universal/profiler/kfp_ray/profiler_wf.py index 7a157c146..6300f62f8 100644 --- a/transforms/universal/profiler/kfp_ray/profiler_wf.py +++ b/transforms/universal/profiler/kfp_ray/profiler_wf.py @@ -78,6 +78,7 @@ def profiler( # Ray cluster ray_name: str = "profiler-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -102,6 +103,7 @@ def profiler( """ Pipeline to execute EDEDUP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -137,6 +139,16 @@ def profiler( :param profiler_n_samples - number of samples for parameters computation :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/universal/resize/kfp_ray/resize_wf.py b/transforms/universal/resize/kfp_ray/resize_wf.py index 6a1403f18..89007c8be 100644 --- a/transforms/universal/resize/kfp_ray/resize_wf.py +++ b/transforms/universal/resize/kfp_ray/resize_wf.py @@ -76,23 +76,11 @@ def compute_exec_params_func( # KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use # this if/else statement and explicitly call the decorator. if os.getenv("KFPv2", "0") == "1": - # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create - # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to - # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at - # compilation time. - import uuid - compute_exec_params_op = dsl.component_decorator.component( func=compute_exec_params_func, base_image=base_kfp_image ) - print( - "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " - + "same version of the same pipeline !!!" 
- ) - run_id = uuid.uuid4().hex else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) - run_id = dsl.RUN_ID_PLACEHOLDER # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") @@ -111,6 +99,7 @@ def compute_exec_params_func( def resize( # Ray cluster ray_name: str = "resize-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, @@ -137,6 +126,7 @@ def resize( """ Pipeline to execute NOOP transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -172,6 +162,16 @@ def resize( :param resize_size_type - size type - disk/memory :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime the user is requested to insert + # a unique string created at compilation time. 
+ if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) diff --git a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py index 82fc55ae2..15958665b 100644 --- a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py +++ b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py @@ -116,6 +116,7 @@ def compute_exec_params_func( def tokenization( # Ray cluster ray_name: str = "tkn-kfp-ray", # name of Ray cluster + ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2 # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, ray_worker_options: dict = { @@ -149,6 +150,7 @@ def tokenization( """ Pipeline to execute tokenization transform :param ray_name: name of the Ray cluster + :param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2 :param ray_head_options: head node options, containing the following: cpu - number of cpus memory - memory @@ -187,6 +189,16 @@ def tokenization( :param tkn_chunk_size - Specify >0 value to tokenize each row/text in chunks of characters (rounded in words) :return: None """ + # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create + # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to + # https://github.com/kubeflow/pipelines/issues/10187. 
Therefore, in the meantime the user is requested to insert + # a unique string created at compilation time. + if os.getenv("KFPv2", "0") == "1": + print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " + "same version of the same pipeline !!!") + run_id = ray_id_KFPv2 + else: + run_id = dsl.RUN_ID_PLACEHOLDER # create clean_up task clean_up_task = cleanup_ray_op( ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params