
Merge pull request #699 from IBM/fuzzy-dedup
new Fuzzy Dedup Implementation
touma-I authored Nov 26, 2024
2 parents 6e89570 + 7ae1f13 commit 2f80d9c
Showing 291 changed files with 6,551 additions and 2,187 deletions.
1 change: 0 additions & 1 deletion data-processing-lib/pyproject.toml
@@ -16,7 +16,6 @@ dynamic = ["dependencies", "optional-dependencies"]
Repository = "https://github.com/IBM/data-prep-kit"
Issues = "https://github.com/IBM/data-prep-kit/issues"
Documentation = "https://ibm.github.io/data-prep-kit/doc"
"Transform project" = "https://github.com/IBM/data-prep-kit/tree/dev/transforms/universal/noop"

[build-system]
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"]
13 changes: 9 additions & 4 deletions data-processing-lib/spark/Makefile
@@ -11,9 +11,14 @@ setup::

set-versions: .check-env
$(MAKE) TOML_VERSION=$(DPK_LIB_VERSION) .defaults.update-toml
sed -e 's/"pyspark...*",/"pyspark>=${SPARK_VERSION}",/' \
pyproject.toml > tt.toml
mv tt.toml pyproject.toml
if [ -e pyproject.toml ]; then \
cat pyproject.toml | sed -e 's/"spark[default]==.*",/"spark[default]==$(SPARK_VERSION)",/' > tt.toml; \
mv tt.toml pyproject.toml; \
fi
if [ -e requirements.txt ]; then \
cat requirements.txt | sed -e 's/ray[default]==.*/ray[default]==$(SPARK_VERSION)/' > tt.txt; \
mv tt.txt requirements.txt; \
fi

build:: build-dist

@@ -26,7 +31,7 @@ publish-dist :: .check-env .defaults.publish-dist

publish-image:: .defaults.publish-image

venv:: pyproject.toml
venv::
$(MAKE) .defaults.spark-lib-src-venv
pip install pytest pytest-cov

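The updated `set-versions` target pins the Spark dependency by rewriting `pyproject.toml` (and `requirements.txt`, when present) in place. For readers less used to the sed idiom, a rough Python sketch of the intended substitution is shown below; it is illustrative only, not part of this commit, and the file name and version value are placeholders.

```python
# Illustrative sketch only (not part of this commit): the intent of the guarded
# sed rewrite above, expressed in Python -- pin the "spark[default]==..." entry
# in pyproject.toml to a requested version, skipping files that do not exist.
import re
from pathlib import Path

def pin_spark_version(toml_path: str, spark_version: str) -> None:
    path = Path(toml_path)
    if not path.exists():  # mirrors the `if [ -e pyproject.toml ]` guard
        return
    text = path.read_text()
    # Replace any existing pin, e.g. "spark[default]==3.5.1", with the new version.
    text = re.sub(
        r'"spark\[default\]==[^"]*",',
        f'"spark[default]=={spark_version}",',
        text,
    )
    path.write_text(text)

# Hypothetical usage; in the Makefile the version comes from $(SPARK_VERSION).
pin_spark_version("pyproject.toml", "3.5.2")
```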
2 changes: 1 addition & 1 deletion scripts/check-workflows.sh
@@ -17,7 +17,7 @@ if [ ! -d transforms ]; then
echo Please run this script from the top of the repository
exit 1
fi
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser"
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser fdedup"
while [ $# -ne 0 ]; do
case $1 in
-show-kfp-black-list) echo $KFP_BLACK_LIST; exit 0;
2 changes: 2 additions & 0 deletions transforms/pyproject.toml
@@ -44,6 +44,7 @@ all = { file = [
"universal/hap/python/requirements.txt",
"universal/tokenization/python/requirements.txt",
"universal/ededup/python/requirements.txt",
"universal/fdedup/python/requirements.txt",
"universal/profiler/python/requirements.txt",
"universal/doc_id/python/requirements.txt",
"universal/filter/python/requirements.txt",
@@ -71,6 +72,7 @@ pdf2parquet = { file = ["language/pdf2parquet/python/requirements.txt"]}
hap = { file = ["universal/hap/python/requirements.txt"]}
tokenization = { file = ["universal/tokenization/python/requirements.txt"]}
ededup = { file = ["universal/ededup/python/requirements.txt"]}
fdedup = { file = ["universal/fdedup/python/requirements.txt"]}
profiler = { file = ["universal/profiler/python/requirements.txt"]}
doc_id = { file = ["universal/doc_id/python/requirements.txt"]}
filter = { file = ["universal/filter/python/requirements.txt"]}
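With these two additions, the fuzzy dedup requirements become an optional extra of the transforms package alongside `ededup` and the others. Assuming the extras are published the same way as the existing ones under `data-prep-toolkit-transforms`, the new transform would presumably be installable with something like `pip install 'data-prep-toolkit-transforms[fdedup]'` (hypothetical invocation, not shown in this commit).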
19 changes: 10 additions & 9 deletions transforms/universal/fdedup/README.md
@@ -1,10 +1,11 @@
# Fuzzy Deduplification Transform
The fdedup transforms removes documents that are very similar to each other within a set of parquet files,
per the set of
[transform project conventions](../../README.md#transform-project-conventions)
the following runtimes are available:
# Fuzzy Deduplication Transform
The fdedup transform eliminates documents that are highly similar to each other (but not necessarily identical) from a
set of Parquet files. This ensures that the resulting dataset contains only unique or sufficiently distinct entries.
Per the set of [transform project conventions](../../README.md#transform-project-conventions) the following runtimes are available:

* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime
* [kfp](kfp_ray/README.md) - enables running the ray docker image
in a kubernetes cluster using a generated `yaml` file.
* [python](python/README.md) - enables running the base transform in a pure python environment
* [ray](ray/README.md) - enables running the base python transform in a Ray runtime
* [spark](spark/README.md) - enables running the base python transform in a spark runtime
* [kfp](kfp_ray/README.md) - enables running the ray docker image in a kubernetes cluster using a generated `yaml` file.

Please check [here](python/README.md) for a more detailed description of this transform.
215 changes: 215 additions & 0 deletions transforms/universal/fdedup/fdedup_python.ipynb
@@ -0,0 +1,215 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "afd55886-5f5b-4794-838e-ef8179fb0394",
"metadata": {},
"source": [
"##### **** These pip installs need to be adapted to use the appropriate release level. Alternatively, The venv running the jupyter lab could be pre-configured with a requirement file that includes the right release. Example for transform developers working from git clone:\n",
"```\n",
"make venv\n",
"source venv/bin/activate && pip install jupyterlab\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695",
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"## This is here as a reference only\n",
"# Users and application developers must use the right tag for the latest from pypi\n",
"#!pip install data-prep-toolkit\n",
"#!pip install data-prep-toolkit-transforms\n",
"#!pip install data-prep-connector"
]
},
{
"cell_type": "markdown",
"id": "ebf1f782-0e61-485c-8670-81066beb734c",
"metadata": {},
"source": [
"##### ***** Import required Classes and modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c2a12abc-9460-4e45-8961-873b48a9ab19",
"metadata": {},
"outputs": [],
"source": [
"import ast\n",
"import os\n",
"import sys\n",
"\n",
"from data_processing.utils import ParamsUtils\n",
"from fdedup_transform_python import parse_args, ServiceOrchestrator"
]
},
{
"cell_type": "markdown",
"id": "7234563c-2924-4150-8a31-4aec98c1bf33",
"metadata": {},
"source": [
"##### ***** Setup runtime parameters for this transform\n",
"We will only provide a description for the parameters used in this example. For a complete list of parameters, please refer to the README.md for this transform:\n",
"|parameter:type | value | description |\n",
"|-|-|-|\n",
"| input_folder:str | \\${PWD}/ray/test-data/input/ | folder that contains the input parquet files for the fuzzy dedup algorithm |\n",
"| output_folder:str | \\${PWD}/ray/output/ | folder that contains the all the intermediate results and the output parquet files for the fuzzy dedup algorithm |\n",
"| contents_column:str | contents | name of the column that stores document text |\n",
"| document_id_column:str | int_id_column | name of the column that stores document ID |\n",
"| num_permutations:int | 112 | number of permutations to use for minhash calculation |\n",
"| num_bands:int | 14 | number of bands to use for band hash calculation |\n",
"| num_minhashes_per_band | 8 | number of minhashes to use in each band |\n",
"| operation_mode:{filter_duplicates,filter_non_duplicates,annotate} | filter_duplicates | operation mode for data cleanup: filter out duplicates/non-duplicates, or annotate duplicate documents |"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e90a853e-412f-45d7-af3d-959e755aeebb",
"metadata": {},
"outputs": [],
"source": [
"# create parameters\n",
"input_folder = os.path.join(os.path.abspath(\"\"), \"python\", \"test-data\", \"input\")\n",
"output_folder = os.path.join(os.path.abspath(\"\"), \"python\", \"output\")\n",
"params = {\n",
" # transform configuration parameters\n",
" \"input_folder\": input_folder,\n",
" \"output_folder\": output_folder,\n",
" \"contents_column\": \"contents\",\n",
" \"document_id_column\": \"int_id_column\",\n",
" \"num_permutations\": 112,\n",
" \"num_bands\": 14,\n",
" \"num_minhashes_per_band\": 8,\n",
" \"operation_mode\": \"filter_duplicates\",\n",
"}"
]
},
{
"cell_type": "markdown",
"id": "7949f66a-d207-45ef-9ad7-ad9406f8d42a",
"metadata": {},
"source": [
"##### ***** Use ray runtime to invoke each transform in the fuzzy dedup pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0775e400-7469-49a6-8998-bd4772931459",
"metadata": {},
"outputs": [],
"source": [
"\n",
"sys.argv = ParamsUtils.dict_to_req(d=params)\n",
"args = parse_args()\n",
"# Initialize the orchestrator\n",
"orchestrator = ServiceOrchestrator(global_params=args)\n",
"# Launch python fuzzy dedup execution\n",
"orchestrator.orchestrate()"
]
},
{
"cell_type": "markdown",
"id": "c3df5adf-4717-4a03-864d-9151cd3f134b",
"metadata": {},
"source": [
"##### **** The specified folder will include the transformed parquet files."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7276fe84-6512-4605-ab65-747351e13a7c",
"metadata": {},
"outputs": [],
"source": [
"import glob\n",
"glob.glob(\"python/output/cleaned/*\")"
]
},
{
"cell_type": "markdown",
"id": "d30489d9-fc98-423e-90a8-e8f372787e88",
"metadata": {},
"source": [
"***** print the input data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5b22234f-f7a1-4b92-b2ac-376b2545abce",
"metadata": {},
"outputs": [],
"source": [
"import polars as pl\n",
"input_df_1 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"python\", \"test-data\", \"input\", \"data_1\", \"df1.parquet\"))\n",
"input_df_2 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"python\", \"test-data\", \"input\", \"data_2\", \"df2.parquet\"))\n",
"input_df = input_df_1.vstack(input_df_2)\n",
"\n",
"with pl.Config(fmt_str_lengths=10000000, tbl_rows=-1):\n",
" print(input_df)"
]
},
{
"cell_type": "markdown",
"id": "5305d127-10fd-4fa6-97a6-ac47db2bdc7e",
"metadata": {},
"source": [
"***** print the output result"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0b2eddb9-4fb6-41eb-916c-3741b9129f2c",
"metadata": {},
"outputs": [],
"source": [
"import polars as pl\n",
"output_df_1 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"python\", \"output\", \"cleaned\", \"data_1\", \"df1.parquet\"))\n",
"output_df_2 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"python\", \"output\", \"cleaned\", \"data_2\", \"df2.parquet\"))\n",
"output_df = output_df_1.vstack(output_df_2)\n",
"with pl.Config(fmt_str_lengths=10000000, tbl_rows=-1):\n",
" print(output_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d60e391d-cf58-47ae-9991-04c05d114edc",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "fdedup_ray",
"language": "python",
"name": "fdedup_ray"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
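The parameter table in the notebook above reflects the usual MinHash/LSH banding relationship: `num_permutations` (112) = `num_bands` (14) × `num_minhashes_per_band` (8), and documents whose signatures agree on every minhash within at least one band become fuzzy-duplicate candidates. The sketch below only illustrates that relationship; it is not code from this commit or from the fdedup transform, and the shingling scheme, hash choice, and sample documents are assumptions made for the example.

```python
# Illustrative MinHash/LSH banding sketch (not part of this commit).
# Shows how num_permutations = num_bands * num_minhashes_per_band (112 = 14 * 8)
# and how documents sharing a band hash become fuzzy-duplicate candidates.
import hashlib
from collections import defaultdict

NUM_BANDS = 14
MINHASHES_PER_BAND = 8
NUM_PERMUTATIONS = NUM_BANDS * MINHASHES_PER_BAND  # 112, as in the notebook

def shingles(text: str, width: int = 5) -> set[str]:
    # Word-level shingles; the real transform's shingling choice may differ.
    words = text.lower().split()
    return {" ".join(words[i:i + width]) for i in range(max(1, len(words) - width + 1))}

def minhash_signature(text: str) -> list[int]:
    shs = shingles(text)
    sig = []
    # One cheap "permutation" per seed: the minimum salted 64-bit hash over all shingles.
    for seed in range(NUM_PERMUTATIONS):
        sig.append(min(
            int.from_bytes(hashlib.sha256(f"{seed}:{s}".encode()).digest()[:8], "big")
            for s in shs
        ))
    return sig

def band_hashes(signature: list[int]) -> list[int]:
    # Split the 112-value signature into 14 bands of 8 minhashes and hash each band.
    return [hash(tuple(signature[b * MINHASHES_PER_BAND:(b + 1) * MINHASHES_PER_BAND]))
            for b in range(NUM_BANDS)]

docs = {
    "doc1": "the quick brown fox jumps over the lazy dog near the river bank",
    "doc2": "the quick brown fox jumps over the lazy dog near the river bend",
    "doc3": "completely different text about parquet files and deduplication",
}

buckets = defaultdict(set)
for doc_id, text in docs.items():
    for band_index, bh in enumerate(band_hashes(minhash_signature(text))):
        buckets[(band_index, bh)].add(doc_id)

# Any bucket holding more than one document marks a candidate duplicate pair;
# near-identical doc1 and doc2 are very likely to collide in at least one band.
for ids in buckets.values():
    if len(ids) > 1:
        print("possible duplicates:", sorted(ids))
```

In the actual transform these stages run as a multi-step pipeline driven by `ServiceOrchestrator`, which is what the notebook's `orchestrator.orchestrate()` call kicks off; the output folder holds the intermediate results of each step along with the cleaned parquet files.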
