Adding Resize Spark (#630)
* documentation update

* addressed comments

* addressed comments

* addressed comments
blublinsky authored Sep 27, 2024
1 parent 4b56f4e commit 49ebd51
Showing 39 changed files with 614 additions and 3 deletions.
1 change: 1 addition & 0 deletions .make.versions
@@ -63,6 +63,7 @@ PROFILER_RAY_VERSION=$(DPK_VERSION)

RESIZE_PYTHON_VERSION=$(DPK_VERSION)
RESIZE_RAY_VERSION=$(DPK_VERSION)
RESIZE_SPARK_VERSION=$(DPK_VERSION)

LANG_ID_PYTHON_VERSION=$(DPK_VERSION)
LANG_ID_RAY_VERSION=$(DPK_VERSION)
2 changes: 1 addition & 1 deletion README.md
@@ -138,7 +138,7 @@ The matrix below shows the combination of modules and supported runtimes. Al
| [Unique ID annotation](transforms/universal/doc_id/ray/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Filter on annotations](transforms/universal/filter/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Profiler](transforms/universal/profiler/ray/README.md) | |:white_check_mark:| |:white_check_mark: |
| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark: |:white_check_mark: |
| [Tokenizer](transforms/universal/tokenization/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| **Language-only** | | | | |
| [Language identification](transforms/language/lang_id/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
2 changes: 1 addition & 1 deletion transforms/universal/resize/python/README.md
@@ -1,4 +1,4 @@
# Split files
# Resize files

Please see the set of
[transform project conventions](../../README.md)
2 changes: 1 addition & 1 deletion transforms/universal/resize/ray/README.md
@@ -1,4 +1,4 @@
# Split files
# Resize files

Please see the set of
[transform project conventions](../../README.md)
1 change: 1 addition & 0 deletions transforms/universal/resize/spark/.dockerignore
@@ -0,0 +1 @@
venv/
39 changes: 39 additions & 0 deletions transforms/universal/resize/spark/.gitignore
@@ -0,0 +1,39 @@
test-data/output
output/*
/output/
data-processing-lib/
data-processing-spark/


# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class


# Distribution / packaging
bin/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
.tox/
htmlcov
.coverage
.cache
nosetests.xml
coverage.xml
44 changes: 44 additions & 0 deletions transforms/universal/resize/spark/Dockerfile
@@ -0,0 +1,44 @@
ARG BASE_IMAGE=quay.io/dataprep1/data-prep-kit/data-prep-kit-spark-3.5.2:latest
FROM ${BASE_IMAGE}

USER root
# install pytest
RUN pip install --no-cache-dir pytest

WORKDIR ${SPARK_HOME}/work-dir

# Copy in the data processing framework source/project and install it
# This is expected to be placed in the docker context before this is run (see the make image target).
COPY --chown=spark:root data-processing-lib-python/ data-processing-lib-python/
RUN cd data-processing-lib-python && pip install --no-cache-dir -e .
COPY --chown=spark:root data-processing-lib-spark/ data-processing-lib-spark/
RUN cd data-processing-lib-spark && pip install --no-cache-dir -e .
COPY --chown=spark:root python-transform/ python-transform/
RUN cd python-transform && pip install --no-cache-dir -e .

COPY --chown=root:root src/ src/
COPY --chown=root:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy in the main() entry point to the image
COPY ./src/resize_transform_spark.py .

# Copy in some samples
COPY ./src/resize_local_spark.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/

USER spark

# Set environment
ENV PYTHONPATH=${SPARK_HOME}/work-dir/:${PYTHONPATH}

# Put these at the end since they seem to upset the docker cache.
ARG BUILD_DATE
ARG GIT_COMMIT
LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT


50 changes: 50 additions & 0 deletions transforms/universal/resize/spark/Makefile
@@ -0,0 +1,50 @@
# Define the root of the local git clone for the common rules to be able to
# know where they are running from.
REPOROOT=../../../..
# Include a library of common .transform.* targets which most
# transforms should be able to reuse. However, feel free
# to override/redefine the rules below.

include $(REPOROOT)/transforms/.make.transforms

TRANSFORM_NAME=resize

venv:: .transforms.spark-venv

test:: .transforms.spark-test

clean:: .transforms.clean

image:: .transforms.spark-image

test-src:: .transforms.test-src

setup:: .transforms.setup

build:: build-dist image

publish: publish-image

publish-image:: .transforms.publish-image-spark

# set the version of python transform that this depends on.
set-versions:
	$(MAKE) TRANSFORM_PYTHON_VERSION=$(RESIZE_PYTHON_VERSION) TOML_VERSION=$(RESIZE_SPARK_VERSION) .transforms.set-versions

build-dist:: .defaults.build-dist

publish-dist:: .defaults.publish-dist

test-image:: .transforms.spark-test-image

run-cli-sample: .transforms.run-cli-spark-sample

run-local-sample: .transforms.run-local-sample

minio-start: .minio-start

kind-load-image:: .transforms.kind-load-image

docker-load-image: .defaults.docker-load-image

docker-save-image: .defaults.docker-save-image
64 changes: 64 additions & 0 deletions transforms/universal/resize/spark/README.md
@@ -0,0 +1,64 @@
# Resize files

Please see the set of
[transform project conventions](../../README.md)
for details on general project conventions, transform configuration,
testing and IDE set up.

## Summary

This is a simple transform that resizes the input tables to a specified size:
* resizing based on the in-memory size of the tables.
* resizing based on the number of rows in the tables.
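
Conceptually, resizing repacks rows into output tables that respect the configured limit. A minimal pyarrow sketch of the row-count mode (illustrative only; `split_by_rows` is a hypothetical helper, and the actual transform also buffers rows across input file boundaries):

```python
import pyarrow as pa


def split_by_rows(table: pa.Table, max_rows: int) -> list[pa.Table]:
    # Slice the table into chunks of at most max_rows rows each.
    return [table.slice(offset, max_rows) for offset in range(0, table.num_rows, max_rows)]
```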

## Building

A [Dockerfile](Dockerfile) is provided for building the docker image. You can use

```shell
make build
```

## Configuration and Command Line Options

The set of dictionary keys holding [ResizeTransform](../python/src/resize_transform.py)
configuration values is as follows:

* _max_rows_per_table_ - specifies the maximum number of rows per table.
* _max_mbytes_per_table_ - specifies the maximum size of a table (in MB), measured according to the _size_type_ value.
* _size_type_ - indicates how table size is measured. Can be one of
    * memory - table size is measured by the in-process memory used by the table.
    * disk - table size is estimated as the on-disk size of the parquet files. This is only an estimate,
      as files are generally compressed on disk, and so may not be exact due to varying compression ratios.
      This is the default.

Only one of _max_rows_per_table_ and _max_mbytes_per_table_ may be used.
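
For example (a minimal sketch with hypothetical values; when passed to the launcher, each key carries the `resize_` prefix, as in the demo below):

```python
# Limit output tables by in-process memory size (hypothetical values).
resize_config = {"resize_max_mbytes_per_table": 1, "resize_size_type": "memory"}
# Or, alternatively, limit by row count (never both):
# resize_config = {"resize_max_rows_per_table": 300}
```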

## Running

We also provide a demo of the transform usage for the local file system data storage option:
[resize_local_spark.py](src/resize_local_spark.py).


### Launched Command Line Options
When running the transform with the Spark launcher (i.e., SparkTransformLauncher),
the following command line arguments are available in addition to
[the options provided by the launcher](../../../../data-processing-lib/doc/launcher-options.md) and map to the configuration keys above.

```
--resize_max_rows_per_table RESIZE_MAX_ROWS_PER_TABLE
Max number of rows per table
--resize_max_mbytes_per_table RESIZE_MAX_MBYTES_PER_TABLE
Max table size (MB). Size is measured according to the --resize_size_type parameter
--resize_size_type {disk,memory}
Determines how memory is measured when using the --resize_max_mbytes_per_table option.
'memory' measures the in-process memory footprint and
'disk' makes an estimate of the resulting parquet file size.
```
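
These flags map one-to-one to keys in the parameters dictionary used when launching the transform programmatically. A minimal sketch with hypothetical values (the same pattern used by [resize_local_spark.py](src/resize_local_spark.py)):

```python
from data_processing.utils import ParamsUtils

# Hypothetical values; each dict key corresponds to a --resize_* flag above.
params = {
    "resize_max_mbytes_per_table": 0.02,
    "resize_size_type": "memory",
}
# Build the simulated command line consumed by the launcher.
simulated_argv = ParamsUtils.dict_to_req(d=params)
```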


### Transforming data using the transform image

To use the transform image to transform your data, please refer to the
[running images quickstart](../../../../doc/quick-start/run-transform-image.md),
substituting the name of this transform image and runtime as appropriate.
46 changes: 46 additions & 0 deletions transforms/universal/resize/spark/pyproject.toml
@@ -0,0 +1,46 @@
[project]
name = "dpk_resize_transform_spark"
version = "0.2.2.dev0"
requires-python = ">=3.10"
description = "Resize Spark Transform"
license = {text = "Apache-2.0"}
readme = {file = "README.md", content-type = "text/markdown"}
authors = [
{ name = "David Wood", email = "[email protected]" },
{ name = "Boris Lublinsky", email = "[email protected]" },
]
dependencies = [
"dpk-resize-transform-python==0.2.2.dev0",
"data-prep-toolkit-spark==0.2.2.dev0",
]

[build-system]
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"]
build-backend = "setuptools.build_meta"

[project.optional-dependencies]
dev = [
"twine",
"pytest>=7.3.2",
"pytest-dotenv>=0.5.2",
"pytest-env>=1.0.0",
"pre-commit>=3.3.2",
"pytest-cov>=4.1.0",
"pytest-mock>=3.10.0",
"moto==5.0.5",
"markupsafe==2.0.1",
]

[options]
package_dir = ["src","test"]

[options.packages.find]
where = ["src/"]

[tool.pytest.ini_options]
# Currently we use low coverage since we have to run tests separately (see makefile)
#addopts = "--cov --cov-report term-missing --cov-fail-under 25"
markers = ["unit: unit tests", "integration: integration tests"]

[tool.coverage.run]
include = ["src/*"]
47 changes: 47 additions & 0 deletions transforms/universal/resize/spark/src/resize_local_spark.py
@@ -0,0 +1,47 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import sys

from data_processing.utils import ParamsUtils
from data_processing_spark.runtime.spark import SparkTransformLauncher
from resize_transform_spark import ResizeSparkTransformConfiguration


# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # execution info
    "runtime_parallelization": 1,
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
    # resize configuration
    # "resize_max_mbytes_per_table": 0.02,
    "resize_max_rows_per_table": 300,
}
if __name__ == "__main__":
    # Set the simulated command line args
    sys.argv = ParamsUtils.dict_to_req(d=params)
    # create launcher
    launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration())
    # Launch the spark worker(s) to process the input
    launcher.launch()
39 changes: 39 additions & 0 deletions transforms/universal/resize/spark/src/resize_transform_spark.py
@@ -0,0 +1,39 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from data_processing.utils import get_logger
from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration
from resize_transform import ResizeTransformConfiguration


logger = get_logger(__name__)


class ResizeSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
    """
    Implements the SparkTransformRuntimeConfiguration for the resize transform
    as required by the SparkTransformLauncher.
    Resize does not use a dedicated Spark runtime class, so the superclass only needs
    the base transform configuration.
    """

    def __init__(self):
        """
        Initialization
        """
        super().__init__(transform_config=ResizeTransformConfiguration())


if __name__ == "__main__":
    # create launcher
    launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration())
    logger.info("Launching resize transform")
    # Launch the spark worker(s) to process the input
    launcher.launch()