From e144927943ee50f7c651f52da398dbe947ed6f94 Mon Sep 17 00:00:00 2001 From: matouma <94904133+matouma@users.noreply.github.com> Date: Sat, 14 Dec 2024 11:24:27 +0100 Subject: [PATCH] fix pdf2parquet script Signed-off-by: matouma <94904133+matouma@users.noreply.github.com> --- .../pdf2parquet/kfp_ray/pdf2parquet_wf.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py index c9cdbf652..af3506161 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py @@ -20,7 +20,7 @@ task_image = "quay.io/dataprep1/data-prep-kit/pdf2parquet-ray:latest" # the name of the job script -EXEC_SCRIPT_NAME: str = "pdf2parquet_transform_ray.py" +EXEC_SCRIPT_NAME: str = "-m dpk_pdf2parquet.ray.transform" # components base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" @@ -109,7 +109,14 @@ def pdf2parquet( ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, - ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, + ray_worker_options: dict = { + "replicas": 2, + "max_replicas": 2, + "min_replicas": 2, + "cpu": 2, + "memory": 4, + "image": task_image, + }, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}", @@ -118,9 +125,9 @@ def pdf2parquet( data_num_samples: int = -1, data_files_to_use: str = "['.pdf']", # orchestrator - runtime_actor_options: dict = {'num_cpus': 4}, + runtime_actor_options: dict = {"num_cpus": 4}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, + runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"}, # pdf2parquet parameters pdf2parquet_batch_size: int = -1, pdf2parquet_do_table_structure: bool = True, @@ -171,7 +178,9 @@ def pdf2parquet( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + clean_up_task = cleanup_ray_op( + ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params + ) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task):