diff --git a/kfp/Add_new_kfp_workflow.md b/kfp/Add_new_kfp_workflow.md new file mode 100644 index 0000000000..53ed337578 --- /dev/null +++ b/kfp/Add_new_kfp_workflow.md @@ -0,0 +1,7 @@ +# Adding new KFP workflows + +This README outlines the steps to add a new KFP workflow for a new transform: + +- Generate the workflow using the [pipeline generator](./pipeline_generator). +- Add the path to the transform input directory in the [populate_minio script](../scripts/k8s-setup/populate_minio.sh). This path is used when testing the workflow. +- Create a GitHub Action for the kfp workflow using the `make` command in the [github directory](../.github/workflows/README.md). \ No newline at end of file diff --git a/kfp/pipeline_generator/single-pipeline/example/pipeline_definitions.yaml b/kfp/pipeline_generator/single-pipeline/example/pipeline_definitions.yaml index c0e3dc51a1..e9685f30a5 100644 --- a/kfp/pipeline_generator/single-pipeline/example/pipeline_definitions.yaml +++ b/kfp/pipeline_generator/single-pipeline/example/pipeline_definitions.yaml @@ -3,14 +3,14 @@ pipeline_parameters: description: "Pipeline for noop task" script_name: "noop_transform.py" prefix: "" - multi_s3: True + multi_s3: False compute_func_name: "" compute_func_import: "" component_spec_path: "" pipeline_common_input_parameters_values: - kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6" - transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:0.9.0.dev6" + kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" + transform_image: "quay.io/dataprep1/data-prep-kit/noop-ray:latest" s3_access_secret: "s3-secret" image_pull_secret: "prod-all-icr-io" input_folder: "test/noop/input/" diff --git a/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py index 256a6d90ce..ff100e5059 100644 --- a/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py +++ b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py @@ -40,8 +40,8 @@ def compute_exec_params_func( runtime_pipeline_id: str, runtime_job_id: str, runtime_code_location: dict, - output_format: str, data_files_to_use: str, + html2parquet_output_format: str, ) -> dict: from runtime_utils import KFPUtils @@ -54,8 +54,8 @@ def compute_exec_params_func( "runtime_pipeline_id": runtime_pipeline_id, "runtime_job_id": runtime_job_id, "runtime_code_location": str(runtime_code_location), - "html2parquet_output_format": output_format, "data_files_to_use": data_files_to_use, + "html2parquet_output_format": html2parquet_output_format, } @@ -109,13 +109,14 @@ def html2parquet( data_s3_access_secret: str = "s3-secret", data_max_files: int = -1, data_num_samples: int = -1, - data_files_to_use: str = "['.html', '.zip']", + data_checkpointing: bool = False, # orchestrator runtime_actor_options: dict = {'num_cpus': 0.8}, runtime_pipeline_id: str = "pipeline_id", runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, # html2parquet parameters - output_format: str = "markdown", + data_files_to_use: str = "['.html', '.zip']", + html2parquet_output_format: str = "markdown", # additional parameters additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): @@ -152,7 +153,8 @@ def html2parquet( :param runtime_actor_options - actor options :param runtime_pipeline_id - pipeline id :param runtime_code_location - code location - :param output_format - output format + :param data_files_to_use - # file extensions to use for processing + :param html2parquet_output_format - # Output format for the contents column. :return: None """ # create clean_up task @@ -170,8 +172,8 @@ def html2parquet( runtime_pipeline_id=runtime_pipeline_id, runtime_job_id=run_id, runtime_code_location=runtime_code_location, - output_format=output_format, data_files_to_use=data_files_to_use, + html2parquet_output_format=html2parquet_output_format, ) ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2) diff --git a/transforms/language/html2parquet/kfp_ray/pipeline_definitions.yaml b/transforms/language/html2parquet/kfp_ray/pipeline_definitions.yaml new file mode 100644 index 0000000000..475c1d981a --- /dev/null +++ b/transforms/language/html2parquet/kfp_ray/pipeline_definitions.yaml @@ -0,0 +1,28 @@ +pipeline_parameters: + name: "html2parquet" + description: "Pipeline for html2parquet task" + script_name: "html2parquet_transform_ray.py" + prefix: "" + multi_s3: False + compute_func_name: "" + compute_func_import: "" + component_spec_path: "" + +pipeline_common_input_parameters_values: + kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" + transform_image: "quay.io/dataprep1/data-prep-kit/html2parquet-ray:latest" + s3_access_secret: "s3-secret" + image_pull_secret: "prod-all-icr-io" + input_folder: "test/html2parquet/input/" + output_folder: "test/html2parquet/output/" + +pipeline_transform_input_parameters: + pipeline_arguments: + - name: data_files_to_use + type: "str" + value: "['.html', '.zip']" + description: "# file extensions to use for processing" + - name: html2parquet_output_format + type: "str" + value: "markdown" + description: "# Output format for the contents column."