diff --git a/silverback/exceptions.py b/silverback/exceptions.py index aa262e75..7bde82b0 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Sequence from ape.exceptions import ApeException @@ -30,6 +30,20 @@ class SilverbackException(ApeException): """Base Exception for any Silverback runtime faults.""" +# TODO: `ExceptionGroup` added in Python 3.11 +class StartupFailure(SilverbackException): + def __init__(self, *exceptions: Sequence[Exception]): + if error_str := "\n".join(str(e) for e in exceptions): + super().__init__(f"Startup failure(s):\n{error_str}") + else: + super().__init__("Startup failure(s) detected. See logs for details.") + + +class NoTasksAvailableError(SilverbackException): + def __init__(self): + super().__init__("No tasks to execute") + + class Halt(SilverbackException): def __init__(self): super().__init__("App halted, must restart manually") diff --git a/silverback/runner.py b/silverback/runner.py index 907068b7..41f5db44 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -10,7 +10,7 @@ from taskiq import AsyncTaskiqDecoratedTask, AsyncTaskiqTask from .application import SilverbackApp -from .exceptions import Halt, NoWebsocketAvailableError +from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import AppState, SilverbackID, TaskType @@ -106,7 +106,10 @@ async def run(self): and process them by kicking events over to the configured broker. Raises: - :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. + :class:`~silverback.exceptions.StartupFailure`: + If there was an exception during startup. + :class:`~silverback.exceptions.NoTasksAvailableError`: + If there are no configured tasks to execute. """ # Initialize recorder (if available) and fetch state if app has been run previously if self.recorder and (startup_state := (await self.recorder.init(app_id=self.identifier))): @@ -119,35 +122,57 @@ async def run(self): await self.app.broker.startup() # Execute Silverback startup task before we init the rest - for startup_task in self.app.tasks[TaskType.STARTUP]: - task = await startup_task.handler.kiq( - SilverbackStartupState( - last_block_seen=self.last_block_seen, - last_block_processed=self.last_block_processed, - ) + if startup_tasks := await asyncio.gather( + *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.STARTUP]) + ): + results = await asyncio.gather( + *(startup_task.wait_result() for startup_task in startup_tasks) ) - result = await task.wait_result() - self._handle_result(result) - tasks = [] - for task in self.app.tasks[TaskType.NEW_BLOCKS]: - tasks.append(self._block_task(task.handler)) + if any(result.is_err for result in results): + # NOTE: Abort before even starting to run + raise StartupFailure(*(result.error for result in results if result.is_err)) + # NOTE: No need to handle results otherwise + + # Create our long-running event listeners + # NOTE: Any propagated failure in here should be handled such that shutdown tasks also run + # TODO: `asyncio.TaskGroup` added in Python 3.11 + listener_tasks = ( + *( + asyncio.create_task(self._block_task(task_def.handler)) + for task_def in self.app.tasks[TaskType.NEW_BLOCKS] + ), + *( + asyncio.create_task(self._event_task(task_def.container, task_def.handler)) + for task_def in self.app.tasks[TaskType.EVENT_LOG] + ), + ) - for task in self.app.tasks[TaskType.EVENT_LOG]: - tasks.append(self._event_task(task.container, task.handler)) + # NOTE: Safe to do this because no tasks have been scheduled to run yet + if len(listener_tasks) == 0: + raise NoTasksAvailableError() - if len(tasks) == 0: - raise Halt("No tasks to execute") + # Run until one task bubbles up an exception that should stop execution + tasks_with_errors, tasks_running = await asyncio.wait( + listener_tasks, return_when=asyncio.FIRST_EXCEPTION + ) + if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors): + # NOTE: In case we are somehow not displaying the error correctly with task status + logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}") - try: - await asyncio.gather(*tasks) - except Exception as e: - logger.error(f"Fatal error detected, shutting down: '{e}'") + # Cancel any still running + (task.cancel() for task in tasks_running) + # NOTE: All listener tasks are shut down now - # Execute Silverback shutdown task before shutting down the broker - for shutdown_task in self.app.tasks[TaskType.SHUTDOWN]: - task = await shutdown_task.handler.kiq() - result = self._handle_result(await task.wait_result()) + # Execute Silverback shutdown task(s) before shutting down the broker and app + if shutdown_tasks := await asyncio.gather( + *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.SHUTDOWN]) + ): + asyncio.gather(*(shutdown_task.is_ready() for shutdown_task in shutdown_tasks)) + if any(result.is_err for result in results): + errors_str = "\n".join(str(result.error) for result in results if result.is_err) + logger.error(f"Errors while shutting down:\n{errors_str}") + # NOTE: No need to handle results otherwise await self.app.broker.shutdown()