Skip to content

Commit

Permalink
Merge branch 'pdf2parquet-simplify' into transforms-1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
matouma committed Dec 14, 2024
2 parents 9c9e76b + e144927 commit 88ff060
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
1 change: 1 addition & 0 deletions transforms/language/pdf2parquet/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ endif

# Run the transform once from the CLI against the bundled test data.
# Command target, not a file — mark phony so a stray file named
# `run-cli-sample` can never shadow it.
.PHONY: run-cli-sample
run-cli-sample:
	# Use $(MAKE), not literal `make`: propagates -j/jobserver and -n
	# into the sub-invocation that builds the venv.
	$(MAKE) venv
	# `.` is the POSIX-portable spelling of bash's `source`; the
	# activation and the transform must share one shell line, since
	# each recipe line otherwise runs in its own shell.
	# NOTE(review): assumes $(PYTHON) and $(TRANSFORM_NAME) are set
	# above the visible hunk — confirm against the full Makefile.
	. venv/bin/activate && \
	$(PYTHON) -m dpk_$(TRANSFORM_NAME).transform_python \
		--data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \
		--data_files_to_use "['.pdf','.zip']"
Expand Down
19 changes: 14 additions & 5 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
task_image = "quay.io/dataprep1/data-prep-kit/pdf2parquet-ray:latest"

# the name of the job script
EXEC_SCRIPT_NAME: str = "pdf2parquet_transform_ray.py"
EXEC_SCRIPT_NAME: str = "-m dpk_pdf2parquet.ray.transform"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
Expand Down Expand Up @@ -106,17 +106,24 @@ def pdf2parquet(
ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
ray_worker_options: dict = {
"replicas": 2,
"max_replicas": 2,
"min_replicas": 2,
"cpu": 2,
"memory": 4,
"image": task_image,
},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "[{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}]",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_actor_options: dict = {"num_cpus": 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
# pdf2parquet parameters
pdf2parquet_batch_size: int = -1,
pdf2parquet_do_table_structure: bool = True,
Expand Down Expand Up @@ -167,7 +174,9 @@ def pdf2parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
clean_up_task = cleanup_ray_op(
ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params
)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
Expand Down
19 changes: 14 additions & 5 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
task_image = "quay.io/dataprep1/data-prep-kit/pdf2parquet-ray:latest"

# the name of the job script
EXEC_SCRIPT_NAME: str = "pdf2parquet_transform_ray.py"
EXEC_SCRIPT_NAME: str = "-m dpk_pdf2parquet.ray.transform"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
Expand Down Expand Up @@ -109,7 +109,14 @@ def pdf2parquet(
ray_name: str = "pdf2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
ray_worker_options: dict = {
"replicas": 2,
"max_replicas": 2,
"min_replicas": 2,
"cpu": 2,
"memory": 4,
"image": task_image,
},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/pdf2parquet/input/', 'output_folder': 'test/pdf2parquet/output/'}",
Expand All @@ -118,9 +125,9 @@ def pdf2parquet(
data_num_samples: int = -1,
data_files_to_use: str = "['.pdf']",
# orchestrator
runtime_actor_options: dict = {'num_cpus': 4},
runtime_actor_options: dict = {"num_cpus": 4},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
# pdf2parquet parameters
pdf2parquet_batch_size: int = -1,
pdf2parquet_do_table_structure: bool = True,
Expand Down Expand Up @@ -171,7 +178,9 @@ def pdf2parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
clean_up_task = cleanup_ray_op(
ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params
)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
Expand Down

0 comments on commit 88ff060

Please sign in to comment.