Skip to content

Commit

Permalink
Obtain the Ray cluster run ID from the user for KFP v2.
Browse files Browse the repository at this point in the history
Signed-off-by: Revital Sur <[email protected]>
  • Loading branch information
revit13 committed Jan 20, 2025
1 parent c8096b1 commit 79e7e08
Show file tree
Hide file tree
Showing 32 changed files with 378 additions and 330 deletions.
25 changes: 20 additions & 5 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ Ray cluster. For each step we have to define a component that will execute them:
```python
# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.0.2"
# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
func=ComponentUtils.default_compute_execution_params, base_image=base_kfp_image
)
# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, which is deprecated in KFPv2 and therefore has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
# create Ray cluster
create_ray_op = comp.load_component_from_file("../../../kfp_ray_components/createRayComponent.yaml")
# execute job
Expand Down Expand Up @@ -148,6 +153,16 @@ Now, when all components and input parameters are defined, we can implement pipe
component execution and parameters submitted to every component.

```python
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
# create a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to provide
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ def runtime_name(ray_name: str = "", run_id: str = "") -> str:
# the return value plus the namespace name will be the name of the Ray Route,
# whose length is restricted to 64 characters,
# therefore we restrict the returned name to 15 characters.
if run_id != "":
return f"{ray_name[:9]}-{run_id[:5]}"
return ray_name[:15]
if run_id == "":
logger.error("Run ID must be provided")
sys.exit(1)
return f"{ray_name[:9]}-{run_id[:5]}"

@staticmethod
def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str:
Expand Down
25 changes: 13 additions & 12 deletions kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
Expand All @@ -111,9 +99,11 @@ def {{ pipeline_name }}(
ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
{%- if image_pull_secret != "" %}
ray_id_KFPv2: str = "",
ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": "{{ image_pull_secret }}", "image": task_image},
{%- else %}
ray_id_KFPv2: str = "",
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
{%- endif %}
Expand Down Expand Up @@ -142,6 +132,7 @@ def {{ pipeline_name }}(
"""
Pipeline to execute {{ pipeline_name }} transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -177,6 +168,16 @@ def {{ pipeline_name }}(
{%- endfor %}
:return: None
"""
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
# create a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to provide
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -113,6 +101,7 @@ def compute_exec_params_func(
)
def code2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand All @@ -139,6 +128,7 @@ def code2parquet(
"""
Pipeline to execute code2parquet transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -178,6 +168,16 @@ def code2parquet(
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
# create a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to provide
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -112,6 +100,7 @@ def compute_exec_params_func(
def code_quality(
# Ray cluster
ray_name: str = "code_quality-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand All @@ -136,6 +125,7 @@ def code_quality(
"""
Pipeline to execute Code Quality transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -171,6 +161,16 @@ def code_quality(
:param cq_hf_token - Huggingface auth token to download and use the tokenizer
:return: None
"""
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
# create a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to provide
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
Expand Down
14 changes: 2 additions & 12 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
Expand All @@ -120,6 +108,7 @@ def compute_exec_params_func(
def header_cleanser(
# Ray cluster
ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
Expand Down Expand Up @@ -148,6 +137,7 @@ def header_cleanser(
"""
Pipeline to execute Header Cleanser transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down
24 changes: 12 additions & 12 deletions transforms/code/license_select/kfp_ray/license_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,11 @@ def compute_exec_params_func(
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
Expand All @@ -106,6 +94,7 @@ def compute_exec_params_func(
)
def license_select(
ray_name: str = "license_select-kfp-ray", # name of Ray cluster
ray_id_KFPv2: str = "", # Ray cluster unique ID used only in KFP v2
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {
Expand Down Expand Up @@ -135,6 +124,7 @@ def license_select(
"""
Pipeline to execute License Select transform
:param ray_name: name of the Ray cluster
:param ray_id_KFPv2: string holding the id used for the Ray cluster used only in KFP v2
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
Expand Down Expand Up @@ -166,6 +156,16 @@ def license_select(
:param lc_licenses_file - path to license list json file
:return: None
"""
# In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
# create a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being, the user is asked to provide
# a unique string created at compilation time.
if os.getenv("KFPv2", "0") == "1":
print("WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
"same version of the same pipeline !!!")
run_id = ray_id_KFPv2
else:
run_id = dsl.RUN_ID_PLACEHOLDER
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
Expand Down
Loading

0 comments on commit 79e7e08

Please sign in to comment.