Make add_filename str/bool (#465)
praateekmahajan authored Jan 7, 2025
1 parent 694970a commit 2d7e857
Showing 12 changed files with 150 additions and 83 deletions.
6 changes: 4 additions & 2 deletions docs/user-guide/documentdataset.rst
@@ -68,14 +68,16 @@ Let's walk through this code line by line.
"books_dataset/books_02.jsonl"]
* ``books = DocumentDataset.read_json(files, add_filename=True)`` This will read the files listed into memory.
The ``add_filename=True`` option preserves the name of the shard (``books_00.jsonl``, ``books_01.jsonl``, etc.) as an additional ``filename`` field.
When the dataset is written back to disk, this option (in conjunction with the ``write_to_filename`` option) ensure that documents stay in their original shard.
The ``add_filename=True`` option preserves the name of the shard (``books_00.jsonl``, ``books_01.jsonl``, etc.) as an additional ``file_name`` field.
When the dataset is written back to disk, this option (in conjunction with the ``write_to_filename`` option and ``filename_col``) ensures that documents stay in their original shard.
This can be useful for manually inspecting the results of filtering shard by shard.
The ``add_filename`` option can also be given as a string, in which case that string is used as the column name (instead of the default ``file_name``).
* ``filter_step = ...`` This constructs and applies a heuristic filter for the length of the document.
More information is provided in the filtering page of the documentation.
* ``long_books.to_json("long_books/", write_to_filename=True)`` This writes the filtered dataset to a new directory.
As mentioned above, the ``write_to_filename=True`` preserves the sharding of the dataset.
If the dataset was not read in with ``add_filename=True``, setting ``write_to_filename=True`` will throw an error.
If the dataset was read with ``add_filename="path"``, then ``filename_col="path"`` must also be set alongside ``write_to_filename=True``, as in the sketch below.
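
Putting these options together, a minimal sketch of the string form (the paths and the column name "path" are illustrative, not part of this change):

from nemo_curator.datasets.doc_dataset import DocumentDataset

files = ["books_dataset/books_00.jsonl",
         "books_dataset/books_01.jsonl"]

# Record each document's source shard in a custom column named "path".
books = DocumentDataset.read_json(files, add_filename="path")

# ... filtering steps as described above ...

# Route each document back to its original shard using the "path" column.
books.to_json("long_books/", write_to_filename=True, filename_col="path")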

``DocumentDataset`` is just a wrapper around a `Dask dataframe <https://docs.dask.org/en/stable/dataframe.html>`_.
The underlying dataframe can be accessed with the ``DocumentDataset.df`` member variable.
20 changes: 14 additions & 6 deletions nemo_curator/datasets/doc_dataset.py
@@ -52,7 +52,7 @@ def read_json(
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -64,7 +64,9 @@
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If str, sets new column name. Default is False.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.
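
For reference, a quick sketch of the three accepted values of ``add_filename`` (the shard paths and the "source_shard" name are illustrative):

from nemo_curator.datasets.doc_dataset import DocumentDataset

shards = ["data/part_00.jsonl", "data/part_01.jsonl"]

no_col = DocumentDataset.read_json(shards, add_filename=False)               # no filename column (default)
default_col = DocumentDataset.read_json(shards, add_filename=True)           # adds a "file_name" column
custom_col = DocumentDataset.read_json(shards, add_filename="source_shard")  # adds a "source_shard" column

print(default_col.df.columns)  # includes "file_name"
print(custom_col.df.columns)   # includes "source_shard"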
@@ -91,7 +93,7 @@ def read_parquet(
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename=False,
add_filename: Union[bool, str] = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -102,7 +104,9 @@
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If str, sets new column name. Default is False.
columns: If not None, only these columns will be read from the file.
There is a significant performance gain when specifying columns for Parquet files.
@@ -135,7 +139,9 @@ def read_pickle(
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If str, sets new column name. Default is False.
columns: If not None, only these columns will be read from the file.
"""
@@ -154,6 +160,7 @@ def to_json(
output_path: str,
write_to_filename: bool = False,
keep_filename_column: bool = False,
filename_col: str = "file_name",
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -165,6 +172,7 @@
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
output_type="jsonl",
filename_col=filename_col,
)

def to_parquet(
@@ -234,7 +242,7 @@ def _read_json_or_parquet(
input_files: Union[str, List[str]],
file_type: str,
backend: Literal["cudf", "pandas"],
add_filename: bool,
add_filename: Union[bool, str] = False,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
17 changes: 11 additions & 6 deletions nemo_curator/datasets/parallel_dataset.py
@@ -1,11 +1,11 @@
import csv
from typing import List, Optional, Tuple, Union
from typing import List, Tuple, Union

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets.doc_dataset import DocumentDataset
from nemo_curator.utils.distributed_utils import write_to_disk
from nemo_curator.utils.distributed_utils import _resolve_filename_col, write_to_disk
from nemo_curator.utils.file_utils import remove_path_extension
from nemo_curator.utils.import_utils import gpu_only_import

@@ -31,7 +31,7 @@ def read_simple_bitext(
src_lang: str,
tgt_lang: str,
backend: str = "pandas",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
npartitions: int = 16,
):
"""See `read_single_simple_bitext_file_pair` docstring for what "simple_bitext" means and usage of other parameters.
@@ -99,7 +99,7 @@ def read_single_simple_bitext_file_pair(
tgt_lang: str,
doc_id: str = None,
backend: str = "cudf",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
) -> Union[dd.DataFrame, "dask_cudf.DataFrame"]:
"""This function reads a pair of "simple bitext" files into a pandas DataFrame.
A simple bitext is a common data format in machine translation.
@@ -129,7 +129,10 @@ def read_single_simple_bitext_file_pair(
tgt_lang (str): Target language, in ISO-639-1 (two character) format (e.g. 'en')
doc_id (str, optional): A string document id to assign to every segment in the file. Defaults to None.
backend (str, optional): Backend of the data frame. Defaults to "cudf".
add_filename (bool, optional): Add "file_name" as an extra field to every segment in the file. Defaults to False.
add_filename (Union[bool, str]): Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If str, sets new column name. Default is False.
Returns:
Union[dd.DataFrame, dask_cudf.DataFrame]
@@ -162,6 +165,8 @@ def read_single_simple_bitext_file_pair(
df_combined["tgt_lang"] = tgt_lang

if add_filename:
df_combined["file_name"] = remove_path_extension(src_input_file)
df_combined[_resolve_filename_col(add_filename)] = remove_path_extension(
src_input_file
)

return df_combined
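
The body of _resolve_filename_col is not shown in this commit; judging by how it is called above, it presumably maps the bool/str value to a concrete column name, roughly along these lines (an assumed sketch, not the actual implementation):

from typing import Union


def _resolve_filename_col(add_filename: Union[bool, str]) -> Union[bool, str]:
    # Assumed behavior: True -> default "file_name" column, a string -> that
    # string is used as the column name, False -> no filename column at all.
    if add_filename is True:
        return "file_name"
    if add_filename is False:
        return False
    if isinstance(add_filename, str):
        return add_filename
    raise ValueError(f"add_filename must be bool or str, got {type(add_filename)}")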
1 change: 1 addition & 0 deletions nemo_curator/download/arxiv.py
@@ -415,6 +415,7 @@ def download_arxiv(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
1 change: 1 addition & 0 deletions nemo_curator/download/commoncrawl.py
@@ -442,6 +442,7 @@ def download_common_crawl(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
16 changes: 12 additions & 4 deletions nemo_curator/download/doc_builder.py
@@ -112,12 +112,16 @@ def _download_and_extract_single_partition(
keep_raw_download: bool,
force_download: bool,
input_meta: Union[str, dict] = None,
filename_col: str = "file_name",
) -> pd.DataFrame:
url, output_path = paths

if os.path.exists(output_path) and not force_download:
partition = read_single_partition(
[output_path], backend="pandas", filetype=output_type, add_filename=True
[output_path],
backend="pandas",
filetype=output_type,
add_filename=filename_col,
)
return partition

@@ -141,8 +145,10 @@ def _download_and_extract_single_partition(
partition = pd.DataFrame(records)
filename = os.path.basename(output_path)
output_dir = os.path.dirname(output_path)
partition["file_name"] = filename
single_partition_write_with_filename(partition, output_dir, output_type=output_type)
partition[filename_col] = filename
single_partition_write_with_filename(
partition, output_dir, output_type=output_type, filename_col=filename_col
)
if not keep_raw_download:
os.remove(downloaded_file)

@@ -160,6 +166,7 @@ def download_and_extract(
keep_raw_download=False,
force_download=False,
input_meta: Union[str, dict] = None,
filename_col: str = "file_name",
) -> DocumentDataset:
"""
Downloads and extracts a dataset into a format accepted by the NeMo Curator
@@ -178,7 +185,7 @@
directly read from them instead.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
filename_col: The name of the column that contains the filename. Default is "file_name".
Returns:
A DocumentDataset of the downloaded data
"""
@@ -202,6 +209,7 @@
force_download=force_download,
enforce_metadata=False,
input_meta=input_meta,
filename_col=filename_col,
meta=output_format,
)

1 change: 1 addition & 0 deletions nemo_curator/download/wikipedia.py
@@ -811,6 +811,7 @@ def download_wikipedia(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
18 changes: 11 additions & 7 deletions nemo_curator/modules/dataset_ops.py
@@ -1,5 +1,5 @@
import math
from typing import Any, Callable, List, Optional
from typing import Callable, List, Optional

import dask.dataframe as dd
import numpy as np
@@ -17,9 +17,10 @@ def __init__(
seed: Optional[int] = None,
npartitions: Optional[int] = None,
partition_to_filename: Callable[[int], str] = default_filename,
filename_col: str = "file_name",
) -> None:
"""
Randomly permutes the dataset. This will make the original "file_name" column invalid, so if the column is present it will be overwritten.
Randomly permutes the dataset. This will make the original filename_col column invalid, so if the column is present it will be overwritten.
Args:
seed: The random seed that will be used to determine which partition (file) each datapoint goes to.
Setting the seed will guarantee determinism, but may be slightly slower (20-30% slower)
@@ -35,6 +36,7 @@ def __init__(
self.npartitions = npartitions
self.partition_to_filename = partition_to_filename
self.rand_col = "_shuffle_rand"
self.filename_col = filename_col

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
if self.seed is None:
@@ -52,8 +54,10 @@ def shuffle_deterministic(self, dataset: DocumentDataset) -> DocumentDataset:
shuffled_df = dataset.df.set_index(self.rand_col, npartitions=new_npartitions)
shuffled_df = shuffled_df.reset_index(drop=True)

if "file_name" in shuffled_df:
shuffled_df["file_name"] = shuffled_df.map_partitions(self._add_filename)
if self.filename_col in shuffled_df:
shuffled_df[self.filename_col] = shuffled_df.map_partitions(
self._add_filename
)

return DocumentDataset(shuffled_df)

@@ -98,15 +102,15 @@ def _partition_shuffle(self, partition, partition_info=None):
drop=True
)

if "file_name" in partition:
if self.filename_col in partition:
filename = self.partition_to_filename(partition_num)
partition["file_name"] = filename
partition[self.filename_col] = filename

return partition

def _add_filename(self, partition, partition_info=None):
if partition_info is None:
return ["file_name"] * len(partition)
return [self.filename_col] * len(partition)

filename = self.partition_to_filename(partition_info["number"])
