diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py
index 143835dd0..fa3e69e4a 100644
--- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py
+++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_file_processor.py
@@ -14,7 +14,7 @@
 
 from data_processing.data_access import DataAccessFactoryBase
 from data_processing.runtime import AbstractTransformFileProcessor
-from data_processing.transform import AbstractBinaryTransform, TransformStatistics
+from data_processing.transform import AbstractTransform, TransformStatistics
 from data_processing.utils import UnrecoverableException
 
 
@@ -28,7 +28,8 @@ def __init__(
         data_access_factory: DataAccessFactoryBase,
         statistics: TransformStatistics,
         transform_params: dict[str, Any],
-        transform_class: type[AbstractBinaryTransform],
+        transform_class: type[AbstractTransform],
+        is_folder: bool,
     ):
         """
         Init method
@@ -36,11 +37,13 @@ def __init__(
         :param statistics - reference to statistics class
         :param transform_params - transform parameters
         :param transform_class: transform class
+        :param is_folder: folder transform flag
         """
         # invoke superclass
         super().__init__(
             data_access_factory=data_access_factory,
             transform_parameters=dict(transform_params),
+            is_folder=is_folder,
         )
         self.transform_params["statistics"] = statistics
         # Create local processor
@@ -52,7 +55,8 @@ def __init__(
         )
         # Create statistics
         self.stats = statistics
+
     def _publish_stats(self, stats: dict[str, Any]) -> None:
         self.stats.add_stats(stats)
 
 
@@ -65,17 +69,20 @@ def __init__(
         self,
         data_access_factory: DataAccessFactoryBase,
         transform_params: dict[str, Any],
-        transform_class: type[AbstractBinaryTransform],
+        transform_class: type[AbstractTransform],
+        is_folder: bool,
     ):
         """
         Init method
         :param data_access_factory - data access factory
         :param transform_params - transform parameters
         :param transform_class: transform class
+        :param is_folder: folder transform flag
         """
         super().__init__(
             data_access_factory=data_access_factory,
             transform_parameters=dict(transform_params),
+            is_folder=is_folder,
         )
         # Add data access and statistics to the processor parameters
         self.transform_params["data_access"] = self.data_access
diff --git a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
index 8692da29e..153eaaf0a 100644
--- a/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
+++ b/data-processing-lib/python/src/data_processing/runtime/pure_python/transform_orchestrator.py
@@ -24,7 +24,7 @@
     PythonTransformFileProcessor,
     PythonTransformRuntimeConfiguration,
 )
-from data_processing.transform import AbstractBinaryTransform, TransformStatistics
+from data_processing.transform import AbstractBinaryTransform, AbstractFolderTransform, AbstractTransform, TransformStatistics
 from data_processing.utils import GB, get_logger
 
 
@@ -48,8 +48,6 @@ def _execution_resources() -> dict[str, Any]:
         "object_store": 0,
     }
 
-
-
 def orchestrate(
     data_access_factory: DataAccessFactoryBase,
     runtime_config: PythonTransformRuntimeConfiguration,
@@ -74,15 +72,21 @@ def orchestrate(
         return 1
     # create additional execution parameters
     runtime = runtime_config.create_transform_runtime()
+    is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform)
     try:
-        # Get files to process
-        files, profile, retries = data_access.get_files_to_process()
-        if len(files) == 0:
-            logger.error("No input files to process - exiting")
-            return 0
-        if retries > 0:
-            statistics.add_stats({"data access retries": retries})
-        logger.info(f"Number of files is {len(files)}, source profile {profile}")
+        if is_folder:
+            # folder transform
+            files = AbstractFolderTransform.get_folders(data_access=data_access)
+            logger.info(f"Number of folders is {len(files)}")
+        else:
+            # Get files to process
+            files, profile, retries = data_access.get_files_to_process()
+            if len(files) == 0:
+                logger.error("No input files to process - exiting")
+                return 0
+            if retries > 0:
+                statistics.add_stats({"data access retries": retries})
+            logger.info(f"Number of files is {len(files)}, source profile {profile}")
         # Print interval
         print_interval = int(len(files) / 100)
         if print_interval == 0:
@@ -99,6 +103,7 @@ def orchestrate(
                     data_access_factory=data_access_factory, statistics=statistics, files=files
                 ),
                 transform_class=runtime_config.get_transform_class(),
+                is_folder=is_folder,
             )
         else:
             # using sequential execution
@@ -111,6 +116,7 @@ def orchestrate(
                     data_access_factory=data_access_factory, statistics=statistics, files=files
                 ),
                 transform_class=runtime_config.get_transform_class(),
+                is_folder=is_folder,
             )
         status = "success"
         return_code = 0
@@ -157,7 +163,8 @@ def _process_transforms(
     data_access_factory: DataAccessFactoryBase,
     statistics: TransformStatistics,
     transform_params: dict[str, Any],
-    transform_class: type[AbstractBinaryTransform],
+    transform_class: type[AbstractTransform],
+    is_folder: bool,
 ) -> None:
     """
     Process transforms sequentially
@@ -167,9 +174,8 @@ def _process_transforms(
     :param data_access_factory: data access factory
     :param transform_params - transform parameters
     :param transform_class: transform class
-    :return: metadata for the execution
-
+    :param is_folder: folder transform flag
     :return: None
     """
     # create executor
     executor = PythonTransformFileProcessor(
@@ -177,6 +183,7 @@ def _process_transforms(
         data_access_factory=data_access_factory,
         statistics=statistics,
         transform_params=transform_params,
         transform_class=transform_class,
+        is_folder=is_folder,
     )
     # process data
     t_start = time.time()
@@ -203,6 +210,7 @@ def _process_transforms_multiprocessor(
     data_access_factory: DataAccessFactoryBase,
     transform_params: dict[str, Any],
     transform_class: type[AbstractBinaryTransform],
+    is_folder: bool,
 ) -> TransformStatistics:
     """
     Process transforms using multiprocessing pool
@@ -212,13 +220,17 @@ def _process_transforms_multiprocessor(
     :param data_access_factory: data access factory
     :param transform_params - transform parameters
     :param transform_class: transform class
+    :param is_folder: folder transform flag
     :return: metadata for the execution
     """
     # result statistics
     statistics = TransformStatistics()
     # create processor
     processor = PythonPoolTransformFileProcessor(
-        data_access_factory=data_access_factory, transform_params=transform_params, transform_class=transform_class
+        data_access_factory=data_access_factory,
+        transform_params=transform_params,
+        transform_class=transform_class,
+        is_folder=is_folder,
    )
     completed = 0
     t_start = time.time()
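The pure-Python orchestrator above selects folder processing solely by inspecting the transform class. A minimal, self-contained sketch of that issubclass-based dispatch, assuming the data_processing package is importable (the two subclasses here are hypothetical stand-ins, not library classes):

    from data_processing.transform import AbstractBinaryTransform, AbstractFolderTransform

    class MyBinaryTransform(AbstractBinaryTransform):
        pass

    class MyFolderTransform(AbstractFolderTransform):
        pass

    for cls in (MyBinaryTransform, MyFolderTransform):
        # the orchestrator performs exactly this check to pick a branch
        print(cls.__name__, issubclass(cls, AbstractFolderTransform))

Because the check happens once, before input enumeration, a single run never mixes file and folder inputs.
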
diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py
index d4ec548d8..1d268875f 100644
--- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py
+++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py
@@ -26,11 +26,13 @@ def __init__(
         self,
         data_access_factory: DataAccessFactoryBase,
         transform_parameters: dict[str, Any],
+        is_folder: bool = False,
     ):
         """
         Init method
         :param data_access_factory: Data Access Factory
         :param transform_parameters: Transform parameters
+        :param is_folder: folder transform flag
         """
         self.logger = get_logger(__name__)
         # validate parameters
@@ -46,6 +48,7 @@ def __init__(
         # Add data access and statistics to the processor parameters
         self.transform_params = transform_parameters
         self.transform_params["data_access"] = self.data_access
+        self.is_folder = is_folder
 
     def process_file(self, f_name: str) -> None:
         """
@@ -58,25 +61,29 @@ def process_file(self, f_name: str) -> None:
             self.logger.warning("No data_access found. Returning.")
             return
         t_start = time.time()
-        # Read source file
-        filedata, retries = self.data_access.get_file(path=f_name)
-        if retries > 0:
-            self._publish_stats({"data access retries": retries})
-        if filedata is None:
-            self.logger.warning(f"File read resulted in None for {f_name}. Returning.")
-            self._publish_stats({"failed_reads": 1})
-            return
-        self._publish_stats({"source_files": 1, "source_size": len(filedata)})
+        if not self.is_folder:
+            # Read source file only if we are processing a file
+            filedata, retries = self.data_access.get_file(path=f_name)
+            if retries > 0:
+                self._publish_stats({"data access retries": retries})
+            if filedata is None:
+                self.logger.warning(f"File read resulted in None for {f_name}. Returning.")
+                self._publish_stats({"failed_reads": 1})
+                return
+            self._publish_stats({"source_files": 1, "source_size": len(filedata)})
         # Process input file
         try:
-            # execute local processing
-            name_extension = TransformUtils.get_file_extension(f_name)
             self.logger.debug(f"Begin transforming file {f_name}")
-            out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata)
+            if not self.is_folder:
+                # execute local processing
+                out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata)
+                name_extension = TransformUtils.get_file_extension(f_name)
+                self.last_file_name = name_extension[0]
+                self.last_file_name_next_index = None
+                self.last_extension = name_extension[1]
+            else:
+                out_files, stats = self.transform.transform(folder_name=f_name)
             self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files")
-            self.last_file_name = name_extension[0]
-            self.last_file_name_next_index = None
-            self.last_extension = name_extension[1]
             # save results
             self._submit_file(t_start=t_start, out_files=out_files, stats=stats)
         # Process unrecoverable exceptions
@@ -95,10 +102,10 @@ def flush(self) -> None:
         the hook for them to return back locally stored data and their statistics.
         :return: None
         """
-        if self.last_file_name is None:
+        if self.last_file_name is None or self.is_folder:
             # for some reason a given worker never processed anything. Happens in testing
             # when the amount of workers is greater than the amount of files
-            self.logger.debug("skipping flush, no name for file is defined")
+            self.logger.debug("skipping flush, no file name is defined or this is a folder transform")
             return
         try:
             t_start = time.time()
diff --git a/data-processing-lib/python/src/data_processing/transform/__init__.py b/data-processing-lib/python/src/data_processing/transform/__init__.py
index 6af43ad60..20254e47b 100644
--- a/data-processing-lib/python/src/data_processing/transform/__init__.py
+++ b/data-processing-lib/python/src/data_processing/transform/__init__.py
@@ -1,3 +1,5 @@
+from data_processing.transform.abstract_transform import AbstractTransform
+from data_processing.transform.folder_transform import AbstractFolderTransform
 from data_processing.transform.binary_transform import AbstractBinaryTransform
 from data_processing.transform.table_transform import AbstractTableTransform
 from data_processing.transform.transform_statistics import TransformStatistics
diff --git a/data-processing-lib/python/src/data_processing/transform/abstract_transform.py b/data-processing-lib/python/src/data_processing/transform/abstract_transform.py
new file mode 100644
index 000000000..89db70f42
--- /dev/null
+++ b/data-processing-lib/python/src/data_processing/transform/abstract_transform.py
@@ -0,0 +1,16 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+class AbstractTransform:
+    """
+    Base class for all transform types
+    """
\ No newline at end of file
diff --git a/data-processing-lib/python/src/data_processing/transform/binary_transform.py b/data-processing-lib/python/src/data_processing/transform/binary_transform.py
index 80dff61ea..b313aff2f 100644
--- a/data-processing-lib/python/src/data_processing/transform/binary_transform.py
+++ b/data-processing-lib/python/src/data_processing/transform/binary_transform.py
@@ -10,10 +10,11 @@
 # limitations under the License.
 ################################################################################
 
-from typing import Any, TypeVar
+from typing import Any
+from data_processing.transform import AbstractTransform
 
 
-class AbstractBinaryTransform:
+class AbstractBinaryTransform(AbstractTransform):
     """
     Converts input binary file to output file(s) (binary)
     Sub-classes must provide the transform() method to provide the conversion of one binary file to 0 or
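With AbstractTransform in place, AbstractBinaryTransform and the new AbstractFolderTransform become siblings under one root, so runtime code can be typed against the common base. A short sketch of what the widened type[AbstractTransform] annotation permits (build_transform is a hypothetical helper, not part of the library):

    from typing import Any

    from data_processing.transform import AbstractTransform

    def build_transform(transform_class: type[AbstractTransform], params: dict[str, Any]) -> AbstractTransform:
        # accepts binary and folder transform classes alike, mirroring the
        # processors' widened transform_class parameter above
        return transform_class(params)
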
diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py
new file mode 100644
index 000000000..866e3286f
--- /dev/null
+++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py
@@ -0,0 +1,50 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any
+from data_processing.data_access import DataAccess
+from data_processing.transform import AbstractTransform
+
+
+class AbstractFolderTransform(AbstractTransform):
+    """
+    Converts input folder to output file(s) (binary)
+    Sub-classes must provide the transform() method to provide the conversion of a folder to 0 or
+    more new binary files and metadata.
+    """
+
+    def __init__(self, config: dict[str, Any]):
+        """
+        Initialize based on the dictionary of configuration information.
+        This simply stores the given configuration in this instance for later use.
+        """
+        self.config = config
+
+    def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
+        """
+        Converts input folder into 0 or more output files.
+        If there is an error, an exception must be raised - exit()ing is not generally allowed.
+        :param folder_name: the name of the folder containing an arbitrary number of files.
+        :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
+                 to metadata. Each element of the return list is a tuple of the transformed bytes and a string
+                 holding the extension to be used when writing out the new bytes.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def get_folders(data_access: DataAccess) -> list[str]:
+        """
+        Compute the list of folders to use.
+        :param data_access - data access class
+        :return: list of folder names
+        """
+        raise NotImplementedError()
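The two methods a concrete folder transform must supply are transform() and get_folders(). A minimal illustrative subclass, assuming the DataAccess import shown above (the class itself and its fixed folder list are hypothetical):

    from typing import Any

    from data_processing.data_access import DataAccess
    from data_processing.transform import AbstractFolderTransform


    class FolderReportTransform(AbstractFolderTransform):
        """Emits a one-line text report per input folder."""

        def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
            # one small text output file per folder, plus statistics
            report = f"processed folder {folder_name}\n".encode("utf-8")
            return [(report, ".txt")], {"folders_processed": 1}

        @staticmethod
        def get_folders(data_access: DataAccess) -> list[str]:
            # a real implementation would enumerate folders through data_access;
            # a fixed list keeps the sketch self-contained
            return ["input/folder1", "input/folder2"]
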
diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py
index e1fabb144..cdad1309f 100644
--- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py
+++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_file_processor.py
@@ -35,6 +35,7 @@ def __init__(self, params: dict[str, Any]):
         super().__init__(
             data_access_factory=params.get("data_access_factory", None),
             transform_parameters=dict(params.get("transform_params", {})),
+            is_folder=params.get("is_folder", False),
         )
         # Create statistics
         self.stats = params.get("statistics", None)
diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py
index 42eba47a6..8276eb56c 100644
--- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py
+++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/transform_orchestrator.py
@@ -16,6 +16,7 @@
 
 import ray
 from data_processing.data_access import DataAccessFactoryBase
+from data_processing.transform import AbstractFolderTransform
 from data_processing_ray.runtime.ray import (
     RayTransformExecutionConfiguration,
     RayTransformFileProcessor,
@@ -56,13 +57,19 @@ def orchestrate(
     # create transformer runtime
     runtime = runtime_config.create_transform_runtime()
     resources = RayUtils.get_cluster_resources()
+    is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform)
     try:
-        # Get files to process
-        files, profile, retries = data_access.get_files_to_process()
-        if len(files) == 0:
-            logger.error("No input files to process - exiting")
-            return 0
-        logger.info(f"Number of files is {len(files)}, source profile {profile}")
+        if is_folder:
+            # folder transform
+            files = AbstractFolderTransform.get_folders(data_access=data_access)
+            logger.info(f"Number of folders is {len(files)}")
+        else:
+            # Get files to process
+            files, profile, retries = data_access.get_files_to_process()
+            if len(files) == 0:
+                logger.error("No input files to process - exiting")
+                return 0
+            logger.info(f"Number of files is {len(files)}, source profile {profile}")
         # Print interval
         print_interval = int(len(files) / 100)
         if print_interval == 0:
@@ -84,6 +90,7 @@ def orchestrate(
                 data_access_factory=data_access_factory, statistics=statistics, files=files
             ),
             "statistics": statistics,
+            "is_folder": is_folder,
         }
         logger.debug("Creating actors")
         processors = RayUtils.create_actors(
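On the Ray side the flag is not a constructor argument; it travels in the actor parameter dictionary and is read back with a default. A trivial sketch of that contract (read_is_folder is a stand-in for the lookup inside RayTransformFileProcessor):

    from typing import Any

    def read_is_folder(params: dict[str, Any]) -> bool:
        # mirrors the processor reading params.get("is_folder", False)
        return params.get("is_folder", False)

    assert read_is_folder({"is_folder": True}) is True
    assert read_is_folder({}) is False

The False default keeps older runtime configurations, which never set the key, on the file-processing path.
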
diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py
index d63664ac4..a0968ab1d 100644
--- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py
+++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_file_processor.py
@@ -29,12 +29,15 @@ def __init__(
         data_access_factory: DataAccessFactoryBase,
         runtime_configuration: SparkTransformRuntimeConfiguration,
         statistics: TransformStatistics,
+        is_folder: bool,
     ):
         """
         Init method
         """
         super().__init__(
-            data_access_factory=data_access_factory, transform_parameters=runtime_configuration.get_transform_params()
+            data_access_factory=data_access_factory,
+            transform_parameters=runtime_configuration.get_transform_params(),
+            is_folder=is_folder,
         )
         # Add data access and statistics to the processor parameters
         self.runtime_configuration = runtime_configuration
diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
index 57a6c58fc..11589dbaf 100644
--- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
+++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
@@ -15,7 +15,7 @@
 from datetime import datetime
 
 from data_processing.data_access import DataAccessFactoryBase
-from data_processing.transform import TransformStatistics
+from data_processing.transform import TransformStatistics, AbstractFolderTransform
 from data_processing.utils import GB, get_logger
 from data_processing_spark.runtime.spark import (
     SparkTransformFileProcessor,
@@ -68,7 +68,10 @@ def process_partition(iterator):
         runtime = runtime_conf.create_transform_runtime()
         # create file processor
         file_processor = SparkTransformFileProcessor(
-            data_access_factory=d_access_factory, runtime_configuration=runtime_conf, statistics=statistics
+            data_access_factory=d_access_factory,
+            runtime_configuration=runtime_conf,
+            statistics=statistics,
+            is_folder=is_folder,
         )
         first = True
         for f in iterator:
@@ -92,13 +95,19 @@ def orchestrate(
         return list(statistics.get_execution_stats().items())
 
     num_partitions = 0
+    is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform)
     try:
-        # Get files to process
-        files, profile, retries = data_access.get_files_to_process()
-        if len(files) == 0:
-            logger.error("No input files to process - exiting")
-            return 0
-        logger.info(f"Number of files is {len(files)}, source profile {profile}")
+        if is_folder:
+            # folder transform
+            files = AbstractFolderTransform.get_folders(data_access=data_access)
+            logger.info(f"Number of folders is {len(files)}")
+        else:
+            # Get files to process
+            files, profile, retries = data_access.get_files_to_process()
+            if len(files) == 0:
+                logger.error("No input files to process - exiting")
+                return 0
+            logger.info(f"Number of files is {len(files)}, source profile {profile}")
         # process data
         logger.debug("Begin processing files")
         # process files split by partitions
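Taken together, a folder transform is wired in exactly like a file transform; only the base class changes. A closing sketch, reusing the hypothetical FolderReportTransform sketched earlier with a stand-in configuration object (the real PythonTransformRuntimeConfiguration carries much more state):

    from data_processing.transform import AbstractFolderTransform

    class MyRuntimeConfiguration:
        # stand-in exposing only the method the orchestrators consult
        def get_transform_class(self) -> type:
            return FolderReportTransform

    # all three orchestrators would take the folder branch for this configuration
    assert issubclass(MyRuntimeConfiguration().get_transform_class(), AbstractFolderTransform)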