feat: adds persistence layer for app state and job results [SBK-363] #45

Merged · 50 commits · Nov 29, 2023

Commits
c46ec08
feat: adds persistence layer for app state and job results
mikeshultz Nov 18, 2023
b956d48
style(lint): fix imports
mikeshultz Nov 18, 2023
7ba2f71
fix: fix types
mikeshultz Nov 18, 2023
85a5579
style(lint): disagreement flake8 didn't like black output
mikeshultz Nov 18, 2023
ec43642
refactor: adds silverback_startup task, and on_client_* and on_worker…
mikeshultz Nov 18, 2023
8d2427c
fix: add missing silverback_shutdown task exec in runner
mikeshultz Nov 18, 2023
2564382
style(lint): lint rolling
mikeshultz Nov 18, 2023
e4caa59
refactor: remove unused SilverbackApp.checkpoint() method
mikeshultz Nov 18, 2023
8be2d10
refactor: remove mongo, add sqlite
mikeshultz Nov 20, 2023
82cb68e
fix: type | None is new in 3.10
mikeshultz Nov 20, 2023
c420f49
fix: forgot to save...
mikeshultz Nov 20, 2023
7e9e67a
style: minor cleanup and docstrings
mikeshultz Nov 21, 2023
134748d
style(lint): where that pre-commit hook when you need it
mikeshultz Nov 21, 2023
f58911a
fix: call to self.network.__enter__ should be before we try and touch…
mikeshultz Nov 21, 2023
0506ad5
feat(cli): adds simple `worker` command
mikeshultz Nov 21, 2023
2d67e49
docs: move example, create redis example, update README
mikeshultz Nov 21, 2023
98a7adb
fix(style): remove unused import
mikeshultz Nov 21, 2023
19a458b
style(lint): chill, mypy
mikeshultz Nov 21, 2023
4bb23c8
fix: update examples to fix taskiq issue
mikeshultz Nov 21, 2023
4d50d4c
style(lint): b0rk b0rk b0rk b0rk
mikeshultz Nov 21, 2023
2d219b1
fix: worker command now uses broker app uses
mikeshultz Nov 22, 2023
9273cb0
refactor: simplify app strings for examples
mikeshultz Nov 22, 2023
f7059c4
fix: drop pickle for security, use generic types for HandlerResult re…
mikeshultz Nov 22, 2023
7dc5bd0
refactor: HandlerResult should subclass TaskiqResult
mikeshultz Nov 22, 2023
c300fb6
docs: update docs for worker events, new on_startup behavior, nad ela…
mikeshultz Nov 22, 2023
e96e4f2
refactor: use position arg for reasons
mikeshultz Nov 24, 2023
29fa6ce
fix: surface errors in persistence when saving results
mikeshultz Nov 24, 2023
a544e33
refactor: move results persistence calls to middleware
mikeshultz Nov 24, 2023
00b2bf9
Merge branch 'main' into feat/persistence
mikeshultz Nov 24, 2023
e8dcda3
Update silverback/_cli.py
mikeshultz Nov 27, 2023
ee76273
refactor: use lazy init and not require runner or whatever to have to…
mikeshultz Nov 27, 2023
9779c4b
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
0f1f7f2
fix: run all requested workers
mikeshultz Nov 27, 2023
8dfb17c
fix: wut
mikeshultz Nov 27, 2023
a0d1c20
docs: add section covering distributed configuration
mikeshultz Nov 27, 2023
e32297d
refactor: remove PERSISTENCE_URI, use first-class SQLITE_PATH
mikeshultz Nov 27, 2023
a7b0719
fix: leftover PERSISTENCE_URI in settings type
mikeshultz Nov 27, 2023
3892e13
refactor: minor cleanup of unused settings arg in BasePersistentStorage
mikeshultz Nov 27, 2023
db7cce6
docs: help string update
mikeshultz Nov 27, 2023
60c4fd1
refactor: remove on_client_* silverback decorators
mikeshultz Nov 27, 2023
f35048c
refactor: storage->store
mikeshultz Nov 27, 2023
40a2041
refactor: instance_state -> state
mikeshultz Nov 27, 2023
0f630b0
fix: remove unnecessary Union types
mikeshultz Nov 27, 2023
4a6a65a
Merge branch 'feat/persistence' of github.com:ApeWorX/silverback into…
mikeshultz Nov 27, 2023
ccc96d8
refactor(docs): move back to unified example script
mikeshultz Nov 27, 2023
ee51bec
docs(refactor): fix example reference
mikeshultz Nov 27, 2023
463f8f6
docs: add 0.2.0 notes on event handler decorator behavior changes
mikeshultz Nov 28, 2023
f7e44c5
docs: update for multi-process configuration
mikeshultz Nov 28, 2023
4f0b11b
docs: arg clarity
mikeshultz Nov 28, 2023
5bf000c
refactor: SilverbackIdent -> SilverbackID
mikeshultz Nov 29, 2023

83 changes: 78 additions & 5 deletions docs/userguides/development.md
@@ -61,19 +61,64 @@ Any errors you raise during this function will get captured by the client, and r

## Startup and Shutdown

If you have heavier resources you want to load during startup, or otherwise perform some data collection prior to starting the bot, you can add a startup function like so:
### Worker Events

If you have heavier resources you want to load during startup, or want to initialize things like database connections, you can add a worker startup function like so:

```py
@app.on_startup()
@app.on_worker_startup()
def handle_on_worker_startup(state):
# Connect to DB, set initial state, etc
...

@app.on_worker_shutdown()
def handle_on_worker_shutdown(state):
# cleanup resources, close connections cleanly, etc
...
```

This function comes with a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned.
It's important to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else.
The `state` variable is also useful because it is made available to each handler method, so other stateful values can be maintained for other uses.

TODO: Add more information about `state`
It's important to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. **This function will run on every worker process**.

*New in 0.2.0*: These events moved from `on_startup()` and `on_shutdown()` for clarity.

#### Worker State

The `state` variable is also useful because it is made available to each handler method, so other stateful values can be maintained for other uses. Each distributed worker has its own instance of state.

To access the state from a handler, you must annotate `context` as a dependency like so:

```py
from typing import Annotated
from taskiq import Context, TaskiqDepends

@app.on_(chain.blocks)
def block_handler(block, context: Annotated[Context, TaskiqDepends()]):
# Access state via context.state
...
```
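
Putting these pieces together: a value provisioned on `state` during worker startup can later be read from any handler through `context.state`. Below is a minimal sketch, where the `cache` attribute is a hypothetical stand-in for whatever resource your application actually provisions:

```py
from typing import Annotated

from ape import chain
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import SilverbackApp

app = SilverbackApp()


@app.on_worker_startup()
def worker_startup(state: TaskiqState):
    # Hypothetical per-worker resource; could be a DB connection, API client, etc.
    state.cache = {}


@app.on_(chain.blocks)
def block_handler(block, context: Annotated[Context, TaskiqDepends()]):
    # Read and update the per-worker state provisioned at startup
    context.state.cache[block.number] = len(block.transactions)
    return len(block.transactions)
```

Since each distributed worker holds its own `state`, values stored this way are not shared across worker processes.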

### Application Events

You can also add an application startup and shutdown handler that will be **executed once upon every application startup**. This may be useful for things like processing historical events missed while the application was shut down, or other one-time actions to perform at startup.

```py
@app.on_startup()
def handle_on_startup(startup_state):
# Process missed events, etc
# process_history(start_block=startup_state.last_block_seen)
# ...or startup_state.last_block_processed
...


@app.on_shutdown()
def handle_on_shutdown():
# Record final state, etc
...
```

*Changed in 0.2.0*: The behavior of the `@app.on_startup()` decorator and its handler signature have changed. It is now executed only once upon application startup, and worker events have moved to `@app.on_worker_startup()`.
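
As a rough migration sketch (the `connect_db` helper below is purely hypothetical), per-worker setup that previously lived in `@app.on_startup()` moves to `@app.on_worker_startup()`, while `@app.on_startup()` now receives application-level startup information:

```py
from taskiq import TaskiqState

from silverback import SilverbackApp, SilverbackStartupState

app = SilverbackApp()


def connect_db():
    # Stand-in for whatever connection or resource your app actually needs
    return {}


# Pre-0.2.0, per-worker setup was typically done like this:
#
# @app.on_startup()
# def startup(state):
#     state.db = connect_db()

# In 0.2.0+, per-worker setup moves to the worker event...
@app.on_worker_startup()
def worker_startup(state: TaskiqState):
    state.db = connect_db()


# ...and @app.on_startup() now runs once per application start,
# receiving application-level startup info instead of worker state.
@app.on_startup()
def app_startup(startup_state: SilverbackStartupState):
    return {"message": "Starting...", "block_number": startup_state.last_block_seen}
```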

## Running your Application

@@ -101,6 +146,34 @@ If you configure your application to use a signer, and that signer signs anythin
Always test your applications thoroughly before deploying.
```

### Distributed Execution

Running the `silverback run ...` command with its default configuration executes everything in one process, and the job queue is kept completely in memory with shared state. In some high-volume environments, you may want to deploy your Silverback application in a distributed configuration, using multiple processes to handle messages at a higher rate.

The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process asynchronously.

For this to work, you must configure a [TaskIQ broker](https://taskiq-python.github.io/guide/architecture-overview.html#broker) capable of distributed processing. For instance, with [`taskiq_redis`](https://github.com/taskiq-python/taskiq-redis) you could do something like this for the client:

```bash
export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker"
export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379"

silverback run "example:app" \
--network :mainnet:alchemy \
--runner "silverback.runner:WebsocketRunner"
```

**Review comment (Member):** question(non-blocking): should we standardize runner to client?

**mikeshultz (Author):** Hard question, tbh. Would it be right to say "choose your client" to the user who wants to say use the WebsocketRunner?

And then run the worker process with 2 worker subprocesses:

```bash
export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker"
export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379"

silverback worker -w 2 "example:app"
```

This will run one client and 2 workers, and all queue data will go through Redis.

## Testing your Application

TODO: Add backtesting mode w/ `silverback test`
39 changes: 30 additions & 9 deletions example.py
@@ -1,9 +1,12 @@
from typing import Annotated

from ape import chain
from ape.api import BlockAPI
from ape.types import ContractLog
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp
from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState

# Do this to initialize your app
app = SilverbackApp()
@@ -13,36 +16,54 @@
YFI = tokens["YFI"]


# Can handle some stuff on startup, like loading a heavy model or something
@app.on_startup()
def startup(state):
return {"message": "Starting..."}
def app_startup(startup_state: SilverbackStartupState):
return {"message": "Starting...", "block_number": startup_state.last_block_seen}


# Can handle some initialization on startup, like models or network connections
@app.on_worker_startup()
def worker_startup(state: TaskiqState):
state.block_count = 0
# state.db = MyDB()
return {"message": "Worker started."}


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
def exec_block(block: BlockAPI):
# context must be a type annotated kwarg to be provided to the task
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
context.state.block_count += 1
return len(block.transactions)


# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=17793100, new_block_timeout=25)
@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25)
# NOTE: Typing isn't required
def exec_event1(log):
if log.log_index % 7 == 3:
# If you ever want the app to shutdown under some scenario, call this exception
raise CircuitBreaker("Oopsie!")
return log.amount
return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
return log.value
return log.amount


# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
return {
"message": f"Worker stopped after handling {state.block_count} blocks.",
"block_count": state.block_count,
}


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def shutdown(state):
def app_shutdown(state):
return {"message": "Stopping..."}
2 changes: 2 additions & 0 deletions silverback/__init__.py
@@ -1,8 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState

__all__ = [
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
]
40 changes: 39 additions & 1 deletion silverback/_cli.py
@@ -1,8 +1,12 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

import click
from ape.cli import AccountAliasPromptChoice, ape_cli_context, network_option, verbosity_option
from taskiq import AsyncBroker
from taskiq.cli.worker.run import shutdown_broker
from taskiq.receiver import Receiver

from silverback._importer import import_from_string
from silverback.runner import PollingRunner
@@ -40,7 +44,27 @@ def _network_callback(ctx, param, val):
return val


@cli.command()
async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
try:
tasks = []
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for _ in range(worker_count):
receiver = Receiver(
broker=broker,
executor=pool,
validate_params=True,
max_async_tasks=1,
max_prefetch=0,
)
broker.is_worker_process = True
tasks.append(receiver.listen())

await asyncio.gather(*tasks)
finally:
await shutdown_broker(broker, shutdown_timeout)


@cli.command(help="Run Silverback application client")
@ape_cli_context()
@verbosity_option()
@network_option(default=None, callback=_network_callback)
@@ -57,3 +81,17 @@ def run(cli_ctx, network, account, runner, max_exceptions, path):
app = import_from_string(path)
runner = runner(app, max_exceptions=max_exceptions)
asyncio.run(runner.run())


@cli.command(help="Run Silverback application task workers")
@ape_cli_context()
@verbosity_option()
@network_option(default=None, callback=_network_callback)
@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback)
@click.option("-w", "--workers", type=int, default=2)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.option("-s", "--shutdown_timeout", type=int, default=90)
@click.argument("path")
def worker(cli_ctx, network, account, workers, max_exceptions, shutdown_timeout, path):
app = import_from_string(path)
asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))
59 changes: 50 additions & 9 deletions silverback/application.py
@@ -38,10 +38,13 @@ def __init__(self, settings: Optional[Settings] = None):
if not settings:
settings = Settings()

self.network = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = self.network.__enter__()

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
self.chain_manager.provider.network.name.endswith("-fork")
or self.chain_manager.provider.network.name == LOCAL_NETWORK_NAME
provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME
):
settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds())

@@ -52,10 +55,6 @@ def __init__(self, settings: Optional[Settings] = None):
self.contract_events: Dict[AddressType, Dict[str, ContractEvent]] = {}
self.poll_settings: Dict[str, Dict] = {}

self.network = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = self.network.__enter__()

atexit.register(self.network.__exit__, None, None, None)

self.signer = settings.get_signer()
@@ -75,7 +74,31 @@

def on_startup(self) -> Callable:
"""
Code to execute on worker startup / restart after an error.
Code to execute on one worker upon startup / restart after an error.

Usage example::

@app.on_startup()
def do_something_on_startup(startup_state):
... # Reprocess missed events or blocks
"""
return self.broker.task(task_name="silverback_startup")

**Review comment (Member):** what are the pros/cons of moving the startup/shutdown handlers into separate tasks from the worker startup/shutdown events

**mikeshultz (Author):** They cannot be combined. They're very separate things.

Having on_startup be a worker event means unexpected duplication of execution. Having a separate task also allows us to feed the handler useful information, like last_block_*. This is basically what we need for ApePay.

Renaming the previous handlers to be explicit worker event handlers implies their actual usage and is clearer, IMO. They're two distinct things.

The only real con is we're changing up behavior here. Anyone setting up worker state may need to refactor.

**Member:** but won't this mean that only one worker will handle this event on startup? vs. the worker startup event is executed on every worker as it's set up?

**mikeshultz (Author, Nov 27, 2023):** yes. on_startup will only run once upon runner startup. on_worker_startup will run on every worker startup.


def on_shutdown(self) -> Callable:
"""
Code to execute on one worker at shutdown.

Usage example::

@app.on_shutdown()
def do_something_on_shutdown():
... # Record final state of app
"""
return self.broker.task(task_name="silverback_shutdown")

def on_worker_startup(self) -> Callable:
"""
Code to execute on every worker at startup / restart after an error.

Usage example::

@@ -85,9 +108,9 @@ def do_something_on_startup(state):
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

def on_shutdown(self) -> Callable:
def on_worker_shutdown(self) -> Callable:
"""
Code to execute on normal worker shutdown.
Code to execute on every worker at shutdown.

Usage example::

@@ -97,6 +120,24 @@ def do_something_on_shutdown(state):
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_startup` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_startup")

def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_shutdown` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("silverback_shutdown")

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.