Skip to content

Commit

Permalink
Provide default on_error callback
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Aug 15, 2024
1 parent 8adf8fc commit e53eb02
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
Expand Down Expand Up @@ -101,19 +102,20 @@ public ArrayList<TableUpdate> currentRowsAsUpdates() {

@Override
protected void process() {
try {
pyListenerCallable.call("__call__");
} catch (Exception e) {
if (!pyOnFailureCallback.isNone()) {
try {
pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(e));
} catch (Exception e2) {
// If the Python onFailure callback fails, log the new exception
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e2).endl();
}
pyListenerCallable.call("__call__");
}

@Override
protected void propagateErrorDownstream(boolean fromProcess, @NotNull Throwable error,
TableListener.@Nullable Entry entry) {
if (pyOnFailureCallback != null && !pyOnFailureCallback.isNone()) {
try {
pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(error));
} catch (Exception e2) {
// If the Python onFailure callback fails, log the new exception
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e2).endl();
}
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
protected final PerformanceEntry entry;
private final String logPrefix;

private boolean failed;


@SuppressWarnings("FieldMayBeFinal")
private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
Expand Down Expand Up @@ -96,6 +99,10 @@ protected Iterable<? extends ListenerRecorder> getRecorders() {
return recorders;
}

public boolean isFailed() {
return failed;
}

public final void notifyOnUpstreamError(
@NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) {
notifyInternal(upstreamError, errorSourceEntry);
Expand All @@ -107,6 +114,10 @@ public void notifyChanges() {

private void notifyInternal(@Nullable final Throwable upstreamError,
@Nullable final TableListener.Entry errorSourceEntry) {
if (failed) {
return;
}

final long currentStep = getUpdateGraph().clock().currentStep();

synchronized (this) {
Expand Down Expand Up @@ -150,6 +161,7 @@ protected void propagateError(
final boolean uncaughtExceptionFromProcess,
@NotNull final Throwable error,
@Nullable final TableListener.Entry entry) {
failed = true;
forceReferenceCountToZero();
propagateErrorDownstream(uncaughtExceptionFromProcess, error, entry);
try {
Expand Down
110 changes: 69 additions & 41 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,25 @@ def modified_columns(self) -> List[str]:


class TableListener(ABC):
"""An abstract table listener class that should be subclassed by any user table listener class."""
"""An abstract table listener class that should be subclassed by any user table listener class. It provides a
default implementation for the on_error method that simply prints out the error."""

@abstractmethod
def on_update(self, update: TableUpdate, is_replay: bool) -> None:
"""The required method on a listener object that receives table updates."""
...

def on_error(self, e: Exception) -> None:
"""The callback method on a listener object that prints out the received error.
Args:
e (Exception): the exception that occurred during the listener's execution.
"""
print(f"An error occurred during listener execution: {self}, {e}")


def _default_on_error(e: Exception) -> None:
print(f"An error occurred during listener execution: {e}")

def _listener_wrapper(table: Table):
"""A decorator to wrap a user listener function or on_update method to receive the numpy-converted Table updates.
Expand Down Expand Up @@ -246,7 +258,8 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
(2) an instance of TableListener type which provides an "on_update" method.
(2) an instance of a TableListener subclass that must override the abstract "on_update" method, and optionally
override the default "on_error" method.
The callable or the on_update method must have the following signatures.
* (update: TableUpdate, is_replay: bool): support replaying the initial table snapshot and normal table updates
Expand All @@ -272,15 +285,13 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to call when an error occurs during the
listener's execution, default is None. The function must take an Exception object as its only argument.
When the on_error function is invoked, the listener will be put in a failed state and will not receive
any further table updates. If the function raises an exception, the exception will only be logged in
the Deephaven server log and will not be further processed by the server.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of TableListener. Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Although optional, it is recommended that you provide an on_error function to handle any exceptions that
may occur during the listener's execution, even when the listener is not expected to raise any exceptions
or uses try-except blocks to handle them.
Raises:
DHError
Expand All @@ -293,15 +304,22 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
self.dependencies = to_sequence(dependencies)

if isinstance(listener, TableListener):
if on_error:
raise DHError(message="Invalid on_error argument for listeners of TableListener type which already have an on_error method.")
self.listener_wrapped = _wrap_listener_obj(t, listener)
on_error_callback = _error_callback_wrapper(listener.on_error)
elif callable(listener):
self.listener_wrapped = _wrap_listener_func(t, listener)
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_default_on_error)
else:
raise DHError(message="listener is neither callable nor TableListener object")

try:
self.listener_adapter = _JPythonReplayListenerAdapter.create(description, t.j_table, False,
self.listener_wrapped, _error_callback_wrapper(on_error),
self.listener_wrapped, on_error_callback,
self.dependencies)
except Exception as e:
raise DHError(e, "failed to create a table listener.") from e
Expand Down Expand Up @@ -370,15 +388,12 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to call when an error occurs during the
listener's execution, default is None. The function must take an Exception object as its only argument.
When the on_error function is invoked, the listener will be put in a failed state and will not receive
any further table updates. If the function raises an exception, the exception will only be logged in
the Deephaven server log and will not be further processed by the server.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of TableListener. Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Although optional, it is recommended that you provide an on_error function to handle any exceptions that
may occur during the listener's execution, even when the listener is not expected to raise any exceptions
or uses try-except blocks to handle them.
Returns:
a TableListenerHandle
Expand Down Expand Up @@ -421,7 +436,8 @@ def table_update(self) -> Optional[TableUpdate]:


class MergedListener(ABC):
"""An abstract multi-table listener class that should be subclassed by any user multi-table listener class."""
"""An abstract multi-table listener class that should be subclassed by any user multi-table listener class. It
provides a default implementation for the on_error method that simply prints out the error."""

@abstractmethod
def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None:
Expand All @@ -430,6 +446,14 @@ def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None:
"""
...

def on_error(self, e: Exception) -> None:
""" The callback method on a listener object that prints out the received error.
Args:
e (Exception): the exception that occurred during the listener's execution.
"""
print(f"An error occurred during listener execution: {self}, {e}")


class MergedListenerHandle(JObjectWrapper):
"""A handle to manage a merged listener's lifecycle."""
Expand All @@ -445,7 +469,9 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
(2) an instance of MergedListener type which provides an "on_update" method.
(2) an instance of a MergedListener subclass that must override the abstract "on_update" method, and optionally
override the default "on_error" method.
The callable or the on_update method must have the following signature.
*(updates: Dict[Table, TableUpdate], is_replay: bool): support replaying the initial table snapshots and normal table updates
The 'updates' parameter is a dictionary of Table to TableUpdate;
Expand All @@ -471,15 +497,12 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to call when an error occurs during the
listener's execution, default is None. The function must take an Exception object as its only argument.
When the on_error function is invoked, the listener will be put in a failed state and will not receive
any further table updates. If the function raises an exception, the exception will only be logged in
the Deephaven server log and will not be further processed by the server.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of MergedListener. Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Although optional, it is recommended that you provide an on_error function to handle any exceptions that
may occur during the listener's execution, even when the listener is not expected to raise any exceptions
or uses try-except blocks to handle them.
Raises:
DHError
Expand All @@ -493,21 +516,30 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
self.dependencies = dependencies

if isinstance(listener, MergedListener):
if on_error:
raise DHError(message="Invalid on_error argument for listeners of MergedListener type which already have an on_error method.")
self.listener = listener.on_update
else:
on_error_callback = _error_callback_wrapper(listener.on_error)
elif callable(listener):
self.listener = listener
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_default_on_error)
else:
raise DHError(message="listener is neither callable nor MergedListener object")

n_params = len(signature(self.listener).parameters)
if n_params != 2:
raise ValueError("merged listener function must have 2 parameters (updates, is_replay).")


try:
self.merged_listener_adapter = _JPythonMergedListenerAdapter.create(
to_sequence(self.listener_recorders),
to_sequence(self.dependencies),
description,
self,
_error_callback_wrapper(on_error))
on_error_callback)
self.started = False
except Exception as e:
raise DHError(e, "failed to create a merged listener adapter.") from e
Expand Down Expand Up @@ -591,15 +623,11 @@ def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table,
the listener is safe, it is not recommended because reading or operating on the result tables of those
operations may not be safe. It is best to perform the operations on the dependent tables beforehand,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to call when an error occurs during the
listener's execution, default is None. The function must take an Exception object as its only argument.
When the on_error function is invoked, the listener will be put in a failed state and will not receive
any further table updates. If the function raises an exception, the exception will only be logged in
the Deephaven server log and will not be further processed by the server.
Although optional, it is recommended that you provide an on_error function to handle any exceptions that
may occur during the listener's execution, even when the listener is not expected to raise any exceptions
or uses try-except blocks to handle them.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of MergedListener. Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
"""
merged_listener_handle = MergedListenerHandle(tables=tables, listener=listener,
description=description, dependencies=dependencies, on_error=on_error)
Expand Down
Loading

0 comments on commit e53eb02

Please sign in to comment.