Skip to content

Commit

Permalink
refactor!: clean up startup process significantly
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed Apr 17, 2024
1 parent d9495df commit a7be986
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 26 deletions.
16 changes: 15 additions & 1 deletion silverback/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Sequence

from ape.exceptions import ApeException

Expand Down Expand Up @@ -30,6 +30,20 @@ class SilverbackException(ApeException):
"""Base Exception for any Silverback runtime faults."""


# TODO: `ExceptionGroup` added in Python 3.11
class StartupFailure(SilverbackException):
def __init__(self, *exceptions: Sequence[Exception]):
if error_str := "\n".join(str(e) for e in exceptions):
super().__init__(f"Startup failure(s):\n{error_str}")
else:
super().__init__("Startup failure(s) detected. See logs for details.")


class NoTasksAvailableError(SilverbackException):
def __init__(self):
super().__init__("No tasks to execute")


class Halt(SilverbackException):
def __init__(self):
super().__init__("App halted, must restart manually")
Expand Down
75 changes: 50 additions & 25 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from taskiq import AsyncTaskiqDecoratedTask, AsyncTaskiqTask

from .application import SilverbackApp
from .exceptions import Halt, NoWebsocketAvailableError
from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure
from .recorder import BaseRecorder, TaskResult
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import AppState, SilverbackID, TaskType
Expand Down Expand Up @@ -106,7 +106,10 @@ async def run(self):
and process them by kicking events over to the configured broker.
Raises:
:class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute.
:class:`~silverback.exceptions.StartupFailure`:
If there was an exception during startup.
:class:`~silverback.exceptions.NoTasksAvailableError`:
If there are no configured tasks to execute.
"""
# Initialize recorder (if available) and fetch state if app has been run previously
if self.recorder and (startup_state := (await self.recorder.init(app_id=self.identifier))):
Expand All @@ -119,35 +122,57 @@ async def run(self):
await self.app.broker.startup()

# Execute Silverback startup task before we init the rest
for startup_task in self.app.tasks[TaskType.STARTUP]:
task = await startup_task.handler.kiq(
SilverbackStartupState(
last_block_seen=self.last_block_seen,
last_block_processed=self.last_block_processed,
)
if startup_tasks := await asyncio.gather(
*(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.STARTUP])
):
results = await asyncio.gather(
*(startup_task.wait_result() for startup_task in startup_tasks)
)
result = await task.wait_result()
self._handle_result(result)

tasks = []
for task in self.app.tasks[TaskType.NEW_BLOCKS]:
tasks.append(self._block_task(task.handler))
if any(result.is_err for result in results):
# NOTE: Abort before even starting to run
raise StartupFailure(*(result.error for result in results if result.is_err))
# NOTE: No need to handle results otherwise

# Create our long-running event listeners
# NOTE: Any propagated failure in here should be handled such that shutdown tasks also run
# TODO: `asyncio.TaskGroup` added in Python 3.11
listener_tasks = (
*(
asyncio.create_task(self._block_task(task_def.handler))
for task_def in self.app.tasks[TaskType.NEW_BLOCKS]
),
*(
asyncio.create_task(self._event_task(task_def.container, task_def.handler))
for task_def in self.app.tasks[TaskType.EVENT_LOG]
),
)

for task in self.app.tasks[TaskType.EVENT_LOG]:
tasks.append(self._event_task(task.container, task.handler))
# NOTE: Safe to do this because no tasks have been scheduled to run yet
if len(listener_tasks) == 0:
raise NoTasksAvailableError()

if len(tasks) == 0:
raise Halt("No tasks to execute")
# Run until one task bubbles up an exception that should stop execution
tasks_with_errors, tasks_running = await asyncio.wait(
listener_tasks, return_when=asyncio.FIRST_EXCEPTION
)
if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors):
# NOTE: In case we are somehow not displaying the error correctly with task status
logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}")

try:
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Fatal error detected, shutting down: '{e}'")
# Cancel any still running
(task.cancel() for task in tasks_running)
# NOTE: All listener tasks are shut down now

# Execute Silverback shutdown task before shutting down the broker
for shutdown_task in self.app.tasks[TaskType.SHUTDOWN]:
task = await shutdown_task.handler.kiq()
result = self._handle_result(await task.wait_result())
# Execute Silverback shutdown task(s) before shutting down the broker and app
if shutdown_tasks := await asyncio.gather(
*(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.SHUTDOWN])
):
asyncio.gather(*(shutdown_task.is_ready() for shutdown_task in shutdown_tasks))
if any(result.is_err for result in results):
errors_str = "\n".join(str(result.error) for result in results if result.is_err)
logger.error(f"Errors while shutting down:\n{errors_str}")
# NOTE: No need to handle results otherwise

await self.app.broker.shutdown()

Expand Down

0 comments on commit a7be986

Please sign in to comment.