diff --git a/silverback/__init__.py b/silverback/__init__.py index 56ddad86..dd26b077 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,8 +1,10 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException +from .types import SilverbackStartupState __all__ = [ "CircuitBreaker", "SilverbackApp", "SilverbackException", + "SilverbackStartupState", ] diff --git a/silverback/application.py b/silverback/application.py index df755407..0ae38c80 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -78,6 +78,30 @@ def checkpoint(self, last_block_seen: int, last_block_processed: int): self.broker.state.last_block_processed = last_block_processed def on_startup(self) -> Callable: + """ + Code to execute on one worker upon startup / restart after an error. + + Usage example:: + + @app.on_startup() + def do_something_on_startup(state): + ... # Can provision resources, or add things to `state`. + """ + return self.broker.task(task_name="silverback_startup") + + def on_shutdown(self) -> Callable: + """ + Code to execute on one worker at shutdown. + + Usage example:: + + @app.on_shutdown() + def do_something_on_shutdown(state): + ... # Update some external service, perhaps using information from `state`. + """ + return self.broker.task(task_name="silverback_shutdown") + + def on_client_startup(self) -> Callable: """ Code to execute on client startup / restart after an error. @@ -89,7 +113,7 @@ def do_something_on_startup(state): """ return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP) - def on_shutdown(self) -> Callable: + def on_client_shutdown(self) -> Callable: """ Code to execute on client shutdown. @@ -101,6 +125,48 @@ def do_something_on_shutdown(state): """ return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN) + def on_worker_startup(self) -> Callable: + """ + Code to execute on every worker at startup / restart after an error. + + Usage example:: + + @app.on_startup() + def do_something_on_startup(state): + ... # Can provision resources, or add things to `state`. + """ + return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + + def on_worker_shutdown(self) -> Callable: + """ + Code to execute on every worker at shutdown. + + Usage example:: + + @app.on_shutdown() + def do_something_on_shutdown(state): + ... # Update some external service, perhaps using information from `state`. + """ + return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) + + def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_startup` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_startup") + + def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_shutdown` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_shutdown") + def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: """ Get access to the handler for `block` events. diff --git a/silverback/runner.py b/silverback/runner.py index 72bec454..3007db58 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,7 @@ from .persistence import BasePersistentStorage, HandlerResult, init_mongo from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackIdent, handler_id_block, handler_id_event +from .types import SilverbackIdent, SilverbackStartupState, handler_id_block, handler_id_event from .utils import async_wrap_iter, hexbytes_dict settings = Settings() @@ -119,6 +119,22 @@ async def run(self): await self.app.broker.startup() + # Execute Silverback startup task before we init the rest + if startup_handler := self.app.get_startup_handler(): + task = await startup_handler.kiq( + SilverbackStartupState( + last_block_seen=self.last_block_seen, + last_block_processed=self.last_block_processed, + ) + ) + result = await task.wait_result() + await self._handle_result( + "silverback_startup", + self.last_block_seen, + -1, + result, + ) + if block_handler := self.app.get_block_handler(): tasks = [self._block_task(block_handler)] else: diff --git a/silverback/types.py b/silverback/types.py index 0cc027bd..893a69c8 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -16,6 +16,11 @@ def from_settings(cls, settings_: Settings) -> Self: return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) +class SilverbackStartupState(BaseModel): + last_block_seen: int + last_block_processed: int + + def handler_id_block(block_number: Optional[int]) -> str: """Return a unique handler ID string for a block""" if block_number is None: