Skip to content

Commit

Permalink
fix pdf2parquet script
Browse files Browse the repository at this point in the history
Signed-off-by: matouma <[email protected]>
  • Loading branch information
matouma committed Dec 14, 2024
1 parent 7965b5e commit e144927
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
task_image = "quay.io/dataprep1/data-prep-kit/pdf2parquet-ray:latest"

# the name of the job script
EXEC_SCRIPT_NAME: str = "pdf2parquet_transform_ray.py"
EXEC_SCRIPT_NAME: str = "-m dpk_pdf2parquet.ray.transform"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
Expand Down Expand Up @@ -109,7 +109,14 @@ def pdf2parquet(
ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
ray_worker_options: dict = {
"replicas": 2,
"max_replicas": 2,
"min_replicas": 2,
"cpu": 2,
"memory": 4,
"image": task_image,
},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}",
Expand All @@ -118,9 +125,9 @@ def pdf2parquet(
data_num_samples: int = -1,
data_files_to_use: str = "['.pdf']",
# orchestrator
runtime_actor_options: dict = {'num_cpus': 4},
runtime_actor_options: dict = {"num_cpus": 4},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
# pdf2parquet parameters
pdf2parquet_batch_size: int = -1,
pdf2parquet_do_table_structure: bool = True,
Expand Down Expand Up @@ -171,7 +178,9 @@ def pdf2parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
clean_up_task = cleanup_ray_op(
ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params
)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
Expand Down

0 comments on commit e144927

Please sign in to comment.