diff --git a/docs/userguides/development.md b/docs/userguides/development.md index a62514d2..fc6785d7 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -120,6 +120,43 @@ def handle_on_shutdown(): *Changed in 0.2.0*: The behavior of the `@app.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved on `@app.on_worker_startup()`. +## Application State + +Sometimes it is very useful to have access to values in a shared state across your workers. +For example you might have a value or complex reference type that you wish to update during one of your tasks, and read during another. +Silverback provides `app.state` to help with these use cases. + +For example, you might want to pre-populate a large dataframe into state on startup, keeping that dataframe in sync with the chain through event logs, +and then use that data to determine a signal under which you want trigger transactions to commit back to the chain. +Such an application might look like this: + +```py +@app.on_startup() +def create_table(startup_state): + df = contract.MyEvent.query(..., start_block=startup_state.last_block_processed) + ... # Do some further processing on df + app.state.table = df + + +@app.on_(contract.MyEvent) +def update_table(log): + app.state.table = ... # Update using stuff from `log` + + +@app.on_(chain.blocks) +def use_table(blk): + if app.state.table[...].mean() > app.state.table[...].sum(): + contract.myMethod(..., sender=app.signer) + ... +``` + +```{warning} +You can use `app.state` to store any python variable type, however note that the item is not networked nor threadsafe so it is not recommended to have multiple tasks write to the same value in state at the same time. +``` +```{note} +Application startup and application runtime events (e.g. block or event container) are handled distinctly and can be trusted not to execute at the same time. +``` + ## Running your Application Once you have programmed your bot, it's really useful to be able to run it locally and validate that it does what you expect it to do. diff --git a/silverback/application.py b/silverback/application.py index 9b071442..c72ce896 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -36,6 +36,35 @@ class TaskData(BaseModel): class SharedState(defaultdict): + """ + Class containing the application shared state that all workers can read from and write to. + + ```{warning} + This is not networked in any way, nor is it multi-process safe, but will be + accessible across multiple thread workers within a single process. + ``` + + Usage example:: + + @app.on_(...) + def do_something_with_state(value): + # Read from state using `getattr` + ... = app.state.something + + # Set state using `setattr` + app.state.something = ... + + # Read from state using `getitem` + ... = app.state["something"] + + # Set state using setitem + app.state["something"] = ... + """ + + # TODO: This class does not have thread-safe access control, but should remain safe due to + # it being a memory mapping, and writes are strictly controlled to be handled only by + # one worker at a time. There may be issues with using this in production however. + def __init__(self): # Any unknown key returns None super().__init__(lambda: None) @@ -173,8 +202,6 @@ def __get_user_all_taskdata_handler(self) -> list[TaskData]: return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l] async def __load_snapshot_handler(self) -> StateSnapshot: - # NOTE: This is not networked in any way, nor thread-safe nor multi-process safe, - # but will be accessible across multiple workers in a single container # NOTE: *DO NOT USE* in Runner, as it will not be updated by the app self.state = SharedState() # NOTE: attribute does not exist before this task is executed, @@ -221,6 +248,11 @@ def broker_task_decorator( """ Dynamically create a new broker task that handles tasks of ``task_type``. + ```{warning} + Dynamically creating a task does not ensure that the runner will be aware of the task + in order to trigger it. Use at your own risk. + ``` + Args: task_type: :class:`~silverback.types.TaskType`: The type of task to create. container: (BlockContainer | ContractEvent): The event source to watch. @@ -277,19 +309,21 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: def on_startup(self) -> Callable: """ - Code to execute on one worker upon startup / restart after an error. + Code that will be exected by one worker after worker startup, but before the + application is put into the "run" state by the Runner. Usage example:: @app.on_startup() - def do_something_on_startup(startup_state): + def do_something_on_startup(startup_state: StateSnapshot): ... # Reprocess missed events or blocks """ return self.broker_task_decorator(TaskType.STARTUP) def on_shutdown(self) -> Callable: """ - Code to execute on one worker at shutdown. + Code that will be exected by one worker before worker shutdown, after the + Runner has decided to put the application into the "shutdown" state. Usage example:: @@ -299,25 +333,37 @@ def do_something_on_shutdown(): """ return self.broker_task_decorator(TaskType.SHUTDOWN) + # TODO: Abstract away worker startup into dependency system def on_worker_startup(self) -> Callable: """ - Code to execute on every worker at startup / restart after an error. + Code to execute on every worker immediately after broker startup. + + ```{note} + This is a great place to load heavy dependencies for the workers, + such as database connections, ML models, etc. + ``` Usage example:: - @app.on_startup() + @app.on_worker_startup() def do_something_on_startup(state): ... # Can provision resources, or add things to `state`. """ return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + # TODO: Abstract away worker shutdown into dependency system def on_worker_shutdown(self) -> Callable: """ - Code to execute on every worker at shutdown. + Code to execute on every worker immediately before broker shutdown. + + ```{note} + This is where you should also release any resources you have loaded during + worker startup. + ``` Usage example:: - @app.on_shutdown() + @app.on_worker_shutdown() def do_something_on_shutdown(state): ... # Update some external service, perhaps using information from `state`. """ @@ -326,11 +372,12 @@ def do_something_on_shutdown(state): def on_( self, container: BlockContainer | ContractEvent, + # TODO: possibly remove these new_block_timeout: int | None = None, start_block: int | None = None, ): """ - Create task to handle events created by `container`. + Create task to handle events created by the `container` trigger. Args: container: (BlockContainer | ContractEvent): The event source to watch. @@ -378,5 +425,5 @@ def on_( return self.broker_task_decorator(TaskType.EVENT_LOG, container=container) # TODO: Support account transaction polling - # TODO: Support mempool polling + # TODO: Support mempool polling? raise InvalidContainerTypeError(container)