diff --git a/.make.defaults b/.make.defaults index c6c0d9d6a..5df7bd933 100644 --- a/.make.defaults +++ b/.make.defaults @@ -49,7 +49,7 @@ DOCKER_REGISTRY_KEY?=$(DPK_DOCKER_REGISTRY_KEY) DOCKER_REGISTRY_ENDPOINT?=$(DOCKER_HOSTNAME)/$(DOCKER_NAMESPACE) DOCKER_LOCAL_IMAGE=$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) DOCKER_REMOTE_IMAGE=$(DOCKER_REGISTRY_ENDPOINT)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) -DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-3.5.2 +DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-$(SPARK_VERSION) DOCKER_SPARK_BASE_IMAGE=$(DOCKER_SPARK_BASE_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) RAY_BASE_IMAGE?=docker.io/rayproject/ray:${RAY}-py312 # Deprecated in favor of DOCKER_REMOTE_IMAGE diff --git a/.make.versions b/.make.versions index e77220915..9d2d1171c 100644 --- a/.make.versions +++ b/.make.versions @@ -63,6 +63,7 @@ PROFILER_RAY_VERSION=$(DPK_VERSION) RESIZE_PYTHON_VERSION=$(DPK_VERSION) RESIZE_RAY_VERSION=$(DPK_VERSION) +RESIZE_SPARK_VERSION=$(DPK_VERSION) LANG_ID_PYTHON_VERSION=$(DPK_VERSION) LANG_ID_RAY_VERSION=$(DPK_VERSION) @@ -113,6 +114,7 @@ KFP_v2_SDK=2.8.0 KFP_v1=1.8.5 KFP_v1_SDK=1.8.22 RAY=2.36.1 +SPARK_VERSION=3.5.2 REQUIRED_PYTHON_VERSIONS=">=3.10,<3.13" diff --git a/README.md b/README.md index 0406a3ced..25843f59c 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ The matrix below shows the the combination of modules and supported runtimes. 
Al | [Unique ID annotation](transforms/universal/doc_id/ray/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: | | [Filter on annotations](transforms/universal/filter/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: | | [Profiler](transforms/universal/profiler/ray/README.md) | |:white_check_mark:| |:white_check_mark: | -| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: | +| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark: |:white_check_mark: | | [Tokenizer](transforms/universal/tokenization/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: | | **Language-only** | | | | | | [Language identification](transforms/language/lang_id/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: | diff --git a/data-processing-lib/spark/Makefile b/data-processing-lib/spark/Makefile index 51290ccf5..d4769187b 100644 --- a/data-processing-lib/spark/Makefile +++ b/data-processing-lib/spark/Makefile @@ -1,9 +1,7 @@ # Use make help, to see the available rules REPOROOT=../.. 
include $(REPOROOT)/.make.defaults -SPARK_VERSION=3.5.2 -DOCKER_IMAGE_NAME=data-prep-kit-spark-$(SPARK_VERSION) -DOCKER_IMAGE_LIB_NAME=data-prep-kit-spark +DOCKER_IMAGE_NAME=$(DOCKER_SPARK_BASE_IMAGE_NAME) .check-env:: diff --git a/transforms/universal/resize/python/README.md b/transforms/universal/resize/python/README.md index 0d1271cbe..61357c180 100644 --- a/transforms/universal/resize/python/README.md +++ b/transforms/universal/resize/python/README.md @@ -1,4 +1,4 @@ -# Split files +# Resize files Please see the set of [transform project conventions](../../README.md) diff --git a/transforms/universal/resize/ray/README.md b/transforms/universal/resize/ray/README.md index ccb61f89c..0db9c73f2 100644 --- a/transforms/universal/resize/ray/README.md +++ b/transforms/universal/resize/ray/README.md @@ -1,4 +1,4 @@ -# Split files +# Resize files Please see the set of [transform project conventions](../../README.md) diff --git a/transforms/universal/resize/spark/.dockerignore b/transforms/universal/resize/spark/.dockerignore new file mode 100644 index 000000000..f7275bbbd --- /dev/null +++ b/transforms/universal/resize/spark/.dockerignore @@ -0,0 +1 @@ +venv/ diff --git a/transforms/universal/resize/spark/.gitignore b/transforms/universal/resize/spark/.gitignore new file mode 100644 index 000000000..34baad132 --- /dev/null +++ b/transforms/universal/resize/spark/.gitignore @@ -0,0 +1,39 @@ +test-data/output +output/* +/output/ +data-processing-lib/ +data-processing-spark/ + + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + + +# Distribution / packaging +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +.tox/ +htmlcov +.coverage +.cache +nosetests.xml +coverage.xml diff --git a/transforms/universal/resize/spark/Dockerfile b/transforms/universal/resize/spark/Dockerfile new file 
mode 100644 index 000000000..b7b3b7495 --- /dev/null +++ b/transforms/universal/resize/spark/Dockerfile @@ -0,0 +1,44 @@ +ARG BASE_IMAGE=quay.io/dataprep1/data-prep-kit/data-prep-kit-spark-3.5.2:latest +FROM ${BASE_IMAGE} + +USER root +# install pytest +RUN pip install --no-cache-dir pytest + +WORKDIR ${SPARK_HOME}/work-dir + +# Copy in the data processing framework source/project and install it +# This is expected to be placed in the docker context before this is run (see the make image). +COPY --chown=spark:root data-processing-lib-python/ data-processing-lib-python/ +RUN cd data-processing-lib-python && pip install --no-cache-dir -e . +COPY --chown=spark:root data-processing-lib-spark/ data-processing-lib-spark/ +RUN cd data-processing-lib-spark && pip install --no-cache-dir -e . +COPY --chown=spark:root python-transform/ python-transform/ +RUN cd python-transform && pip install --no-cache-dir -e . + +COPY --chown=root:root src/ src/ +COPY --chown=root:root pyproject.toml pyproject.toml +RUN pip install --no-cache-dir -e . + +# copy in the main() entry point to the image +COPY ./src/resize_transform_spark.py . + +# Copy in some samples +COPY ./src/resize_local_spark.py local/ + +# copy test +COPY test/ test/ +COPY test-data/ test-data/ + +USER spark + +# Set environment +ENV PYTHONPATH=${SPARK_HOME}/work-dir/:${PYTHONPATH} + +# Put these at the end since they seem to upset the docker cache. +ARG BUILD_DATE +ARG GIT_COMMIT +LABEL build-date=$BUILD_DATE +LABEL git-commit=$GIT_COMMIT + + diff --git a/transforms/universal/resize/spark/Makefile b/transforms/universal/resize/spark/Makefile new file mode 100644 index 000000000..f02e9db3f --- /dev/null +++ b/transforms/universal/resize/spark/Makefile @@ -0,0 +1,50 @@ +# Define the root of the local git clone for the common rules to be able +# know where they are running from. +REPOROOT=../../../.. +# Include a library of common .transform.* targets which most +# transforms should be able to reuse. 
However, feel free +# to override/redefine the rules below. + +include $(REPOROOT)/transforms/.make.transforms + +TRANSFORM_NAME=resize + +venv:: .transforms.spark-venv + +test:: .transforms.spark-test + +clean:: .transforms.clean + +image:: .transforms.spark-image + +test-src:: .transforms.test-src + +setup:: .transforms.setup + +build:: build-dist image + +publish: publish-image + +publish-image:: .transforms.publish-image-spark + +# set the version of python transform that this depends on. +set-versions: + $(MAKE) TRANSFORM_PYTHON_VERSION=${RESIZE_PYTHON_VERSION} TOML_VERSION=$(RESIZE_SPARK_VERSION) .transforms.set-versions + +build-dist:: .defaults.build-dist + +publish-dist:: .defaults.publish-dist + +test-image:: .transforms.spark-test-image + +run-cli-sample: .transforms.run-cli-spark-sample + +run-local-sample: .transforms.run-local-sample + +minio-start: .minio-start + +kind-load-image:: .transforms.kind-load-image + +docker-load-image: .defaults.docker-load-image + +docker-save-image: .defaults.docker-save-image diff --git a/transforms/universal/resize/spark/README.md b/transforms/universal/resize/spark/README.md new file mode 100644 index 000000000..0db9c73f2 --- /dev/null +++ b/transforms/universal/resize/spark/README.md @@ -0,0 +1,64 @@ +# Resize files + +Please see the set of +[transform project conventions](../../README.md) +for details on general project conventions, transform configuration, +testing and IDE set up. + +## Summary + +This is a simple transformer that is resizing the input tables to a specified size. +* resizing based on in-memory size of the tables. +* resized based on the number of rows in the tables. + +## Building + +A [docker file](Dockerfile) that can be used for building docker image. 
You can use + +```shell +make build +``` + +## Configuration and command line Options + +The set of dictionary keys holding [ResizeTransform](src/resize_transform_spark.py) +configuration values are as follows: + +* _max_rows_per_table_ - specifies max documents per table +* _max_mbytes_per_table - specifies max size of table, according to the _size_type_ value. +* _size_type_ - indicates how table size is measured. Can be one of + * memory - table size is measured by the in-process memory used by the table + * disk - table size is estimated as the on-disk size of the parquet files. This is an estimate only + as files are generally compressed on disk and so may not be exact due to varying compression ratios. + This is the default. + +Only one of the _max_rows_per_table_ and _max_mbytes_per_table_ may be used. + +## Running + +We also provide a demo of the transform usage for the local file system data storage option, +[local file system](src/resize_local_spark.py). + + +### Launched Command Line Options +When running the transform with the Spark launcher (i.e. SparkTransformLauncher), +the following command line arguments are available in addition to +[the options provided by the launcher](../../../data-processing-lib/doc/launcher-options.md) and map to the configuration keys above. + +``` + --resize_max_rows_per_table RESIZE_MAX_ROWS_PER_TABLE + Max number of rows per table + --resize_max_mbytes_per_table RESIZE_MAX_MBYTES_PER_TABLE + Max table size (MB). Size is measured according to the --resize_size_type parameter + --resize_size_type {disk,memory} + Determines how memory is measured when using the --resize_max_mbytes_per_table option. + 'memory' measures the in-process memory footprint and + 'disk' makes an estimate of the resulting parquet file size.
+``` + + +### Transforming data using the transform image + +To use the transform image to transform your data, please refer to the +[running images quickstart](../../../../doc/quick-start/run-transform-image.md), +substituting the name of this transform image and runtime as appropriate. diff --git a/transforms/universal/resize/spark/pyproject.toml b/transforms/universal/resize/spark/pyproject.toml new file mode 100644 index 000000000..77687ca7e --- /dev/null +++ b/transforms/universal/resize/spark/pyproject.toml @@ -0,0 +1,46 @@ +[project] +name = "dpk_resize_transform_spark" +version = "0.2.2.dev0" +requires-python = ">=3.10" +description = "Resize Spark Transform" +license = {text = "Apache-2.0"} +readme = {file = "README.md", content-type = "text/markdown"} +authors = [ + { name = "David Wood", email = "dawood@us.ibm.com" }, + { name = "Boris Lublinsky", email = "blublinsk@ibm.com" }, +] +dependencies = [ + "dpk-resize-transform-python==0.2.2.dev0", + "data-prep-toolkit-spark==0.2.2.dev0", +] + +[build-system] +requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] +build-backend = "setuptools.build_meta" + +[project.optional-dependencies] +dev = [ + "twine", + "pytest>=7.3.2", + "pytest-dotenv>=0.5.2", + "pytest-env>=1.0.0", + "pre-commit>=3.3.2", + "pytest-cov>=4.1.0", + "pytest-mock>=3.10.0", + "moto==5.0.5", + "markupsafe==2.0.1", +] + +[options] +package_dir = ["src","test"] + +[options.packages.find] +where = ["src/"] + +[tool.pytest.ini_options] +# Currently we use low coverage since we have to run tests separately (see makefile) +#addopts = "--cov --cov-report term-missing --cov-fail-under 25" +markers = ["unit: unit tests", "integration: integration tests"] + +[tool.coverage.run] +include = ["src/*"] diff --git a/transforms/universal/resize/spark/src/resize_local_spark.py b/transforms/universal/resize/spark/src/resize_local_spark.py new file mode 100644 index 000000000..2d1aee614 --- /dev/null +++ 
b/transforms/universal/resize/spark/src/resize_local_spark.py @@ -0,0 +1,47 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import sys + +from data_processing.utils import ParamsUtils +from data_processing_spark.runtime.spark import SparkTransformLauncher +from resize_transform_spark import ResizeSparkTransformConfiguration + + +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # Data access. 
Only required parameters are specified + "data_local_config": ParamsUtils.convert_to_ast(local_conf), + # execution info + "runtime_parallelization": 1, + "runtime_pipeline_id": "pipeline_id", + "runtime_job_id": "job_id", + "runtime_code_location": ParamsUtils.convert_to_ast(code_location), + # resize configuration + # "resize_max_mbytes_per_table": 0.02, + "resize_max_rows_per_table": 300, +} +if __name__ == "__main__": + # Set the simulated command line args + sys.argv = ParamsUtils.dict_to_req(d=params) + # create launcher + launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration()) + # Launch the ray actor(s) to process the input + launcher.launch() diff --git a/transforms/universal/resize/spark/src/resize_transform_spark.py b/transforms/universal/resize/spark/src/resize_transform_spark.py new file mode 100644 index 000000000..17f2804ab --- /dev/null +++ b/transforms/universal/resize/spark/src/resize_transform_spark.py @@ -0,0 +1,39 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +from data_processing.utils import get_logger +from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration +from resize_transform import ResizeTransformConfiguration + + +logger = get_logger(__name__) + + +class ResizeSparkTransformConfiguration(SparkTransformRuntimeConfiguration): + """ + Implements the SparkTransformRuntimeConfiguration for Resize as required by the SparkTransformLauncher. + Resize does not use a Spark runtime class so the superclass only needs the base + transform configuration. + """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=ResizeTransformConfiguration()) + + +if __name__ == "__main__": + # create launcher + launcher = SparkTransformLauncher(runtime_config=ResizeSparkTransformConfiguration()) + logger.info("Launching resize transform") + # Launch Spark to process the input + launcher.launch() diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/metadata.json b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/metadata.json new file mode 100644 index 000000000..a31e4d939 --- /dev/null +++ b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/metadata.json @@ -0,0 +1,45 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "Resize", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 15:04:12", + "end_time": "2024-04-02 15:04:13", + "status": "success" + }, + "code": null, + "job_input_params": { + "max_rows_per_table": -1, + "max_mbytes_per_table": 0.02, + "checkpointing": false, + "max_files": -1, + "number of workers": 1, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 10, + "gpus": 0, + "memory": 13.523791504092515, + "object_store": 2.0 + }, + "job_output_stats": { + "source_files": 3, +
"source_size": 280992, + "result_files": 7, + "result_size": 280997, + "table_processing": 0.02783203125 + }, + "source": { + "name": "/Users/dawood/git/fm-data-engineering/transforms/universal/resize/test-data/input", + "type": "path" + }, + "target": { + "name": "/tmp/Resize4x1spb72", + "type": "path" + } +} diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_0.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_0.parquet new file mode 100644 index 000000000..48e6d7b2c Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_1.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_1.parquet new file mode 100644 index 000000000..cc1975ce6 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test1_1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_0.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_0.parquet new file mode 100644 index 000000000..5e10a1c56 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_1.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_1.parquet new file mode 100644 index 000000000..84aa85489 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test2_1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_0.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_0.parquet new file mode 100644 index 000000000..719f21332 Binary files /dev/null and 
b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_1.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_1.parquet new file mode 100644 index 000000000..0e16b7968 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_2.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_2.parquet new file mode 100644 index 000000000..0cb2543f9 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.02/test3_2.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/metadata.json b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/metadata.json new file mode 100644 index 000000000..0a95b03a0 --- /dev/null +++ b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/metadata.json @@ -0,0 +1,45 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "Resize", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 14:59:00", + "end_time": "2024-04-02 14:59:01", + "status": "success" + }, + "code": null, + "job_input_params": { + "max_rows_per_table": -1, + "max_mbytes_per_table": 0.05, + "checkpointing": false, + "max_files": -1, + "number of workers": 1, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 10, + "gpus": 0, + "memory": 13.485490418039262, + "object_store": 2.0 + }, + "job_output_stats": { + "source_files": 3, + "source_size": 280992, + "result_files": 3, + "result_size": 280993, + "table_processing": 0.019520044326782227 + }, + "source": { + "name": "/Users/dawood/git/fm-data-engineering/transforms/universal/resize/test-data/input", + 
"type": "path" + }, + "target": { + "name": "/tmp/Resizexgyho0l8", + "type": "path" + } +} diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test2.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test2.parquet new file mode 100644 index 000000000..a204e1423 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test2.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3.parquet new file mode 100644 index 000000000..d867108eb Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3_0.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3_0.parquet new file mode 100644 index 000000000..7837d2621 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-0.05/test3_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-1/metadata.json b/transforms/universal/resize/spark/test-data/expected-mbytes-1/metadata.json new file mode 100644 index 000000000..535415a4d --- /dev/null +++ b/transforms/universal/resize/spark/test-data/expected-mbytes-1/metadata.json @@ -0,0 +1,45 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "Resize", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 15:02:29", + "end_time": "2024-04-02 15:02:30", + "status": "success" + }, + "code": null, + "job_input_params": { + "max_rows_per_table": -1, + "max_mbytes_per_table": 1.0, + "checkpointing": false, + "max_files": -1, + "number of workers": 1, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 10, + "gpus": 0, + "memory": 
13.490118408575654, + "object_store": 2.0 + }, + "job_output_stats": { + "source_files": 3, + "source_size": 280992, + "result_files": 1, + "result_size": 280992, + "table_processing": 0.0011630058288574219 + }, + "source": { + "name": "/Users/dawood/git/fm-data-engineering/transforms/universal/resize/test-data/input", + "type": "path" + }, + "target": { + "name": "/tmp/Resize0_y3emip", + "type": "path" + } +} diff --git a/transforms/universal/resize/spark/test-data/expected-mbytes-1/test3.parquet b/transforms/universal/resize/spark/test-data/expected-mbytes-1/test3.parquet new file mode 100644 index 000000000..758a0127d Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-mbytes-1/test3.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/metadata.json b/transforms/universal/resize/spark/test-data/expected-rows-125/metadata.json new file mode 100644 index 000000000..3201d6696 --- /dev/null +++ b/transforms/universal/resize/spark/test-data/expected-rows-125/metadata.json @@ -0,0 +1,45 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "Resize", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 14:55:55", + "end_time": "2024-04-02 14:55:56", + "status": "success" + }, + "code": null, + "job_input_params": { + "max_rows_per_table": 125, + "max_mbytes_per_table": -1, + "checkpointing": false, + "max_files": -1, + "number of workers": 1, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 10, + "gpus": 0, + "memory": 13.48121948260814, + "object_store": 2.0 + }, + "job_output_stats": { + "source_files": 3, + "source_size": 280992, + "result_files": 5, + "result_size": 280996, + "table_processing": 0.01415395736694336 + }, + "source": { + "name": "/Users/dawood/git/fm-data-engineering/transforms/universal/resize/test-data/input", + "type": "path" + }, + "target": { + "name": 
"/tmp/Resizevm0y7uc9", + "type": "path" + } +} diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/test1.parquet b/transforms/universal/resize/spark/test-data/expected-rows-125/test1.parquet new file mode 100644 index 000000000..49f6d7e0f Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-125/test1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/test2_0.parquet b/transforms/universal/resize/spark/test-data/expected-rows-125/test2_0.parquet new file mode 100644 index 000000000..9d83a76c4 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-125/test2_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/test2_1.parquet b/transforms/universal/resize/spark/test-data/expected-rows-125/test2_1.parquet new file mode 100644 index 000000000..07f6e3c04 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-125/test2_1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/test3.parquet b/transforms/universal/resize/spark/test-data/expected-rows-125/test3.parquet new file mode 100644 index 000000000..2ae103007 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-125/test3.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-125/test3_0.parquet b/transforms/universal/resize/spark/test-data/expected-rows-125/test3_0.parquet new file mode 100644 index 000000000..f5cade2bc Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-125/test3_0.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-300/metadata.json b/transforms/universal/resize/spark/test-data/expected-rows-300/metadata.json new file mode 100644 index 000000000..9917d8f4c --- /dev/null +++ 
b/transforms/universal/resize/spark/test-data/expected-rows-300/metadata.json @@ -0,0 +1,45 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "Resize", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 14:09:08", + "end_time": "2024-04-02 14:09:09", + "status": "success" + }, + "code": null, + "job_input_params": { + "max_rows_per_table": 300, + "max_mbytes_per_table": -1, + "checkpointing": false, + "max_files": -1, + "number of workers": 1, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 10, + "gpus": 0, + "memory": 13.283561706542969, + "object_store": 2.0 + }, + "job_output_stats": { + "source_files": 3, + "source_size": 280992, + "result_files": 2, + "result_size": 280993, + "table_processing": 0.0035724639892578125 + }, + "source": { + "name": "/Users/dawood/git/fm-data-engineering/transforms/universal/resize/test-data/input", + "type": "path" + }, + "target": { + "name": "/tmp/Resizebdc2xm6u", + "type": "path" + } +} diff --git a/transforms/universal/resize/spark/test-data/expected-rows-300/test2.parquet b/transforms/universal/resize/spark/test-data/expected-rows-300/test2.parquet new file mode 100644 index 000000000..21a3f9864 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-300/test2.parquet differ diff --git a/transforms/universal/resize/spark/test-data/expected-rows-300/test3.parquet b/transforms/universal/resize/spark/test-data/expected-rows-300/test3.parquet new file mode 100644 index 000000000..c46879c7d Binary files /dev/null and b/transforms/universal/resize/spark/test-data/expected-rows-300/test3.parquet differ diff --git a/transforms/universal/resize/spark/test-data/input/test1.parquet b/transforms/universal/resize/spark/test-data/input/test1.parquet new file mode 100644 index 000000000..ea7714a37 Binary files /dev/null and 
b/transforms/universal/resize/spark/test-data/input/test1.parquet differ diff --git a/transforms/universal/resize/spark/test-data/input/test2.parquet b/transforms/universal/resize/spark/test-data/input/test2.parquet new file mode 100644 index 000000000..ea7714a37 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/input/test2.parquet differ diff --git a/transforms/universal/resize/spark/test-data/input/test3.parquet b/transforms/universal/resize/spark/test-data/input/test3.parquet new file mode 100644 index 000000000..ea7714a37 Binary files /dev/null and b/transforms/universal/resize/spark/test-data/input/test3.parquet differ diff --git a/transforms/universal/resize/spark/test/test_resize_launch.py b/transforms/universal/resize/spark/test/test_resize_launch.py new file mode 100644 index 000000000..30af7dd97 --- /dev/null +++ b/transforms/universal/resize/spark/test/test_resize_launch.py @@ -0,0 +1,55 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing_spark.runtime.spark import SparkTransformLauncher +from resize_transform_spark import ResizeSparkTransformConfiguration + + +class TestRayResizeTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + common_config = {"runtime_parallelization": 1} # to make the output files repeatable. + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data")) + launcher = SparkTransformLauncher(ResizeSparkTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02} | common_config + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures