diff --git a/.github/workflows/test-language-html2parquet-kfp.yml b/.github/workflows/test-language-html2parquet-kfp.yml
new file mode 100644
index 000000000..d31e03e7d
--- /dev/null
+++ b/.github/workflows/test-language-html2parquet-kfp.yml
@@ -0,0 +1,116 @@
+#
+# DO NOT EDIT THIS FILE: it is generated from test-transform.template. Edit that template and run make to regenerate these files.
+#
+name: Test KFP - transforms/language/html2parquet
+
+on:
+    workflow_dispatch:
+    push:
+        branches:
+            - "dev"
+            - "releases/**"
+        tags:
+            - "*"
+        paths:
+            - ".make.*"
+            - "transforms/.make.workflows"
+            - "transforms/language/html2parquet/**"
+            - "!kfp/**" # This is tested in a separate workflow
+            - "!data-processing-lib/**" # This is tested in a separate workflow
+            - "!**.md"
+            - "!**/doc/**"
+            - "!**/images/**"
+            - "!**.gitignore"
+    pull_request:
+        branches:
+            - "dev"
+            - "releases/**"
+        paths:
+            - ".make.*"
+            - "transforms/.make.workflows"
+            - "transforms/language/html2parquet/**"
+            - "!data-processing-lib/**" # This is tested in a separate workflow
+            - "!kfp/**" # This is tested in a separate workflow
+            - "!**.md"
+            - "!**/doc/**"
+            - "!**/images/**"
+            - "!**.gitignore"
+
+# taken from https://stackoverflow.com/questions/66335225/how-to-cancel-previous-runs-in-the-pr-when-you-push-new-commitsupdate-the-curre
+concurrency:
+    group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+    cancel-in-progress: true
+
+jobs:
+    test-kfp-v1:
+        runs-on: ubuntu-latest
+        steps:
+            - name: Checkout
+              uses: actions/checkout@v4
+            - name: Free up space in GitHub runner
+              # Free space as indicated here: https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
+              run: |
+                  df -h
+                  sudo rm -rf "/usr/local/share/boost"
+                  sudo rm -rf "$AGENT_TOOLSDIRECTORY"
+                  sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
+                  sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
+                  df -h
+            - name: Import environment variables
+              run: |
+                  cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
+                  echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
+                  echo "REPOROOT=$PWD" >> $GITHUB_ENV
+                  echo "PATH=$PATH:/tmp" >> $GITHUB_ENV
+            - name: Test V1 KFP workflow for transforms/language/html2parquet
+              timeout-minutes: 120
+              run: |
+                  KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list)
+                  if [ -e "transforms/language/html2parquet/Makefile" -a -e "transforms/language/html2parquet/kfp_ray/Makefile" ]; then
+                      transform=$(basename "transforms/language/html2parquet")
+                      if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
+                          $PWD/scripts/workflow_helper.sh install-tools
+                          $PWD/scripts/workflow_helper.sh test-workflow transforms/language/html2parquet
+                      else
+                          $PWD/scripts/workflow_helper.sh build-workflow transforms/language/html2parquet
+                      fi
+                  else
+                      echo "Skipping transforms/language/html2parquet kfp test because Makefile and/or kfp_ray/Makefile is missing"
+                  fi
+
+    test-kfp-v2:
+        runs-on: ubuntu-latest
+        steps:
+            - name: Checkout
+              uses: actions/checkout@v4
+            - name: Free up space in GitHub runner
+              # Free space as indicated here: https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
+              run: |
+                  df -h
+                  sudo rm -rf "/usr/local/share/boost"
+                  sudo rm -rf "$AGENT_TOOLSDIRECTORY"
+                  sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
+                  sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
+                  df -h
+            - name: Import environment variables
+              run: |
+                  cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
+                  echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
+                  echo "REPOROOT=$PWD" >> $GITHUB_ENV
+                  echo "PATH=$PATH:/tmp" >> $GITHUB_ENV
+                  echo "KFPv2=1" >> $GITHUB_ENV
+            - name: Test V2 KFP workflow for transforms/language/html2parquet
+              timeout-minutes: 120
+              run: |
+                  KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list)
+                  if [ -e "transforms/language/html2parquet/Makefile" -a -e "transforms/language/html2parquet/kfp_ray/Makefile" ]; then
+                      transform=$(basename "transforms/language/html2parquet")
+                      if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
+                          $PWD/scripts/workflow_helper.sh install-tools
+                          $PWD/scripts/workflow_helper.sh test-workflow transforms/language/html2parquet
+                      else
+                          $PWD/scripts/workflow_helper.sh build-workflow transforms/language/html2parquet
+                      fi
+                  else
+                      echo "Skipping transforms/language/html2parquet kfp test because Makefile and/or kfp_ray/Makefile is missing"
+                  fi
\ No newline at end of file
diff --git a/scripts/k8s-setup/populate_minio.sh b/scripts/k8s-setup/populate_minio.sh
index 3b22b37e7..1ced8fdb3 100755
--- a/scripts/k8s-setup/populate_minio.sh
+++ b/scripts/k8s-setup/populate_minio.sh
@@ -33,6 +33,7 @@ mc cp --recursive ${REPOROOT}/transforms/language/doc_quality/ray/test-data/inpu
 mc cp --recursive ${REPOROOT}/transforms/language/pdf2parquet/ray/test-data/input/2206.01062.pdf kfp/test/pdf2parquet/input
 mc cp --recursive ${REPOROOT}/transforms/language/text_encoder/ray/test-data/input/ kfp/test/text_encoder/input
 mc cp --recursive ${REPOROOT}/transforms/language/doc_chunk/ray/test-data/input/ kfp/test/doc_chunk/input
+mc cp --recursive ${REPOROOT}/transforms/language/html2parquet/ray/test-data/input/test1.html kfp/test/html2parquet/input
 # universal
 mc cp --recursive ${REPOROOT}/transforms/universal/doc_id/ray/test-data/input/ kfp/test/doc_id/input
 mc cp --recursive ${REPOROOT}/transforms/universal/ededup/ray/test-data/input/ kfp/test/ededup/input
diff --git a/transforms/language/html2parquet/kfp_ray/Makefile b/transforms/language/html2parquet/kfp_ray/Makefile
new file mode 100644
index 000000000..f4cb64903
--- /dev/null
+++ b/transforms/language/html2parquet/kfp_ray/Makefile
@@ -0,0 +1,48 @@
+REPOROOT=${CURDIR}/../../../../
+WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
+include $(REPOROOT)/transforms/.make.workflows
+
+# Include the common configuration for this transform
+include ../transform.config
+
+SRC_DIR=${CURDIR}/../ray/
+
+PYTHON_WF := $(shell find ./ -name '*_wf.py')
+YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})
+
+workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}
+
+.PHONY: clean
+clean:
+	@# Help: Clean up the virtual environment.
+	rm -rf ${REPOROOT}/transforms/venv
+
+venv::
+
+build::
+
+test::
+
+test-src::
+
+test-image::
+
+publish::
+
+image::
+
+load-image::
+
+.PHONY: workflow-build
+workflow-build: workflow-venv
+	$(MAKE) $(YAML_WF)
+
+.PHONY: workflow-test
+workflow-test: workflow-build
+	$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=html2parquet_wf.yaml
+
+.PHONY: workflow-upload
+workflow-upload:
+	@for file in $(YAML_WF); do \
+		$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
+	done
diff --git a/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py
new file mode 100644
index 000000000..256a6d90c
--- /dev/null
+++ b/transforms/language/html2parquet/kfp_ray/html2parquet_wf.py
@@ -0,0 +1,205 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+import kfp.compiler as compiler
+import kfp.components as comp
+import kfp.dsl as dsl
+
+from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
+
+
+task_image = "quay.io/dataprep1/data-prep-kit/html2parquet-ray:latest"
+
+# the name of the job script
+EXEC_SCRIPT_NAME: str = "html2parquet_transform_ray.py"
+
+# components
+base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
+
+# path to kfp component specifications files
+component_spec_path = "../../../../kfp/kfp_ray_components/"
+
+
+# Compute execution parameters. Different transforms may need different implementations here, so
+# instead of shipping a reusable component, we create the component in place from this function.
+def compute_exec_params_func(
+    worker_options: dict,
+    actor_options: dict,
+    data_s3_config: str,
+    data_max_files: int,
+    data_num_samples: int,
+    runtime_pipeline_id: str,
+    runtime_job_id: str,
+    runtime_code_location: dict,
+    output_format: str,
+    data_files_to_use: str,
+) -> dict:
+    from runtime_utils import KFPUtils
+
+    return {
+        "data_s3_config": data_s3_config,
+        "data_max_files": data_max_files,
+        "data_num_samples": data_num_samples,
+        "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
+        "runtime_worker_options": str(actor_options),
+        "runtime_pipeline_id": runtime_pipeline_id,
+        "runtime_job_id": runtime_job_id,
+        "runtime_code_location": str(runtime_code_location),
+        "html2parquet_output_format": output_format,
+        "data_files_to_use": data_files_to_use,
+    }
+
+
+# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the
+# `create_component_from_func` function, which KFPv2 deprecates and exposes under a different import path.
+# KFPv2 recommends the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
+# this if/else statement and explicitly call the decorator.
+if os.getenv("KFPv2", "0") == "1":
+    # In KFPv2, dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot
+    # create a unique string in a component (at runtime) and pass it to the `clean_up_task` of the `ExitHandler`,
+    # due to https://github.com/kubeflow/pipelines/issues/10187. Therefore, for now we use a unique string created
+    # at compilation time.
+    import uuid
+
+    compute_exec_params_op = dsl.component_decorator.component(
+        func=compute_exec_params_func, base_image=base_kfp_image
+    )
+    print(
+        "WARNING: the Ray cluster name can be non-unique at runtime; please do not execute simultaneous runs of the "
+        + "same version of the same pipeline!"
+    )
+    run_id = uuid.uuid4().hex
+else:
+    compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
+    run_id = dsl.RUN_ID_PLACEHOLDER
+
+# create the Ray cluster
+create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
+# execute the job
+execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
+# clean up the Ray cluster
+cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
+
+# The task name is part of the pipeline name, the Ray cluster name and the job name in DMF.
+TASK_NAME: str = "html2parquet"
+
+
+@dsl.pipeline(
+    name=TASK_NAME + "-ray-pipeline",
+    description="Pipeline for the html2parquet task",
+)
+def html2parquet(
+    # Ray cluster
+    ray_name: str = "html2parquet-kfp-ray",  # name of the Ray cluster
+    # Add image_pull_secret and image_pull_policy to the Ray workers if needed
+    ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
+    ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
+    server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
+    # data access
+    data_s3_config: str = "{'input_folder': 'test/html2parquet/input/', 'output_folder': 'test/html2parquet/output/'}",
+    data_s3_access_secret: str = "s3-secret",
+    data_max_files: int = -1,
+    data_num_samples: int = -1,
+    data_files_to_use: str = "['.html', '.zip']",
+    # orchestrator
+    runtime_actor_options: dict = {'num_cpus': 0.8},
+    runtime_pipeline_id: str = "pipeline_id",
+    runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
+    # html2parquet parameters
+    output_format: str = "markdown",
+    # additional parameters
+    additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
+):
+    """
+    Pipeline to execute the html2parquet transform.
+    :param ray_name: name of the Ray cluster
+    :param ray_head_options: head node options, containing the following:
+        cpu - number of cpus
+        memory - memory
+        image - image to use
+        image_pull_secret - image pull secret
+        tolerations - (optional) tolerations for the ray pods
+    :param ray_worker_options: worker node options (only one worker pool is used here), containing the following:
+        replicas - number of replicas to create
+        max_replicas - max number of replicas
+        min_replicas - min number of replicas
+        cpu - number of cpus
+        memory - memory
+        image - image to use
+        image_pull_secret - image pull secret
+        tolerations - (optional) tolerations for the ray pods
+    :param server_url: server URL
+    :param additional_params: additional (support) parameters, containing the following:
+        wait_interval - wait interval for API server, sec
+        wait_cluster_ready_tmout - time to wait for cluster ready, sec
+        wait_cluster_up_tmout - time to wait for cluster up, sec
+        wait_job_ready_tmout - time to wait for job ready, sec
+        wait_print_tmout - time between prints, sec
+        http_retries - http retries for API server calls
+        delete_cluster_delay_minutes - delay (in minutes) before the cluster is deleted
+    :param data_s3_access_secret: s3 access secret
+    :param data_s3_config: s3 configuration
+    :param data_max_files: max files to process
+    :param data_num_samples: num samples to process
+    :param runtime_actor_options: actor options
+    :param runtime_pipeline_id: pipeline id
+    :param runtime_code_location: code location
+    :param output_format: output format
+    :return: None
+    """
+    # create the clean_up task
+    clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
+    ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
+    # pipeline definition
+    with dsl.ExitHandler(clean_up_task):
+        # compute execution params
+        compute_exec_params = compute_exec_params_op(
+            worker_options=ray_worker_options,
+            actor_options=runtime_actor_options,
+            data_s3_config=data_s3_config,
+            data_max_files=data_max_files,
+            data_num_samples=data_num_samples,
+            runtime_pipeline_id=runtime_pipeline_id,
+            runtime_job_id=run_id,
+            runtime_code_location=runtime_code_location,
+            output_format=output_format,
+            data_files_to_use=data_files_to_use,
+        )
+        ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
+        # start the Ray cluster
+        ray_cluster = create_ray_op(
+            ray_name=ray_name,
+            run_id=run_id,
+            ray_head_options=ray_head_options,
+            ray_worker_options=ray_worker_options,
+            server_url=server_url,
+            additional_params=additional_params,
+        )
+        ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
+        ray_cluster.after(compute_exec_params)
+        # execute the job
+        execute_job = execute_ray_jobs_op(
+            ray_name=ray_name,
+            run_id=run_id,
+            additional_params=additional_params,
+            exec_params=compute_exec_params.output,
+            exec_script_name=EXEC_SCRIPT_NAME,
+            server_url=server_url,
+        )
+        ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
+        ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
+        execute_job.after(ray_cluster)
+
+if __name__ == "__main__":
+    # compile the pipeline
+    compiler.Compiler().compile(html2parquet, __file__.replace(".py", ".yaml"))
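
Reviewer note on the KFPv2 switch: the compile path in html2parquet_wf.py is driven entirely by the KFPv2 environment variable, which the test-kfp-v2 job exports via `echo "KFPv2=1" >> $GITHUB_ENV`. A minimal sketch of exercising both compile paths locally, assuming the kfp SDK and the workflow_support package are importable (e.g., from the transforms venv that the workflow-venv target populates):

    # KFP v1 compilation (default): uses comp.create_component_from_func and
    # dsl.RUN_ID_PLACEHOLDER for the run id.
    cd transforms/language/html2parquet/kfp_ray
    python html2parquet_wf.py            # writes html2parquet_wf.yaml next to the script

    # KFP v2 compilation: uses the @dsl.component decorator and a uuid fixed at
    # compile time, hence the warning about simultaneous runs of the same compiled pipeline.
    KFPv2=1 python html2parquet_wf.py

    # Equivalently, via the Makefile targets added in this diff:
    make workflow-build                  # compiles every *_wf.py to *_wf.yaml
    make workflow-test                   # workflow-build, then runs the test pipeline
                                         # (CI drives this via workflow_helper.sh against a test cluster)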
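
Reviewer note on the blacklist gate in both CI jobs: `echo ${KFP_BLACK_LIST} | grep -qv ${transform}` succeeds exactly when the single echoed line does not contain the transform name, so non-blacklisted transforms get the full test-workflow while blacklisted ones only get build-workflow. A standalone shell sketch with a hypothetical blacklist value (check-workflows.sh produces the real one):

    KFP_BLACK_LIST="noop doc_id"   # hypothetical value, for illustration only
    transform="html2parquet"
    if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
        echo "${transform} is not blacklisted: run test-workflow"
    else
        echo "${transform} is blacklisted: run build-workflow only"
    fi

Note that this is a substring match, so a transform whose name is contained in a blacklisted name would also be treated as blacklisted.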