Skip to content

Commit

Permalink
refactor: adds silverback_startup task, and on_client_* and on_worker…
Browse files Browse the repository at this point in the history
…_* decorators
  • Loading branch information
mikeshultz committed Nov 18, 2023
1 parent 85a5579 commit ec43642
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 2 deletions.
2 changes: 2 additions & 0 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState

__all__ = [
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
]
68 changes: 67 additions & 1 deletion silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,30 @@ def checkpoint(self, last_block_seen: int, last_block_processed: int):
self.broker.state.last_block_processed = last_block_processed

def on_startup(self) -> Callable:
"""
Code to execute on one worker upon startup / restart after an error.
Usage example::
@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.task(task_name="silverback_startup")

def on_shutdown(self) -> Callable:
"""
Code to execute on one worker at shutdown.
Usage example::
@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.task(task_name="silverback_shutdown")

def on_client_startup(self) -> Callable:
"""
Code to execute on client startup / restart after an error.
Expand All @@ -89,7 +113,7 @@ def do_something_on_startup(state):
"""
return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP)

def on_shutdown(self) -> Callable:
def on_client_shutdown(self) -> Callable:
"""
Code to execute on client shutdown.
Expand All @@ -101,6 +125,48 @@ def do_something_on_shutdown(state):
"""
return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN)

def on_worker_startup(self) -> Callable:
"""
Code to execute on every worker at startup / restart after an error.
Usage example::
@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

def on_worker_shutdown(self) -> Callable:
"""
Code to execute on every worker at shutdown.
Usage example::
@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_startup` events.
Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_startup")

def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_shutdown` events.
Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_shutdown")

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.
Expand Down
18 changes: 17 additions & 1 deletion silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .persistence import BasePersistentStorage, HandlerResult, init_mongo
from .settings import Settings
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import SilverbackIdent, handler_id_block, handler_id_event
from .types import SilverbackIdent, SilverbackStartupState, handler_id_block, handler_id_event
from .utils import async_wrap_iter, hexbytes_dict

settings = Settings()
Expand Down Expand Up @@ -119,6 +119,22 @@ async def run(self):

await self.app.broker.startup()

# Execute Silverback startup task before we init the rest
if startup_handler := self.app.get_startup_handler():
task = await startup_handler.kiq(
SilverbackStartupState(
last_block_seen=self.last_block_seen,
last_block_processed=self.last_block_processed,
)
)
result = await task.wait_result()
await self._handle_result(
"silverback_startup",
self.last_block_seen,
-1,
result,
)

if block_handler := self.app.get_block_handler():
tasks = [self._block_task(block_handler)]
else:
Expand Down
5 changes: 5 additions & 0 deletions silverback/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def from_settings(cls, settings_: Settings) -> Self:
return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice())


class SilverbackStartupState(BaseModel):
last_block_seen: int
last_block_processed: int


def handler_id_block(block_number: Optional[int]) -> str:
"""Return a unique handler ID string for a block"""
if block_number is None:
Expand Down

0 comments on commit ec43642

Please sign in to comment.