diff --git a/silverback/application.py b/silverback/application.py index 04ef1012..b75f7b96 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -73,6 +73,9 @@ def __init__(self, settings: Optional[Settings] = None): f"{signer_str}{start_block_str}{new_block_timeout_str}" ) + async def startup(self): + await self.broker.startup() + def on_startup(self) -> Callable: """ Code to execute on worker startup / restart after an error. @@ -85,6 +88,9 @@ def do_something_on_startup(state): """ return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + async def shutdown(self): + await self.broker.shutdown() + def on_shutdown(self) -> Callable: """ Code to execute on normal worker shutdown. @@ -126,7 +132,7 @@ def on_( container: Union[BlockContainer, ContractEvent], new_block_timeout: Optional[int] = None, start_block: Optional[int] = None, - ): + ) -> AsyncTaskiqDecoratedTask: """ Create task to handle events created by `container`. diff --git a/silverback/runner.py b/silverback/runner.py index 68b70f51..648eecb3 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -55,7 +55,7 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ - await self.app.broker.startup() + await self.app.startup() if block_handler := self.app.get_block_handler(): tasks = [self._block_task(block_handler)] @@ -72,7 +72,7 @@ async def run(self): await asyncio.gather(*tasks) - await self.app.broker.shutdown() + await self.app.shutdown() class WebsocketRunner(BaseRunner, ManagerAccessMixin):