Skip to content

Commit

Permalink
docs: add more color in docs for app.state
Browse files Browse the repository at this point in the history
Also updated docstrings within `silverback/application.py`
  • Loading branch information
fubuloubu committed Jun 4, 2024
1 parent 65f5eb7 commit d625e68
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 11 deletions.
37 changes: 37 additions & 0 deletions docs/userguides/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,43 @@ def handle_on_shutdown():

*Changed in 0.2.0*: The behavior of the `@app.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved on `@app.on_worker_startup()`.

## Application State

Sometimes it is very useful to have access to values in a shared state across your workers.
For example you might have a value or complex reference type that you wish to update during one of your tasks, and read during another.
Silverback provides `app.state` to help with these use cases.

For example, you might want to pre-populate a large dataframe into state on startup, keeping that dataframe in sync with the chain through event logs,
and then use that data to determine a signal under which you want trigger transactions to commit back to the chain.
Such an application might look like this:

```py
@app.on_startup()
def create_table(startup_state):
df = contract.MyEvent.query(..., start_block=startup_state.last_block_processed)
... # Do some further processing on df
app.state.table = df


@app.on_(contract.MyEvent)
def update_table(log):
app.state.table = ... # Update using stuff from `log`


@app.on_(chain.blocks)
def use_table(blk):
if app.state.table[...].mean() > app.state.table[...].sum():
contract.myMethod(..., sender=app.signer)
...
```

```{warning}
You can use `app.state` to store any python variable type, however note that the item is not networked nor threadsafe so it is not recommended to have multiple tasks write to the same value in state at the same time.
```
```{note}
Application startup and application runtime events (e.g. block or event container) are handled distinctly and can be trusted not to execute at the same time.
```

## Running your Application

Once you have programmed your bot, it's really useful to be able to run it locally and validate that it does what you expect it to do.
Expand Down
69 changes: 58 additions & 11 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@ class TaskData(BaseModel):


class SharedState(defaultdict):
"""
Class containing the application shared state that all workers can read from and write to.
```{warning}
This is not networked in any way, nor is it multi-process safe, but will be
accessible across multiple thread workers within a single process.
```
Usage example::
@app.on_(...)
def do_something_with_state(value):
# Read from state using `getattr`
... = app.state.something
# Set state using `setattr`
app.state.something = ...
# Read from state using `getitem`
... = app.state["something"]
# Set state using setitem
app.state["something"] = ...
"""

# TODO: This class does not have thread-safe access control, but should remain safe due to
# it being a memory mapping, and writes are strictly controlled to be handled only by
# one worker at a time. There may be issues with using this in production however.

def __init__(self):
# Any unknown key returns None
super().__init__(lambda: None)
Expand Down Expand Up @@ -173,8 +202,6 @@ def __get_user_all_taskdata_handler(self) -> list[TaskData]:
return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l]

async def __load_snapshot_handler(self) -> StateSnapshot:
# NOTE: This is not networked in any way, nor thread-safe nor multi-process safe,
# but will be accessible across multiple workers in a single container
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the app
self.state = SharedState()
# NOTE: attribute does not exist before this task is executed,
Expand Down Expand Up @@ -221,6 +248,11 @@ def broker_task_decorator(
"""
Dynamically create a new broker task that handles tasks of ``task_type``.
```{warning}
Dynamically creating a task does not ensure that the runner will be aware of the task
in order to trigger it. Use at your own risk.
```
Args:
task_type: :class:`~silverback.types.TaskType`: The type of task to create.
container: (BlockContainer | ContractEvent): The event source to watch.
Expand Down Expand Up @@ -277,19 +309,21 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:

def on_startup(self) -> Callable:
"""
Code to execute on one worker upon startup / restart after an error.
Code that will be exected by one worker after worker startup, but before the
application is put into the "run" state by the Runner.
Usage example::
@app.on_startup()
def do_something_on_startup(startup_state):
def do_something_on_startup(startup_state: StateSnapshot):
... # Reprocess missed events or blocks
"""
return self.broker_task_decorator(TaskType.STARTUP)

def on_shutdown(self) -> Callable:
"""
Code to execute on one worker at shutdown.
Code that will be exected by one worker before worker shutdown, after the
Runner has decided to put the application into the "shutdown" state.
Usage example::
Expand All @@ -299,25 +333,37 @@ def do_something_on_shutdown():
"""
return self.broker_task_decorator(TaskType.SHUTDOWN)

# TODO: Abstract away worker startup into dependency system
def on_worker_startup(self) -> Callable:
"""
Code to execute on every worker at startup / restart after an error.
Code to execute on every worker immediately after broker startup.
```{note}
This is a great place to load heavy dependencies for the workers,
such as database connections, ML models, etc.
```
Usage example::
@app.on_startup()
@app.on_worker_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

# TODO: Abstract away worker shutdown into dependency system
def on_worker_shutdown(self) -> Callable:
"""
Code to execute on every worker at shutdown.
Code to execute on every worker immediately before broker shutdown.
```{note}
This is where you should also release any resources you have loaded during
worker startup.
```
Usage example::
@app.on_shutdown()
@app.on_worker_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
Expand All @@ -326,11 +372,12 @@ def do_something_on_shutdown(state):
def on_(
self,
container: BlockContainer | ContractEvent,
# TODO: possibly remove these
new_block_timeout: int | None = None,
start_block: int | None = None,
):
"""
Create task to handle events created by `container`.
Create task to handle events created by the `container` trigger.
Args:
container: (BlockContainer | ContractEvent): The event source to watch.
Expand Down Expand Up @@ -378,5 +425,5 @@ def on_(
return self.broker_task_decorator(TaskType.EVENT_LOG, container=container)

# TODO: Support account transaction polling
# TODO: Support mempool polling
# TODO: Support mempool polling?
raise InvalidContainerTypeError(container)

0 comments on commit d625e68

Please sign in to comment.