Skip to content

Commit

Permalink
Add kfp workflow to html2parquet.
Browse files Browse the repository at this point in the history
Signed-off-by: Revital Sur <[email protected]>
  • Loading branch information
revit13 committed Oct 12, 2024
1 parent 4c7f4a5 commit f5d15f0
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 0 deletions.
116 changes: 116 additions & 0 deletions .github/workflows/test-language-html2parquet-kfp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#
# DO NOT EDIT THIS FILE: it is generated from test-transform.template, Edit there and run make to change these files
#
name: Test KFP - transforms/language/html2parquet

on:
workflow_dispatch:
push:
branches:
- "dev"
- "releases/**"
tags:
- "*"
paths:
- ".make.*"
- "transforms/.make.workflows"
- "transforms/language/html2parquet/**"
- "!kfp/**" # This is tested in separate workflow
- "!data-processing-lib/**" # This is tested in separate workflow
- "!**.md"
- "!**/doc/**"
- "!**/images/**"
- "!**.gitignore"
pull_request:
branches:
- "dev"
- "releases/**"
paths:
- ".make.*"
- "transforms/.make.workflows"
- "transforms/language/html2parquet/**"
- "!data-processing-lib/**" # This is tested in separate workflow
- "!kfp/**" # This is tested in separate workflow
- "!**.md"
- "!**/doc/**"
- "!**/images/**"
- "!**.gitignore"

# taken from https://stackoverflow.com/questions/66335225/how-to-cancel-previous-runs-in-the-pr-when-you-push-new-commitsupdate-the-curre
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
test-kfp-v1:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Free up space in github runner
# Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
run: |
df -h
sudo rm -rf "/usr/local/share/boost"
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
df -h
- name: Import environment variables
run: |
cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
echo "REPOROOT=$PWD" >> $GITHUB_ENV
echo "PATH=$PATH:/tmp" >> $GITHUB_ENV
- name: Test V1 KFP workflow for transforms/language/html2parquet
timeout-minutes: 120
run: |
KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list)
if [ -e "transforms/language/html2parquet/Makefile" -a -e "transforms/language/html2parquet/kfp_ray/Makefile" ]; then
transform=$(basename "transforms/language/html2parquet")
if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
$PWD/scripts/workflow_helper.sh install-tools
$PWD/scripts/workflow_helper.sh test-workflow transforms/language/html2parquet
else
$PWD/scripts/workflow_helper.sh build-workflow transforms/language/html2parquet
fi
else
echo "Skipping transforms/language/html2parquet kfp test for lack of Makefile and/or kfp_ray/Makefile"
fi
test-kfp-v2:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Free up space in github runner
# Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
run: |
df -h
sudo rm -rf "/usr/local/share/boost"
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
df -h
- name: Import environment variables
run: |
cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
echo "REPOROOT=$PWD" >> $GITHUB_ENV
echo "PATH=$PATH:/tmp" >> $GITHUB_ENV
echo "KFPv2=1" >> $GITHUB_ENV
- name: Test V2 KFP workflow for transforms/language/html2parquet
timeout-minutes: 120
run: |
KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list)
if [ -e "transforms/language/html2parquet/Makefile" -a -e "transforms/language/html2parquet/kfp_ray/Makefile" ]; then
transform=$(basename "transforms/language/html2parquet")
if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
$PWD/scripts/workflow_helper.sh install-tools
$PWD/scripts/workflow_helper.sh test-workflow transforms/language/html2parquet
else
$PWD/scripts/workflow_helper.sh build-workflow transforms/language/html2parquet
fi
else
echo "Skipping transforms/language/html2parquet kfp test for lack of Makefile and/or kfp_ray/Makefile"
fi
1 change: 1 addition & 0 deletions scripts/k8s-setup/populate_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mc cp --recursive ${REPOROOT}/transforms/language/doc_quality/ray/test-data/inpu
mc cp --recursive ${REPOROOT}/transforms/language/pdf2parquet/ray/test-data/input/2206.01062.pdf kfp/test/pdf2parquet/input
mc cp --recursive ${REPOROOT}/transforms/language/text_encoder/ray/test-data/input/ kfp/test/text_encoder/input
mc cp --recursive ${REPOROOT}/transforms/language/doc_chunk/ray/test-data/input/ kfp/test/doc_chunk/input
mc cp --recursive ${REPOROOT}/transforms/language/html2parquet/ray/test-data/input/test1.html kfp/test/html2parquet/input
# universal
mc cp --recursive ${REPOROOT}/transforms/universal/doc_id/ray/test-data/input/ kfp/test/doc_id/input
mc cp --recursive ${REPOROOT}/transforms/universal/ededup/ray/test-data/input/ kfp/test/ededup/input
Expand Down
48 changes: 48 additions & 0 deletions transforms/language/html2parquet/kfp_ray/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

# Include the common configuration for this transform
include ../transform.config

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

.PHONY: clean
clean:
@# Help: Clean up the virtual environment.
rm -rf ${REPOROOT}/transforms/venv

venv::

build::

test::

test-src::

test-image::

publish::

image::

load-image::

.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=html2parquet_wf.yaml

.PHONY: workflow-upload
workflow-upload:
@for file in $(YAML_WF); do \
$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
done
205 changes: 205 additions & 0 deletions transforms/language/html2parquet/kfp_ray/html2parquet_wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl

from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


task_image = "quay.io/dataprep1/data-prep-kit/html2parquet-ray:latest"

# the name of the job script
EXEC_SCRIPT_NAME: str = "html2parquet_transform_ray.py"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: dict,
output_format: str,
data_files_to_use: str,
) -> dict:
from runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"html2parquet_output_format": output_format,
"data_files_to_use": data_files_to_use,
}


# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")

# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "html2parquet"


@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for html2parquet task",
)
def html2parquet(
# Ray cluster
ray_name: str = "html2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/html2parquet/input/', 'output_folder': 'test/html2parquet/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
data_files_to_use: str = "['.html', '.zip']",
# orchestrator
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# html2parquet parameters
output_format: str = "markdown",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute html2parquet transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
min_replicas - min number of replicas
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
wait_cluster_ready_tmout - time to wait for cluster ready, sec
wait_cluster_up_tmout - time to wait for cluster up, sec
wait_job_ready_tmout - time to wait for job ready, sec
wait_print_tmout - time between prints, sec
http_retries - http retries for API server calls
:param data_s3_access_secret - s3 access secret
:param data_s3_config - s3 configuration
:param data_max_files - max files to process
:param data_num_samples - num samples to process
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param output_format - output format
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
output_format=output_format,
data_files_to_use=data_files_to_use,
)

ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)

# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
execute_job.after(ray_cluster)

if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(html2parquet, __file__.replace(".py", ".yaml"))

0 comments on commit f5d15f0

Please sign in to comment.