Skip to content

Commit

Permalink
adding exception handling for transforms creation (#616)
Browse files Browse the repository at this point in the history
* adding exception handling for transforms creation

* fixing introduced bugs

* additional testing

* increased timeout

* increased timeout

* increased timeout

* increased timeout

* documentation update
  • Loading branch information
blublinsky authored Sep 25, 2024
1 parent 740de6b commit cca4c82
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 10 deletions.
6 changes: 5 additions & 1 deletion data-processing-lib/doc/transform-exceptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ With this in mind, there are two types of exceptions:
1. Those that would not allow any data to be processed (e.g. model loading problem).
2. Those that would not allow a specific datum to be processed (e.g. missing column).

In the first situation the transform should throw an exception from the initializer, which
In the first situation the transform should throw an
[unrecoverable exception](../python/src/data_processing/utils/unrecoverable.py), which
will cause the runtime to terminate processing of all data.
**Note:** any exception thrown from `init` method of transform will cause runtime to
terminate processing

In the second situation (identified in the `transform()` or `flush()` methods), the transform
should throw an exception from the associated method.
This will cause only the error-causing datum to be ignored and not written out,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime import AbstractTransformFileProcessor
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.utils import UnrecoverableException


class PythonTransformFileProcessor(AbstractTransformFileProcessor):
Expand Down Expand Up @@ -43,7 +44,11 @@ def __init__(
)
self.transform_params["statistics"] = statistics
# Create local processor
self.transform = transform_class(self.transform_params)
try:
self.transform = transform_class(self.transform_params)
except Exception as e:
self.logger.error(f"Exception creating transform {e}")
raise UnrecoverableException("failed creating transform")
# Create statistics
self.stats = statistics

Expand Down Expand Up @@ -82,7 +87,11 @@ def process_file(self, f_name: str) -> dict[str, Any]:
self.stats = {}
if self.transform is None:
# create transform. Make sure to do this locally
self.transform = self.transform_class(self.transform_params)
try:
self.transform = self.transform_class(self.transform_params)
except Exception as e:
self.logger.error(f"Exception creating transform {e}")
raise UnrecoverableException("failed creating transform")
# Invoke superclass method
super().process_file(f_name=f_name)
# return collected statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
################################################################################

import logging
import sys
import time
from typing import Any

import ray
from ray.experimental.state.api import list_actors
from data_processing.utils import GB, UnrecoverableException
from ray.actor import ActorHandle
from ray.exceptions import RayError
Expand Down Expand Up @@ -109,7 +109,16 @@ def operator() -> ActorHandle:
time.sleep(creation_delay)
return clazz.options(**actor_options).remote(params)

return [operator() for _ in range(n_actors)]
cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
actors = [operator() for _ in range(n_actors)]
for i in range(60):
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
if len(actors) == len(alive):
return actors
# failed - raise an exception
print(f"created {actors}, alive {alive}")
raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive")

@staticmethod
def process_files(
Expand Down Expand Up @@ -220,6 +229,7 @@ def process_files(
def wait_for_execution_completion(logger: logging.Logger, replies: list[ray.ObjectRef]) -> int:
"""
Wait for all requests completed
:param logger: logger to use
:param replies: list of request futures
:return: None
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def __init__(self, params: dict[str, Any]):
raise UnrecoverableException("statistics is None")
self.transform_params["statistics"] = self.stats
# Create local processor
self.transform = params.get("transform_class", None)(self.transform_params)
try:
self.transform = params.get("transform_class", None)(self.transform_params)
except Exception as e:
self.logger.error(f"Exception creating transform {e}")
raise UnrecoverableException("failed creating transform")

def _publish_stats(self, stats: dict[str, Any]) -> None:
self.stats.add_stats.remote(stats)
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def orchestrate(
logger.info(
f"Number of workers - {preprocessing_params.n_workers} " f"with {preprocessing_params.worker_options} each"
)
# create statistics
# log retries
if retries > 0:
statistics.add_stats.remote({"data access retries": retries})
# create executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from data_processing.runtime import AbstractTransformFileProcessor
from data_processing.transform import TransformStatistics
from data_processing_spark.runtime.spark import SparkTransformRuntimeConfiguration
from data_processing.utils import UnrecoverableException


class SparkTransformFileProcessor(AbstractTransformFileProcessor):
Expand Down Expand Up @@ -49,7 +50,11 @@ def create_transform(self, transform_parameters: dict[str, Any]):
:return: None
"""
# Create local processor
self.transform = self.runtime_configuration.get_transform_class()(transform_parameters)
try:
self.transform = self.runtime_configuration.get_transform_class()(transform_parameters)
except Exception as e:
self.logger.error(f"Exception creating transform {e}")
raise UnrecoverableException("failed creating transform")

def _publish_stats(self, stats: dict[str, Any]) -> None:
"""
Expand Down
3 changes: 1 addition & 2 deletions transforms/universal/noop/ray/src/noop_transform_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# limitations under the License.
################################################################################

from data_processing.utils import CLIArgumentProvider, get_logger
from data_processing.utils import get_logger
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.runtime.ray.runtime_configuration import (
RayTransformRuntimeConfiguration,
Expand All @@ -31,7 +31,6 @@ class NOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
def __init__(self):
"""
Initialization
:param base_configuration - base configuration class
"""
super().__init__(transform_config=NOOPTransformConfiguration())

Expand Down

0 comments on commit cca4c82

Please sign in to comment.