Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pdf2parquet to Docling v2 #756

Merged
merged 16 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion transforms/language/doc_chunk/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ The transform can be tuned with the following parameters.
| `chunking_type` | `dl_json` | Chunking type to apply. Valid options are `li_markdown` for using the LlamaIndex [Markdown chunking](https://docs.llamaindex.ai/en/stable/module_guides/loading/node_parsers/modules/#markdownnodeparser), `dl_json` for using the [Docling JSON chunking](https://github.com/DS4SD/docling), `li_token_text` for using the LlamaIndex [Token Text Splitter](https://docs.llamaindex.ai/en/stable/api_reference/node_parsers/token_text_splitter/), which chunks the text into fixed-sized windows of tokens. |
| `content_column_name` | `contents` | Name of the column containing the text to be chunked. |
| `doc_id_column_name` | `document_id` | Name of the column containing the doc_id to be propagated in the output. |
| `dl_min_chunk_len` | `None` | Minimum number of characters for the chunk in the dl_json chunker. Setting it to `None` uses the library defaults, i.e. `min_chunk_len=64`. |
| `chunk_size_tokens` | `128` | Size of the chunk in tokens for the token text chunker. |
| `chunk_overlap_tokens` | `30` | Number of tokens overlapping between chunks for the token text chunker. |
| `output_chunk_column_name` | `contents` | Column name to store the chunks in the output table. |
Expand Down
2 changes: 1 addition & 1 deletion transforms/language/doc_chunk/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dpk_doc_chunk_transform_python"
version = "0.2.2.dev1"
version = "0.3.0"
daw3rd marked this conversation as resolved.
Show resolved Hide resolved
requires-python = ">=3.10,<3.13"
description = "chunk documents Python Transform"
license = {text = "Apache-2.0"}
Expand Down
2 changes: 1 addition & 1 deletion transforms/language/doc_chunk/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
data-prep-toolkit==0.2.2.dev1
docling-core==1.7.2
docling-core==2.3.0
llama-index-core>=0.11.0,<0.12.0
21 changes: 10 additions & 11 deletions transforms/language/doc_chunk/python/src/doc_chunk_chunkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from abc import ABCMeta, abstractmethod
from typing import Iterator, Optional, Dict, List

from docling_core.types import Document as DLDocument
from docling_core.types.doc import DoclingDocument
from llama_index.core.node_parser.text.token import TokenTextSplitter
from llama_index.core import Document as LIDocument
from llama_index.core.node_parser import MarkdownNodeParser
from docling_core.transforms.chunker import HierarchicalChunker
from docling_core.transforms.chunker import HierarchicalChunker, DocMeta


class ChunkingExecutor(metaclass=ABCMeta):
Expand All @@ -29,7 +29,6 @@ def chunk(self, content: str) -> Iterator[dict]:
class DLJsonChunker(ChunkingExecutor):
def __init__(
self,
min_chunk_len: Optional[int],
output_chunk_column_name: str,
output_jsonpath_column_name: str,
output_pageno_column_name_key: str,
Expand All @@ -40,19 +39,19 @@ def __init__(
self.output_pageno_column_name_key = output_pageno_column_name_key
self.output_bbox_column_name_key = output_bbox_column_name_key

chunker_kwargs = dict(include_metadata=True)
if min_chunk_len is not None:
chunker_kwargs["min_chunk_len"] = min_chunk_len
self._chunker = HierarchicalChunker(**chunker_kwargs)
self._chunker = HierarchicalChunker()

def chunk(self, content: str) -> Iterator[dict]:
doc = DLDocument.model_validate_json(content)
doc = DoclingDocument.model_validate_json(content)
for chunk in self._chunker.chunk(doc):
meta = DocMeta.model_validate(chunk.meta)
doc_item = meta.doc_items[0]
prov = doc_item.prov[0]
yield {
self.output_chunk_column_name: chunk.text,
self.output_jsonpath_column_name: chunk.path,
self.output_pageno_column_name_key: chunk.page,
self.output_bbox_column_name_key: chunk.bbox,
self.output_jsonpath_column_name: doc_item.self_ref,
self.output_pageno_column_name_key: prov.page_no,
self.output_bbox_column_name_key: prov.bbox.as_tuple(),
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"runtime_job_id": "job_id",
"runtime_code_location": ParamsUtils.convert_to_ast(code_location),
# doc_chunk params
# "doc_chunk_dl_min_chunk_len": 10, # for testing the usage of the deprecated argument
# "doc_chunk_chunking_type": "li_markdown",
"doc_chunk_chunking_type": "dl_json",
# "doc_chunk_chunking_type": chunking_types.LI_TOKEN_TEXT,
Expand Down
17 changes: 7 additions & 10 deletions transforms/language/doc_chunk/python/src/doc_chunk_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
content_column_name_key = "content_column_name"
doc_id_column_name_key = "doc_id_column_name"
chunking_type_key = "chunking_type"
dl_min_chunk_len_key = "dl_min_chunk_len"
daw3rd marked this conversation as resolved.
Show resolved Hide resolved
chunk_size_tokens_key = "chunk_size_tokens"
chunk_overlap_tokens_key = "chunk_overlap_tokens"
output_chunk_column_name_key = "output_chunk_column_name"
Expand All @@ -38,7 +37,6 @@
content_column_name_cli_param = f"{cli_prefix}{content_column_name_key}"
doc_id_column_name_cli_param = f"{cli_prefix}{doc_id_column_name_key}"
chunking_type_cli_param = f"{cli_prefix}{chunking_type_key}"
dl_min_chunk_len_cli_param = f"{cli_prefix}{dl_min_chunk_len_key}"
output_chunk_column_name_cli_param = f"{cli_prefix}{output_chunk_column_name_key}"
output_source_doc_id_column_name_cli_param = f"{cli_prefix}{output_source_doc_id_column_name_key}"
output_jsonpath_column_name_cli_param = f"{cli_prefix}{output_jsonpath_column_name_key}"
Expand All @@ -59,7 +57,6 @@ def __str__(self):
default_content_column_name = "contents"
default_doc_id_column_name = "document_id"
default_chunking_type = chunking_types.DL_JSON
default_dl_min_chunk_len = None
default_output_chunk_column_name = "contents"
default_output_chunk_column_id = "chunk_id"
default_output_source_doc_id_column_name = "source_document_id"
Expand Down Expand Up @@ -95,7 +92,6 @@ def __init__(self, config: dict[str, Any]):
self.output_source_doc_id_column_name = config.get(output_source_doc_id_column_name_key, default_output_source_doc_id_column_name)

# Parameters for Docling JSON chunking
self.dl_min_chunk_len = config.get(dl_min_chunk_len_key, default_dl_min_chunk_len)
self.output_jsonpath_column_name = config.get(
output_jsonpath_column_name_key, default_output_jsonpath_column_name
)
Expand All @@ -113,7 +109,6 @@ def __init__(self, config: dict[str, Any]):
self.chunker: ChunkingExecutor
if self.chunking_type == chunking_types.DL_JSON:
self.chunker = DLJsonChunker(
min_chunk_len=self.dl_min_chunk_len,
output_chunk_column_name=self.output_chunk_column_name,
output_jsonpath_column_name=self.output_jsonpath_column_name,
output_pageno_column_name_key=self.output_pageno_column_name_key,
Expand Down Expand Up @@ -202,11 +197,6 @@ def add_input_params(self, parser: ArgumentParser) -> None:
default=default_doc_id_column_name,
help="Name of the column containing the doc_id to be propagated in the output",
)
parser.add_argument(
f"--{dl_min_chunk_len_cli_param}",
default=default_dl_min_chunk_len,
help="Minimum number of characters for the chunk in the dl_json chunker. Setting to None is using the library defaults, i.e. a min_chunk_len=64.",
)
parser.add_argument(
f"--{output_chunk_column_name_cli_param}",
default=default_output_chunk_column_name,
Expand Down Expand Up @@ -244,6 +234,11 @@ def add_input_params(self, parser: ArgumentParser) -> None:
type=int,
help="Number of tokens overlapping between chunks for the fixed-sized chunker.",
)
parser.add_argument(
f"--{cli_prefix}dl_min_chunk_len",
default=None,
help="Deprecated. This option is no longer considered.",
)

def apply_input_params(self, args: Namespace) -> bool:
"""
Expand All @@ -254,5 +249,7 @@ def apply_input_params(self, args: Namespace) -> bool:
captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)

self.params = self.params | captured
if self.params.get("dl_min_chunk_len") is not None:
self.logger.warning("The `dl_min_chunk_len` option is deprecated and will be ignored. Please stop using it, it will not accepted anymore in future versions.")
self.logger.info(f"doc_chunk parameters are : {self.params}")
return True
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"job name": "doc_chunk",
"job type": "pure python",
"job id": "job_id",
"start_time": "2024-10-18 14:05:09",
"end_time": "2024-10-18 14:05:11",
"start_time": "2024-10-30 18:38:40",
"end_time": "2024-10-30 18:38:40",
"status": "success"
},
"code": {
Expand All @@ -18,7 +18,6 @@
"chunking_type": "dl_json",
"content_column_name": "contents",
"doc_id_column_name": "document_id",
"dl_min_chunk_len": null,
"output_chunk_column_name": "contents",
"output_source_doc_id_column_name": "source_document_id",
"output_jsonpath_column_name": "doc_jsonpath",
Expand All @@ -35,22 +34,22 @@
"num_processors": 0
},
"execution_stats": {
"cpus": 27.9,
"cpus": 19.5,
"gpus": 0,
"memory": 25.75,
"memory": 27.48,
"object_store": 0,
"execution time, min": 0.021
"execution time, min": 0.001
},
"job_output_stats": {
"source_files": 1,
"source_size": 50276,
"source_size": 12073,
"result_files": 1,
"result_size": 31223,
"processing_time": 1.266,
"result_size": 14363,
"processing_time": 0.043,
"nfiles": 1,
"nrows": 88,
"nrows": 39,
"source_doc_count": 1,
"result_doc_count": 88
"result_doc_count": 39
},
"source": {
"name": "/Users/dol/codes/data-prep-kit/transforms/language/doc_chunk/python/test-data/input",
Expand Down
Binary file not shown.
Binary file not shown.
4 changes: 2 additions & 2 deletions transforms/language/doc_chunk/ray/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dpk_doc_chunk_transform_ray"
version = "0.2.2.dev1"
version = "0.3.0"
requires-python = ">=3.10,<3.13"
description = "chunk documents Ray Transform"
license = {text = "Apache-2.0"}
Expand All @@ -11,7 +11,7 @@ authors = [
{ name = "Christoph Auer", email = "[email protected]" },
]
dependencies = [
"dpk-doc-chunk-transform-python==0.2.2.dev1",
"dpk-doc-chunk-transform-python==0.3.0",
"data-prep-toolkit[ray]==0.2.2.dev1",
]

Expand Down
21 changes: 10 additions & 11 deletions transforms/language/doc_chunk/ray/test-data/expected/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"job name": "doc_chunk",
"job type": "pure python",
"job id": "job_id",
"start_time": "2024-10-18 14:05:09",
"end_time": "2024-10-18 14:05:11",
"start_time": "2024-10-30 18:38:40",
"end_time": "2024-10-30 18:38:40",
"status": "success"
},
"code": {
Expand All @@ -18,7 +18,6 @@
"chunking_type": "dl_json",
"content_column_name": "contents",
"doc_id_column_name": "document_id",
"dl_min_chunk_len": null,
"output_chunk_column_name": "contents",
"output_source_doc_id_column_name": "source_document_id",
"output_jsonpath_column_name": "doc_jsonpath",
Expand All @@ -35,22 +34,22 @@
"num_processors": 0
},
"execution_stats": {
"cpus": 27.9,
"cpus": 19.5,
"gpus": 0,
"memory": 25.75,
"memory": 27.48,
"object_store": 0,
"execution time, min": 0.021
"execution time, min": 0.001
},
"job_output_stats": {
"source_files": 1,
"source_size": 50276,
"source_size": 12073,
"result_files": 1,
"result_size": 31223,
"processing_time": 1.266,
"result_size": 14363,
"processing_time": 0.043,
"nfiles": 1,
"nrows": 88,
"nrows": 39,
"source_doc_count": 1,
"result_doc_count": 88
"result_doc_count": 39
},
"source": {
"name": "/Users/dol/codes/data-prep-kit/transforms/language/doc_chunk/python/test-data/input",
Expand Down
Binary file not shown.
Binary file modified transforms/language/doc_chunk/ray/test-data/input/test1.parquet
Binary file not shown.
15 changes: 15 additions & 0 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ def compute_exec_params_func(
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: dict,
pdf2parquet_batch_size: int,
pdf2parquet_do_table_structure: bool,
pdf2parquet_do_ocr: bool,
pdf2parquet_ocr_engine: str,
pdf2parquet_bitmap_area_threshold: float,
) -> dict:
from runtime_utils import KFPUtils

Expand All @@ -53,8 +56,11 @@ def compute_exec_params_func(
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"pdf2parquet_batch_size": pdf2parquet_batch_size,
"pdf2parquet_do_table_structure": pdf2parquet_do_table_structure,
"pdf2parquet_do_ocr": pdf2parquet_do_ocr,
"pdf2parquet_ocr_engine": pdf2parquet_ocr_engine,
"pdf2parquet_bitmap_area_threshold": pdf2parquet_bitmap_area_threshold,
}


Expand Down Expand Up @@ -112,8 +118,11 @@ def pdf2parquet(
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# pdf2parquet parameters
pdf2parquet_batch_size: int = -1,
pdf2parquet_do_table_structure: bool = True,
pdf2parquet_do_ocr: bool = False,
pdf2parquet_ocr_engine: str = "easyocr",
pdf2parquet_bitmap_area_threshold: float = 0.05,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
Expand Down Expand Up @@ -150,8 +159,11 @@ def pdf2parquet(
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param pdf2parquet_batch_size - how many inputs to batch into one output table
:param pdf2parquet_do_table_structure - run table structure model
:param pdf2parquet_do_ocr - run ocr model
:param pdf2parquet_ocr_engine - which ocr engine
:param pdf2parquet_bitmap_area_threshold - threshold for bitmaps
:return: None
"""
# create clean_up task
Expand All @@ -169,8 +181,11 @@ def pdf2parquet(
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
pdf2parquet_batch_size=pdf2parquet_batch_size,
pdf2parquet_do_table_structure=pdf2parquet_do_table_structure,
pdf2parquet_do_ocr=pdf2parquet_do_ocr,
pdf2parquet_ocr_engine=pdf2parquet_ocr_engine,
pdf2parquet_bitmap_area_threshold=pdf2parquet_bitmap_area_threshold,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
Expand Down
15 changes: 15 additions & 0 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ def compute_exec_params_func(
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: dict,
pdf2parquet_batch_size: int,
pdf2parquet_do_table_structure: bool,
pdf2parquet_do_ocr: bool,
pdf2parquet_ocr_engine: str,
pdf2parquet_bitmap_area_threshold: float,
) -> dict:
from runtime_utils import KFPUtils

Expand All @@ -55,8 +58,11 @@ def compute_exec_params_func(
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"pdf2parquet_batch_size": pdf2parquet_batch_size,
"pdf2parquet_do_table_structure": pdf2parquet_do_table_structure,
"pdf2parquet_do_ocr": pdf2parquet_do_ocr,
"pdf2parquet_ocr_engine": pdf2parquet_ocr_engine,
"pdf2parquet_bitmap_area_threshold": pdf2parquet_bitmap_area_threshold,
}


Expand Down Expand Up @@ -116,8 +122,11 @@ def pdf2parquet(
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# pdf2parquet parameters
pdf2parquet_batch_size: int = -1,
pdf2parquet_do_table_structure: bool = True,
pdf2parquet_do_ocr: bool = False,
pdf2parquet_ocr_engine: str = "easyocr",
pdf2parquet_bitmap_area_threshold: float = 0.05,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
Expand Down Expand Up @@ -154,8 +163,11 @@ def pdf2parquet(
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param pdf2parquet_batch_size - how many inputs to batch into one output table
:param pdf2parquet_do_table_structure - run table structure model
:param pdf2parquet_do_ocr - run ocr model
:param pdf2parquet_ocr_engine - which ocr engine
:param pdf2parquet_bitmap_area_threshold - threshold for bitmaps
:return: None
"""
# create clean_up task
Expand All @@ -174,8 +186,11 @@ def pdf2parquet(
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
pdf2parquet_batch_size=pdf2parquet_batch_size,
pdf2parquet_do_table_structure=pdf2parquet_do_table_structure,
pdf2parquet_do_ocr=pdf2parquet_do_ocr,
pdf2parquet_ocr_engine=pdf2parquet_ocr_engine,
pdf2parquet_bitmap_area_threshold=pdf2parquet_bitmap_area_threshold,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
Expand Down
2 changes: 1 addition & 1 deletion transforms/language/pdf2parquet/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ RUN pip install ${PIP_INSTALL_EXTRA_ARGS} --no-cache-dir -e .

# Download models
RUN python -c 'from deepsearch_glm.utils.load_pretrained_models import load_pretrained_nlp_models; load_pretrained_nlp_models(verbose=True);'
RUN python -c 'from docling.document_converter import DocumentConverter; s=DocumentConverter.download_models_hf(); print(f"Models cached in {s}")'
RUN python -c 'from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline; s=StandardPdfPipeline.download_models_hf(); print(f"Models cached in {s}")'

# copy the main() entry point to the image
COPY --chown=dpk:root src/pdf2parquet_transform.py ./
Expand Down
Loading