fix relative locations
Signed-off-by: Alexey Roytman <[email protected]>
roytman committed Sep 22, 2024
1 parent 974cae7 commit 558117d
Showing 8 changed files with 28 additions and 33 deletions.
2 changes: 1 addition & 1 deletion kfp/pipeline_generator/single-pipeline/README.md
@@ -1,4 +1,4 @@
## Steps to generate a new pipeline
- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](../../../transforms/universal/noop/kfp_ray/pipeline_definitions.yaml)).
- execute `./run.sh --config_file <pipeline_definitions_file_path> --output_dir_file <destination directory>`, where `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is the directory where the new pipeline file will be generated.
18 changes: 6 additions & 12 deletions kfp/pipeline_generator/single-pipeline/pipeline_generator.py
@@ -1,4 +1,4 @@
PRE_COMMIT = "../pre-commit-config.yaml"

PIPELINE_TEMPLATE_FILE = "simple_pipeline.py"

INPUT_PARAMETERS = "input_parameters"
@@ -13,13 +13,15 @@

if __name__ == "__main__":
import argparse

import os
import yaml
from jinja2 import Environment, FileSystemLoader

environment = Environment(loader=FileSystemLoader("templates/"))
script_dir = os.path.dirname(os.path.abspath(__file__))
environment = Environment(loader=FileSystemLoader(f"{script_dir}/templates/"))
template = environment.get_template(PIPELINE_TEMPLATE_FILE)

#pre_commit_config = f"{script_dir}/../pre-commit-config.yaml"
parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
parser.add_argument("-c", "--config_file", type=str, default="")
parser.add_argument("-od", "--output_dir_file", type=str, default="")
@@ -50,16 +52,8 @@
image_pull_secret=common_input_params_values["image_pull_secret"],
multi_s3=pipeline_parameters["multi_s3"],
)

output_file = f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py"
output_file = f"{args.output_dir_file}{pipeline_parameters[NAME]}_wf.py"
with open(output_file, mode="w", encoding="utf-8") as message:
message.write(content)
print(f"... wrote {output_file}")

import sys

from pre_commit.main import main

print(f"Pipeline ${output_file} auto generation completed")
args = ["run", "--file", f"{output_file}", "-c", PRE_COMMIT]
sys.exit(main(args))
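The substance of this change is that template lookup in `pipeline_generator.py` is now anchored to the script's own directory rather than the current working directory, so the generator can be invoked from anywhere (for example from a transform's `kfp_ray` directory via `make workflow-generate`). A minimal sketch of that pattern, assuming only that `jinja2` is installed and that a `templates/simple_pipeline.py` file sits next to the script:

```python
# Minimal sketch: resolve the templates directory relative to this file,
# not the current working directory, so the template is found from any CWD.
import os

from jinja2 import Environment, FileSystemLoader

script_dir = os.path.dirname(os.path.abspath(__file__))
environment = Environment(loader=FileSystemLoader(os.path.join(script_dir, "templates")))
template = environment.get_template("simple_pipeline.py")
print(template.filename)  # absolute path to the template, independent of the launch directory
```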
5 changes: 3 additions & 2 deletions kfp/pipeline_generator/single-pipeline/run.sh
@@ -59,6 +59,7 @@ ROOT_DIR=${PWD}
mkdir -p ${ROOT_DIR}/${DIST_DIR}/
python3 -m venv venv
source venv/bin/activate
pip install pre-commit
pip install jinja2
python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
script_dir="$(dirname "$(readlink -f "$0")")"
echo $PYTHONPATH
python3 ${script_dir}/pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/
kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
@@ -9,12 +9,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


@@ -101,19 +101,21 @@ def compute_exec_params_func(
)
def {{ pipeline_name }}(
# Ray cluster
ray_name: str = "{{ pipeline_name }}-kfp-ray",
ray_name: str = "{{ pipeline_name }}-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
{% if multi_s3 == False %}
{%- if multi_s3 == False %}
data_s3_config: str = "{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}",
{% else %}
{%- else %}
data_s3_config: str = ["{'input_folder': '{{ input_folder }}', 'output_folder': '{{ output_folder }}'}"],
{% endif %}
{%- endif %}
data_s3_access_secret: str = "{{ s3_access_secret }}",
data_max_files: int = -1,
data_num_samples: int = -1,
data_checkpointing: bool = False,
# orchestrator
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
@@ -209,4 +211,4 @@ def {{ pipeline_name }}(

if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
compiler.Compiler().compile({{ pipeline_name }}, __file__.replace(".py", ".yaml"))
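The template hunk above also replaces `{% if %}` / `{% else %}` / `{% endif %}` with the `{%- ... %}` form. The leading `-` is Jinja2 whitespace control: it strips the whitespace (including the preceding newline) in front of the tag, so the conditional does not leave stray blank lines in the generated pipeline file. A small illustration, assuming only that `jinja2` is installed; the template strings are invented for the example:

```python
# Minimal sketch: "{%-" trims the whitespace before a tag, so block tags
# leave no blank lines behind in the rendered output.
from jinja2 import Template

plain = "a\n{% if flag %}\nb\n{% endif %}\nc\n"
trimmed = "a\n{%- if flag %}\nb\n{%- endif %}\nc\n"

print(repr(Template(plain).render(flag=True)))    # 'a\n\nb\n\nc'  (blank lines left by the tags)
print(repr(Template(trimmed).render(flag=True)))  # 'a\nb\nc'      (tags collapse cleanly)
```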
5 changes: 1 addition & 4 deletions transforms/universal/noop/kfp_ray/Makefile
@@ -52,7 +52,4 @@ workflow-upload: workflow-build

.PHONY: workflow-generate
workflow-generate:
cd ../../../../kfp/pipeline_generator/single-pipeline/ && \
./run.sh --config_file ../../../transforms/universal/noop/kfp_ray/pipeline_definitions.yaml --output_dir_file ../../../transforms/universal/noop/kfp_ray/ && \
cd ../../../transforms/universal/noop/kfp_ray/
done
../../../../kfp/pipeline_generator/single-pipeline/run.sh -c `pwd`/pipeline_definitions.yaml -od .
2 changes: 1 addition & 1 deletion transforms/universal/noop/kfp_ray/README.md
@@ -7,7 +7,7 @@ This project allows execution of the [noop Ray transform](../ray) as a

The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)

## Generate
## Pipeline file generation
In order to generate a pipeline Python file, run
```shell
make workflow-generate
11 changes: 6 additions & 5 deletions transforms/universal/noop/kfp_ray/noop_wf.py
@@ -14,6 +14,7 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


@@ -92,7 +93,7 @@ def compute_exec_params_func(

@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for noop",
description="Pipeline for noop task",
)
def noop(
# Ray cluster
@@ -117,7 +118,7 @@ def noop(
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
):
"""
Pipeline to execute NOOP transform
Pipeline to execute noop transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
@@ -167,6 +168,7 @@ def noop(
runtime_code_location=runtime_code_location,
noop_sleep_sec=noop_sleep_sec,
)

ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
@@ -179,12 +181,12 @@
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)

# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for NOOP transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
Expand All @@ -193,7 +195,6 @@ def noop(
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
compiler.Compiler().compile(noop, __file__.replace(".py", ".yaml"))
4 changes: 2 additions & 2 deletions transforms/universal/noop/kfp_ray/pipeline_definitions.yaml
@@ -3,7 +3,7 @@ pipeline_parameters:
description: "Pipeline for noop task"
script_name: "noop_transform_ray.py"
prefix: ""
multi_s3: True
multi_s3: False
compute_func_name: ""
compute_func_import: ""
component_spec_path: ""
@@ -21,4 +21,4 @@
- name: "noop_sleep_sec"
type: "int"
value: 10
description: "# noop sleep time"
description: "noop sleep time"
