CI integration and docstr cleanup
cbalioglu committed Dec 8, 2023
1 parent b63fad1 commit 1b3e3fb
Showing 5 changed files with 98 additions and 86 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/_build_wheel-linux.yaml
@@ -1,4 +1,4 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
@@ -188,7 +188,10 @@ jobs:
pip install --no-cache-dir ~/artifacts/fairseq2n/python/build/wheelhouse/*.whl
- name: Install fairseq2
run: |
pip install --no-cache-dir ~/artifacts/build/wheelhouse/*.whl
for whl in ~/artifacts/build/wheelhouse/*.whl; do
pip install --no-cache-dir $whl
pip install --no-cache-dir "fairseq['arrow']@$whl"
done
- name: Set the sanitizer variables
if: inputs.sanitizers != 'nosan'
env:
66 changes: 40 additions & 26 deletions recipes/parquet/README.md
@@ -1,36 +1,50 @@
## Parquet dataloader with fairseq2
## Parquet Data Loading with fairseq2

The recipe module [parquet_dataloader](./parquet_dataloader.py) shows one way to build an efficient
dataloader over a parquet dataset (partitioned or not) using `fairseq2.data` primitives.
It uses a [pyarrow.parquet](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html) API to interface with parquet files, so it requires an extra packages installation with
`pip install fairseq2[arrow]`
The recipe module [parquet_dataloader](./parquet_dataloader.py) shows one way to
build an efficient dataloader over an Apache Parquet dataset (partitioned or not)
using `fairseq2.data` primitives. It uses the [pyarrow.parquet](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html)
API to interface with Parquet files, so it requires an extra package
installation with `pip install fairseq2[arrow]`.

The present dataloader is of general purpose and can be combined with various downstream workflows.
Some important technical notes to keep in mind :
* Dataloader will simultaneously load several parquet dataset fragments (`nb_parallel_fragments`) and shuffle their elements together before returning
* Thus, increasing `nb_parallel_fragments` will result in better randomization but also increase the memory footprint
* For heavy rows datasets, prefer save the parquet files with relatively small `row_groups` to improve streaming regularity.
* For reading from S3 storage, `fairseq2.data` being multithread based, `from pyarrow.fs import S3FileSystem` (releasing GIL) works the best.
* Currently, only some of pyarrow dtypes are mapped to torch equivalent, this support will improve in the future
This dataloader is general purpose and can be combined with various downstream
workflows. Some important technical notes to keep in mind:

* The dataloader simultaneously loads several Parquet dataset fragments
(`nb_parallel_fragments`) and shuffles their elements together before returning.
* Thus, increasing `nb_parallel_fragments` will result in better randomization
but also increase the memory footprint.
* For datasets with heavy rows, prefer saving the Parquet files with relatively
small `row_groups` to improve streaming regularity (see the sketch after this list).
* For reading from S3 storage, since `fairseq2.data` is multithreaded,
`from pyarrow.fs import S3FileSystem` (which releases the GIL) works best.
* Currently, only some pyarrow dtypes are mapped to their torch equivalents;
this support will improve in the future.
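
As a minimal sketch of the row-group and S3 notes above (not part of the
original recipe), the snippet below writes a toy table with deliberately small
row groups and opens a GIL-releasing S3 filesystem handle that could be passed
to the dataloader config through its `filesystem` field; the file path, region,
and column names are placeholders.

```python
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import S3FileSystem

# Write a toy table with small row groups so each fragment streams in regular,
# memory-friendly chunks (placeholder path and columns).
table = pa.table({"src_text": ["hello", "world"] * 500, "src_lang": ["eng"] * 1000})
pq.write_table(table, "path/to/parquet/dataset/part-0.parquet", row_group_size=200)

# A GIL-releasing filesystem handle for an S3-hosted dataset (placeholder region).
s3_fs = S3FileSystem(region="us-west-2")
```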

Please refer to the `ParquetBasicDataloaderConfig` for more details about the existing configuration parameters.
Please refer to the `ParquetBasicDataloaderConfig` for more details about the
existing configuration parameters.

Example of simple usage:

Example of simple usage :
```python
from recipes.parquet.parquet_dataloader import (ParquetBasicDataloaderConfig,
ParquetBatchFormat,
build_parquet_iterator_pipeline)
import pyarrow.compute as pc
config = ParquetBasicDataloaderConfig(parquet_path="path/to/parquet/dataset",
filters=pc.greater(pc.utf8_length(pc.field("src_text")), 5)
columns=["src_text", "src_lang", "audio_wav"],
batch_size=20,
output_format=ParquetBatchFormat.torch,
world_size=1,
rank=0,
seed=123)

import pyarrow.compute as pc

from recipes.parquet.parquet_dataloader import (
    ParquetBasicDataloaderConfig,
    ParquetBatchFormat,
    build_parquet_iterator_pipeline,
    parquet_iterator,
)

config = ParquetBasicDataloaderConfig(
    parquet_path="path/to/parquet/dataset",
    filters=pc.greater(pc.utf8_length(pc.field("src_text")), 5),
    columns=["src_text", "src_lang", "audio_wav"],
    batch_size=20,
    output_format=ParquetBatchFormat.torch,
    world_size=1,
    rank=0,
    seed=123,
)

for batch in parquet_iterator(config):
    pass
```
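
Not part of the original README, but as a further sketch of the length-aware
batching options exposed by the config (`order_by_length` and `max_tokens`),
the variant below trades a fixed `batch_size` for a cap on the number of padded
tokens per batch; the dataset path and column names are assumptions.

```python
from recipes.parquet.parquet_dataloader import (
    ParquetBasicDataloaderConfig,
    ParquetBatchFormat,
    parquet_iterator,
)

# Order rows by the length of "src_text" and cap the padded tokens per batch
# instead of fixing a batch size.
length_config = ParquetBasicDataloaderConfig(
    parquet_path="path/to/parquet/dataset",
    columns=["src_text", "src_lang"],
    order_by_length="src_text",
    max_tokens=2_000,
    output_format=ParquetBatchFormat.torch,
    seed=123,
)

for batch in parquet_iterator(length_config):
    pass
```
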
81 changes: 34 additions & 47 deletions recipes/parquet/parquet_dataloader.py
@@ -37,33 +37,28 @@ class ParquetBatchFormat(Enum):
@dataclass # TODO: (kw_only=True) with python3.10
class ParquetBasicDataloaderConfig:
parquet_path: str
"""
Path to parquet dataset file
"""
"""The path to parquet dataset file."""

batch_size: Optional[int] = None
"""
Fixed output batch size
"""
"""The output batch size."""

order_by_length: Optional[str] = None
"""Column in dataset whose value length `L` will be used for batches ordering.
This results in batches with relatively homogeneous values of `L`,
typically to support optimal padding.
"""
"""The column in the dataset whose length will be used for batch ordering.
This results in batches with relatively homogeneous values, typically to
support optimal padding."""

max_tokens: Optional[int] = None
"""
Used with `order_by_length` option to control the total number of padded tokens in a each batch.
Typically, this option is preferred to `batch_size` for reducing the memory footprint.
"""Used with the ``order_by_length`` option to control the total number of
padded tokens in each batch. Typically, this option is preferred over
``batch_size`` to reduce the memory footprint.
"""

columns: Optional[List[str]] = None
"""List of columns to load"""
"""The list of columns to load."""

filters: Optional[Union[List[Any], pa.dataset.Expression]] = None
"""
See https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Expression.html#pyarrow.dataset.Expression
"""See https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Expression.html#pyarrow.dataset.Expression
Some examples:
>>> import pyarrow.compute as pc
@@ -74,47 +69,40 @@ class ParquetBasicDataloaderConfig:
>>> filters = pa.compute.greater(pa.compute.utf8_length(ds.field("lang1_text")), 4)
>>> filters = pa.compute.less_equal(pa.compute.list_value_length(pa.dataset.field("audio_wav")), 16_000 * 30)
Note that all fields used here should be among existing columns in the dataset schema
Note that all fields used here should be among existing columns in the dataset schema.
"""

output_format: ParquetBatchFormat = ParquetBatchFormat.pyarrow
"""
Format to use for output batches
"""
"""The format to use for output batches."""

split_to_row_groups: bool = True
"""
Use parquet row groups instead of simple partitions which are generally smaller.
Highly recommended for non-partitioned parquet file.
"""
"""If ``True``, uses Parquet row groups instead of simple partitions which
are generally smaller. Highly recommended for non-partitioned parquet files."""

shuffle: bool = True
"""
Whether to shuffle dataset samples during the iteration.
If False and `order_by_length` is None, the batch samples will be produced in natural parquet dataset reading order.
"""
"""If ``True``, shuffles the dataset samples during the iteration. If ``False``
and ``order_by_length`` is ``None``, the batch samples will be produced in
natural Parquet dataset reading order."""

drop_null: bool = True
"""Dropping rows containing any null value"""
"""If ``True``, drops rows containing any null value."""

seed: Optional[int] = None
"""
seed making iteration deterministic
"""
"""The RNG seed value for deterministic behavior."""

min_batch_size: int = 1
"""Drops batches whose length < `min_batch_size`"""
"""Drops batches whose length is less than ``min_batch_size``"""

nb_parallel_fragments: int = 5
"""Number of parquet fragments read allowed to be read synonymously.
Higher values will result in higher speed, better randomization and higher memory footprint.
If partitions size is rather small compared to batch size, we recommend to increase nb_parallel_fragments.
"""
"""The number of Parquet fragments allowed to be read in parallel. Higher
values will result in higher speeds, better randomization, and higher memory
footprint. If partition size is rather small compared to the batch size, we
recommend to increase ``nb_parallel_fragments``."""

nb_prefetch: int = 2
"""
Nb of producers groups (of size `nb_parallel_fragments`) to prefetch
"""
"""The number of producer groups (of size `nb_parallel_fragments`) to
prefetch."""

world_size: int = 1
"""The world size of the process group."""

@@ -125,13 +113,12 @@ class ParquetBasicDataloaderConfig:
"""The number of parallel calls in map operations."""

use_threads: bool = False
"""
Whether pyarrow should use its internal parallelism threads to read the parquet part.
Since we rely on the external parallelism, this param is tuned off.
"""
"""Whether pyarrow should use its internal threads to read the Parquet file.
Since we rely on the external parallelism, this param is tuned off by
default."""

filesystem: Optional[pa.fs.FileSystem] = None
"""
Filesystem to read parquet files from. S3 example :
"""The filesystem to read the Parquet files from. S3 example:
>>> import s3fs
>>> filesystem = s3fs.core.S3FileSystem(...)
"""
@@ -218,7 +205,7 @@ def parquet_iterator(
config: ParquetBasicDataloaderConfig,
) -> Generator[BatchOutputType, None, None]:
"""
Example of usage :
Example of usage:
>>> from recipes.parquet.parquet_dataloader import (
... ParquetBasicDataloaderConfig, ParquetBatchFormat, build_parquet_iterator_pipeline)
2 changes: 1 addition & 1 deletion setup.py
@@ -56,6 +56,6 @@
"typing_extensions~=4.3;python_version<'3.10'",
],
extras_require={
"arrow": ["pyarrow>=13.0.0", "pandas>=2.0.0"],
"arrow": ["pyarrow>=13.0.0", "pandas~=2.0.0"],
},
)
28 changes: 18 additions & 10 deletions tests/integration/parquet/test_parquet_dataloader.py
@@ -12,18 +12,25 @@
from collections import Counter
from typing import Any, Dict, Generator, List, Union

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from numpy.typing import NDArray

from recipes.parquet.parquet_dataloader import (
ParquetBasicDataloaderConfig,
ParquetBatchFormat,
parquet_iterator,
)
try:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

arrow_found = True

from numpy.typing import NDArray

from recipes.parquet.parquet_dataloader import (
ParquetBasicDataloaderConfig,
ParquetBatchFormat,
parquet_iterator,
)
except ImportError:
arrow_found = False


def gen_random_string(length: int) -> str:
@@ -92,6 +99,7 @@ def multi_partition_file() -> Generator[str, None, None]:
shutil.rmtree(tmpdir)


@pytest.mark.skipif(not arrow_found, reason="arrow not found")
class TestParquetDataloader:
def test_simple_dataload(self, multi_partition_file: str) -> None:
config = ParquetBasicDataloaderConfig(
