diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 6d3a5305..2ae7a8f7 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -61,19 +61,64 @@ Any errors you raise during this function will get captured by the client, and r ## Startup and Shutdown -If you have heavier resources you want to load during startup, or otherwise perform some data collection prior to starting the bot, you can add a startup function like so: +### Worker Events + +If you have heavier resources you want to load during startup, or want to initialize things like database connections, you can add a worker startup function like so: ```py -@app.on_startup() +@app.on_worker_startup() def handle_on_worker_startup(state): + # Connect to DB, set initial state, etc + ... + +@app.on_worker_shutdown() +def handle_on_worker_shutdown(state): + # cleanup resources, close connections cleanly, etc ... ``` This function comes a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned. -It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. -The `state` variable is also useful as this gets made available to each handler method so other stateful quantities can be maintained for other uses. -TODO: Add more information about `state` +It's important to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. **This function will run on every worker process**. 
+ +*New in 0.2.0*: These events moved from `on_startup()` and `on_shutdown()` for clarity. + +#### Worker State + +The `state` variable is also useful as this can be made available to each handler method so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state. + +To access the state from a handler, you must annotate `context` as a dependency like so: + +```py +from typing import Annotated +from taskiq import Context, TaskiqDepends + +@app.on_(chain.blocks) +def block_handler(block, context: Annotated[Context, TaskiqDepends()]): + # Access state via context.state + ... +``` + +### Application Events + +You can also add application startup and shutdown handlers that will be **executed once upon every application startup and shutdown**, respectively. This may be useful for things like processing historical events since the application was shut down or other one-time actions to perform at startup. + +```py +@app.on_startup() +def handle_on_startup(startup_state): + # Process missed events, etc + # process_history(start_block=startup_state.last_block_seen) + # ...or startup_state.last_block_processed + ... + + +@app.on_shutdown() +def handle_on_shutdown(): + # Record final state, etc + ... +``` + +*Changed in 0.2.0*: The behavior of the `@app.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved to `@app.on_worker_startup()`. ## Running your Application @@ -101,6 +146,34 @@ If you configure your application to use a signer, and that signer signs anythin Always test your applications throughly before deploying. ``` +### Distributed Execution + +Using only the `silverback run ...` command in a default configuration executes everything in one process and the job queue is completely in-memory with a shared state. 
In some high volume environments, you may want to deploy your Silverback application in a distributed configuration using multiple processes to handle the messages at a higher rate. + +The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner. + +For this to work, you must configure a [TaskIQ broker](https://taskiq-python.github.io/guide/architecture-overview.html#broker) capable of distributed processing. For instance, with [`taskiq_redis`](https://github.com/taskiq-python/taskiq-redis) you could do something like this for the client: + +```bash +export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" +export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" + +silverback run "example:app" \ + --network :mainnet:alchemy \ + --runner "silverback.runner:WebsocketRunner" +``` + +And then the worker process with 2 worker subprocesses: + +```bash +export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" +export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" + +silverback worker -w 2 "example:app" +``` + +This will run one client and 2 workers and all queue data will go through Redis. 
+ ## Testing your Application TODO: Add backtesting mode w/ `silverback test` diff --git a/example.py b/example.py index 8548ff70..60d13456 100644 --- a/example.py +++ b/example.py @@ -1,9 +1,12 @@ +from typing import Annotated + from ape import chain from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] +from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp +from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState # Do this to initialize your app app = SilverbackApp() @@ -13,36 +16,54 @@ YFI = tokens["YFI"] -# Can handle some stuff on startup, like loading a heavy model or something @app.on_startup() -def startup(state): - return {"message": "Starting..."} +def app_startup(startup_state: SilverbackStartupState): + return {"message": "Starting...", "block_number": startup_state.last_block_seen} + + +# Can handle some initialization on startup, like models or network connections +@app.on_worker_startup() +def worker_startup(state: TaskiqState): + state.block_count = 0 + # state.db = MyDB() + return {"message": "Worker started."} # This is how we trigger off of new blocks @app.on_(chain.blocks) -def exec_block(block: BlockAPI): +# context must be a type annotated kwarg to be provided to the task +def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): + context.state.block_count += 1 return len(block.transactions) # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. 
-@app.on_(USDC.Transfer, start_block=17793100, new_block_timeout=25) +@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) # NOTE: Typing isn't required def exec_event1(log): if log.log_index % 7 == 3: # If you ever want the app to shutdown under some scenario, call this exception raise CircuitBreaker("Oopsie!") - return log.amount + return {"amount": log.amount} @app.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - return log.value + return log.amount # Just in case you need to release some resources or something +@app.on_worker_shutdown() +def worker_shutdown(state): + return { + "message": f"Worker stopped after handling {state.block_count} blocks.", + "block_count": state.block_count, + } + + +# A final job to execute on Silverback shutdown @app.on_shutdown() -def shutdown(state): +def app_shutdown(state): return {"message": "Stopping..."} diff --git a/silverback/__init__.py b/silverback/__init__.py index 56ddad86..dd26b077 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,8 +1,10 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException +from .types import SilverbackStartupState __all__ = [ "CircuitBreaker", "SilverbackApp", "SilverbackException", + "SilverbackStartupState", ] diff --git a/silverback/_cli.py b/silverback/_cli.py index f1d1e6e5..d5cf5356 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -1,8 +1,12 @@ import asyncio import os +from concurrent.futures import ThreadPoolExecutor import click from ape.cli import AccountAliasPromptChoice, ape_cli_context, network_option, verbosity_option +from taskiq import AsyncBroker +from taskiq.cli.worker.run import shutdown_broker +from taskiq.receiver import Receiver from silverback._importer import import_from_string from silverback.runner import PollingRunner @@ -40,7 +44,27 @@ def _network_callback(ctx, param, val): return val -@cli.command() +async def 
run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): + try: + tasks = [] + with ThreadPoolExecutor(max_workers=worker_count) as pool: + for _ in range(worker_count): + receiver = Receiver( + broker=broker, + executor=pool, + validate_params=True, + max_async_tasks=1, + max_prefetch=0, + ) + broker.is_worker_process = True + tasks.append(receiver.listen()) + + await asyncio.gather(*tasks) + finally: + await shutdown_broker(broker, shutdown_timeout) + + +@cli.command(help="Run Silverback application client") @ape_cli_context() @verbosity_option() @network_option(default=None, callback=_network_callback) @@ -57,3 +81,17 @@ def run(cli_ctx, network, account, runner, max_exceptions, path): app = import_from_string(path) runner = runner(app, max_exceptions=max_exceptions) asyncio.run(runner.run()) + + +@cli.command(help="Run Silverback application task workers") +@ape_cli_context() +@verbosity_option() +@network_option(default=None, callback=_network_callback) +@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback) +@click.option("-w", "--workers", type=int, default=2) +@click.option("-x", "--max-exceptions", type=int, default=3) +@click.option("-s", "--shutdown_timeout", type=int, default=90) +@click.argument("path") +def worker(cli_ctx, network, account, workers, max_exceptions, shutdown_timeout, path): + app = import_from_string(path) + asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout)) diff --git a/silverback/application.py b/silverback/application.py index 4e9a9e14..527b85f5 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -38,10 +38,13 @@ def __init__(self, settings: Optional[Settings] = None): if not settings: settings = Settings() + self.network = settings.get_provider_context() + # NOTE: This allows using connected ape methods e.g. 
`Contract` + provider = self.network.__enter__() + # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( - self.chain_manager.provider.network.name.endswith("-fork") - or self.chain_manager.provider.network.name == LOCAL_NETWORK_NAME + provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME ): settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds()) @@ -52,10 +55,6 @@ def __init__(self, settings: Optional[Settings] = None): self.contract_events: Dict[AddressType, Dict[str, ContractEvent]] = {} self.poll_settings: Dict[str, Dict] = {} - self.network = settings.get_provider_context() - # NOTE: This allows using connected ape methods e.g. `Contract` - provider = self.network.__enter__() - atexit.register(self.network.__exit__, None, None, None) self.signer = settings.get_signer() @@ -75,7 +74,31 @@ def __init__(self, settings: Optional[Settings] = None): def on_startup(self) -> Callable: """ - Code to execute on worker startup / restart after an error. + Code to execute on one worker upon startup / restart after an error. + + Usage example:: + + @app.on_startup() + def do_something_on_startup(startup_state): + ... # Reprocess missed events or blocks + """ + return self.broker.task(task_name="silverback_startup") + + def on_shutdown(self) -> Callable: + """ + Code to execute on one worker at shutdown. + + Usage example:: + + @app.on_shutdown() + def do_something_on_shutdown(): + ... # Record final state of app + """ + return self.broker.task(task_name="silverback_shutdown") + + def on_worker_startup(self) -> Callable: + """ + Code to execute on every worker at startup / restart after an error. Usage example:: @@ -85,9 +108,9 @@ def do_something_on_startup(state): """ return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) - def on_shutdown(self) -> Callable: + def on_worker_shutdown(self) -> Callable: """ - Code to execute on normal worker shutdown. + Code to execute on every worker at shutdown. 
Usage example:: @@ -97,6 +120,24 @@ def do_something_on_shutdown(state): """ return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) + def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_startup` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_startup") + + def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_shutdown` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_shutdown") + def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: """ Get access to the handler for `block` events. diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 00582050..b25efdb3 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -1,8 +1,30 @@ +from typing import Optional, Tuple + from ape.logging import logger -from ape.types import ContractLog, HexBytes +from ape.types import ContractLog from ape.utils import ManagerAccessMixin from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult +from silverback.persistence import HandlerResult +from silverback.types import SilverbackID, handler_id_block, handler_id_event +from silverback.utils import hexbytes_dict + + +def resolve_task(message: TaskiqMessage) -> Tuple[str, Optional[int], Optional[int]]: + block_number = None + log_index = None + task_id = message.task_name + + if task_id == "block": + block_number = message.args[0].number + task_id = handler_id_block(block_number) + elif task_id == "event": + block_number = message.args[0].block_number + log_index = message.args[0].log_index + task_id = handler_id_event(message.args[0].address, message.args[0].abi.name) + + return task_id, block_number, log_index + class 
SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin): def __init__(self, *args, **kwargs): @@ -15,7 +37,11 @@ def compute_block_time() -> int: return int((head.timestamp - genesis.timestamp) / head.number) + settings = kwargs.pop("silverback_settings") + self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() + self.ident = SilverbackID.from_settings(settings) + self.persistence = settings.get_persistent_store() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary until https://github.com/ApeWorX/ape/issues/1465 is resolved @@ -47,20 +73,10 @@ def _create_label(self, message: TaskiqMessage) -> str: return f"{message.task_name}{args}" def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: - def fix_dict(data: dict) -> dict: - fixed_data = {} - for name, value in data.items(): - if isinstance(value, str) and value.startswith("0x"): - fixed_data[name] = HexBytes(value) - else: - fixed_data[name] = value - - return fixed_data - if message.task_name == "block": # NOTE: Necessary because we don't know the exact block class message.args[0] = self.provider.network.ecosystem.decode_block( - fix_dict(message.args[0]) + hexbytes_dict(message.args[0]) ) elif "event" in message.task_name: @@ -77,6 +93,21 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult): f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)" ) + async def post_save(self, message: TaskiqMessage, result: TaskiqResult): + if not self.persistence: + return + + handler_id, block_number, log_index = resolve_task(message) + + handler_result = HandlerResult.from_taskiq( + self.ident, handler_id, block_number, log_index, result + ) + + try: + await self.persistence.add_result(handler_result) + except Exception as err: + logger.error(f"Error storing result: {err}") + async def on_error( self, message: TaskiqMessage, diff --git a/silverback/persistence.py b/silverback/persistence.py new file mode 100644 index 
00000000..2c2b4e80 --- /dev/null +++ b/silverback/persistence.py @@ -0,0 +1,327 @@ +import json +import os +import sqlite3 +from abc import ABC, abstractmethod +from datetime import datetime, timezone +from typing import Optional, TypeVar + +from pydantic import BaseModel +from taskiq import TaskiqResult +from typing_extensions import Self # Introduced 3.11 + +from .types import SilverbackID + +_HandlerReturnType = TypeVar("_HandlerReturnType") + + +class SilverbackState(BaseModel): + instance: str + network: str + # Last block number seen by runner + last_block_seen: int + # Last block number processed by a worker + last_block_processed: int + updated: datetime + + +class HandlerResult(TaskiqResult): + instance: str + network: str + handler_id: str + block_number: Optional[int] + log_index: Optional[int] + created: datetime + + @classmethod + def from_taskiq( + cls, + ident: SilverbackID, + handler_id: str, + block_number: Optional[int], + log_index: Optional[int], + result: TaskiqResult, + ) -> Self: + return cls( + instance=ident.identifier, + network=ident.network_choice, + handler_id=handler_id, + block_number=block_number, + log_index=log_index, + created=datetime.now(timezone.utc), + **result.dict(), + ) + + +class BasePersistentStore(ABC): + @abstractmethod + async def init(self): + """Handle any async initialization from Silverback settings (e.g. migrations).""" + ... + + @abstractmethod + async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]: + """Return the stored state for a Silverback instance""" + ... + + @abstractmethod + async def set_state( + self, ident: SilverbackID, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + """Set the stored state for a Silverback instance""" + ... + + @abstractmethod + async def get_latest_result( + self, ident: SilverbackID, handler: Optional[str] = None + ) -> Optional[HandlerResult]: + """Return the latest result for a Silverback instance's handler""" + ... 
+ + @abstractmethod + async def add_result(self, v: HandlerResult): + """Store a result for a Silverback instance's handler""" + ... + + +class SQLitePersistentStore(BasePersistentStore): + """ + SQLite implementation of BasePersistentStore used to store application state and handler + result data. + + Usage: + + To use SQLite persistent store, you must configure the following env vars: + + - `PERSISTENCE_CLASS`: `silverback.persistence.SQLitePersistentStore` + - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. + """ + + SQL_GET_STATE = """ + SELECT last_block_seen, last_block_processed, updated + FROM silverback_state + WHERE instance = ? AND network = ?; + """ + SQL_INSERT_STATE = """ + INSERT INTO silverback_state ( + instance, network, last_block_seen, last_block_processed, updated + ) + VALUES (?, ?, ?, ?, ?); + """ + SQL_UPDATE_STATE = """ + UPDATE silverback_state + SET last_block_seen = ?, last_block_processed = ?, updated = ? + WHERE instance = ? AND network = ?; + """ + SQL_GET_RESULT_LATEST = """ + SELECT handler_id, block_number, log_index, execution_time, is_err, created, + return_value_blob + FROM silverback_result + WHERE instance = ? AND network = ? + ORDER BY created DESC + LIMIT 1; + """ + SQL_GET_HANDLER_LATEST = """ + SELECT handler_id, block_number, log_index, execution_time, is_err, created, + return_value_blob + FROM silverback_result + WHERE instance = ? AND network = ? AND handler_id = ? 
+ ORDER BY created DESC + LIMIT 1; + """ + SQL_INSERT_RESULT = """ + INSERT INTO silverback_result ( + instance, network, handler_id, block_number, log_index, execution_time, + is_err, created, return_value_blob + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); + """ + + con: Optional[sqlite3.Connection] + initialized: bool = False + + async def init(self): + self.con = sqlite3.connect(os.environ.get("SQLITE_PATH", ":memory:")) + + cur = self.con.cursor() + cur.executescript( + """ + BEGIN; + CREATE TABLE IF NOT EXISTS silverback_state ( + instance text, + network text, + last_block_seen int, + last_block_processed int, + updated int + ); + CREATE TABLE IF NOT EXISTS silverback_result ( + instance text, + network text, + handler_id text, + block_number int, + log_index int, + execution_time real, + is_err bool, + created int, + return_value_blob blob + ); + CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance + ON silverback_state(instance, network); + CREATE INDEX IF NOT EXISTS silverback_result__instance + ON silverback_result (instance, network); + CREATE INDEX IF NOT EXISTS silverback_result__handler + ON silverback_result (instance, network, handler_id); + CREATE INDEX IF NOT EXISTS silverback_result__is_err + ON silverback_result (is_err); + COMMIT; + """ + ) + cur.close() + + if not self.con: + raise Exception("Failed to setup SQLite connection") + + self.initialized = True + + async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]: + if not self.initialized: + await self.init() + + assert self.con is not None + + cur = self.con.cursor() + res = cur.execute( + self.SQL_GET_STATE, + (ident.identifier, ident.network_choice), + ) + row = res.fetchone() + + cur.close() + + if row is None: + return None + + return SilverbackState( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=row[0], + last_block_processed=row[1], + updated=datetime.fromtimestamp(row[2], timezone.utc), + ) + + async def set_state( + self, 
ident: SilverbackID, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + if not self.initialized: + await self.init() + + assert self.con is not None + + cur = self.con.cursor() + res = cur.execute( + self.SQL_GET_STATE, + (ident.identifier, ident.network_choice), + ) + row = res.fetchone() + + now = datetime.now(timezone.utc) + now_stamp = int(now.timestamp()) + + if row is None: + cur.execute( + self.SQL_INSERT_STATE, + ( + ident.identifier, + ident.network_choice, + last_block_seen, + last_block_processed, + now_stamp, + ), + ) + else: + cur.execute( + self.SQL_UPDATE_STATE, + ( + last_block_seen, + last_block_processed, + now_stamp, + ident.identifier, + ident.network_choice, + ), + ) + + cur.close() + self.con.commit() + + return SilverbackState( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=last_block_seen, + last_block_processed=last_block_processed, + updated=now, + ) + + async def get_latest_result( + self, ident: SilverbackID, handler: Optional[str] = None + ) -> Optional[HandlerResult]: + if not self.initialized: + await self.init() + + assert self.con is not None + + cur = self.con.cursor() + + if handler is not None: + res = cur.execute( + self.SQL_GET_HANDLER_LATEST, + (ident.identifier, ident.network_choice, handler), + ) + else: + res = cur.execute( + self.SQL_GET_RESULT_LATEST, + (ident.identifier, ident.network_choice), + ) + + row = res.fetchone() + + cur.close() + + if row is None: + return None + + return HandlerResult( + instance=ident.identifier, + network=ident.network_choice, + handler_id=row[0], + block_number=row[1], + log_index=row[2], + execution_time=row[3], + is_err=row[4], + created=datetime.fromtimestamp(row[5], timezone.utc), + return_value=json.loads(row[6]), + ) + + async def add_result(self, v: HandlerResult): + if not self.initialized: + await self.init() + + assert self.con is not None + + cur = self.con.cursor() + + cur.execute( + self.SQL_INSERT_RESULT, + ( + 
v.instance, + v.network, + v.handler_id, + v.block_number, + v.log_index, + v.execution_time, + v.is_err, + v.created, + json.dumps(v.return_value), + ), + ) + + cur.close() + self.con.commit() diff --git a/silverback/runner.py b/silverback/runner.py index 68b70f51..49580586 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -1,5 +1,6 @@ import asyncio from abc import ABC, abstractmethod +from typing import Optional, Tuple from ape import chain from ape.contracts import ContractEvent, ContractInstance @@ -10,8 +11,13 @@ from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError +from .persistence import BasePersistentStore +from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .utils import async_wrap_iter +from .types import SilverbackID, SilverbackStartupState +from .utils import async_wrap_iter, hexbytes_dict + +settings = Settings() class BaseRunner(ABC): @@ -20,6 +26,10 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.max_exceptions = max_exceptions self.exceptions = 0 + self.last_block_seen = 0 + self.last_block_processed = 0 + self.persistence: Optional[BasePersistentStore] = None + self.ident = SilverbackID.from_settings(settings) def _handle_result(self, result: TaskiqResult): if result.is_err: @@ -31,6 +41,33 @@ def _handle_result(self, result: TaskiqResult): if self.exceptions > self.max_exceptions: raise Halt() + async def _checkpoint( + self, last_block_seen: int = 0, last_block_processed: int = 0 + ) -> Tuple[int, int]: + """Set latest checkpoint block number""" + if ( + last_block_seen > self.last_block_seen + or last_block_processed > self.last_block_processed + ): + logger.debug( + ( + f"Checkpoint block [seen={self.last_block_seen}, " + f"procssed={self.last_block_processed}]" + ) + ) + self.last_block_seen = max(last_block_seen, self.last_block_seen) + self.last_block_processed = max(last_block_processed, 
self.last_block_processed) + + if self.persistence: + try: + await self.persistence.set_state( + self.ident, self.last_block_seen, self.last_block_processed + ) + except Exception as err: + logger.error(f"Error settings state: {err}") + + return self.last_block_seen, self.last_block_processed + @abstractmethod async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): """ @@ -55,8 +92,27 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ + self.persistence = settings.get_persistent_store() + + if self.persistence: + boot_state = await self.persistence.get_state(self.ident) + if boot_state: + self.last_block_seen = boot_state.last_block_seen + self.last_block_processed = boot_state.last_block_processed + await self.app.broker.startup() + # Execute Silverback startup task before we init the rest + if startup_handler := self.app.get_startup_handler(): + task = await startup_handler.kiq( + SilverbackStartupState( + last_block_seen=self.last_block_seen, + last_block_processed=self.last_block_processed, + ) + ) + result = await task.wait_result() + self._handle_result(result) + if block_handler := self.app.get_block_handler(): tasks = [self._block_task(block_handler)] else: @@ -72,6 +128,12 @@ async def run(self): await asyncio.gather(*tasks) + # Execute Silverback shutdown task before shutting down the broker + if shutdown_handler := self.app.get_shutdown_handler(): + task = await shutdown_handler.kiq() + result = await task.wait_result() + self._handle_result(result) + await self.app.broker.shutdown() @@ -95,10 +157,19 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): logger.debug(f"Handling blocks via {sub_id}") async for raw_block in self.subscriptions.get_subscription_data(sub_id): + block = self.provider.network.ecosystem.decode_block(hexbytes_dict(raw_block)) + + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = 
await block_handler.kiq(raw_block) result = await block_task.wait_result() + self._handle_result(result) + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) + async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask ): @@ -120,10 +191,17 @@ async def _event_task( contract_event.abi, ) ) + + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() self._handle_result(result) + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) + async def run(self): async with Web3SubscriptionsManager(self.ws_uri) as subscriptions: self.subscriptions = subscriptions @@ -157,15 +235,22 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for block in async_wrap_iter( chain.blocks.poll_blocks(start_block=start_block, new_block_timeout=new_block_timeout) ): + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = await block_handler.kiq(block) result = await block_task.wait_result() self._handle_result(result) + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) + async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask ): new_block_timeout = None start_block = None + address = None if isinstance(contract_event.contract, ContractInstance): address = contract_event.contract.address if address in self.app.poll_settings: @@ -180,6 +265,12 @@ async def _event_task( async for event in async_wrap_iter( contract_event.poll_logs(start_block=start_block, new_block_timeout=new_block_timeout) ): + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() 
self._handle_result(result) + + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) diff --git a/silverback/settings.py b/silverback/settings.py index bd486863..73d19e06 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -7,6 +7,7 @@ from ._importer import import_from_string from .middlewares import SilverbackMiddleware +from .persistence import BasePersistentStore class Settings(BaseSettings, ManagerAccessMixin): @@ -17,6 +18,9 @@ class Settings(BaseSettings, ManagerAccessMixin): testing or deployment purposes. Defaults to a working in-memory broker. """ + # A unique identifier for this silverback instance + INSTANCE: str = "default" + BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" @@ -31,6 +35,9 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None + # Used for persistent store + PERSISTENCE_CLASS: Optional[str] = None + class Config: env_prefix = "SILVERBACK_" case_sensitive = True @@ -43,7 +50,7 @@ def get_broker(self) -> AsyncBroker: else: broker = broker_class(self.BROKER_URI) - middlewares: List[TaskiqMiddleware] = [SilverbackMiddleware()] + middlewares: List[TaskiqMiddleware] = [SilverbackMiddleware(silverback_settings=self)] if self.ENABLE_METRICS: middlewares.append( @@ -59,10 +66,18 @@ def get_broker(self) -> AsyncBroker: return broker + def get_network_choice(self) -> str: + return self.NETWORK_CHOICE or self.network_manager.network.choice + + def get_persistent_store(self) -> Optional[BasePersistentStore]: + if not self.PERSISTENCE_CLASS: + return None + + persistence_class = import_from_string(self.PERSISTENCE_CLASS) + return persistence_class() + def get_provider_context(self) -> ProviderContextManager: - return self.network_manager.parse_network_choice( - self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name - ) + return 
self.network_manager.parse_network_choice(self.get_network_choice()) def get_signer(self) -> Optional[AccountAPI]: if self.SIGNER_ALIAS: diff --git a/silverback/types.py b/silverback/types.py new file mode 100644 index 00000000..571a5c87 --- /dev/null +++ b/silverback/types.py @@ -0,0 +1,42 @@ +from typing import Optional, Protocol + +from pydantic import BaseModel +from typing_extensions import Self # Introduced 3.11 + + +class ISilverbackSettings(Protocol): + """Loose approximation of silverback.settings.Settings. If you can, use the class as + a type reference.""" + + INSTANCE: str + PERSISTENCE_CLASS: Optional[str] + + def get_network_choice(self) -> str: + ... + + +class SilverbackID(BaseModel): + identifier: str + network_choice: str + + @classmethod + def from_settings(cls, settings_: ISilverbackSettings) -> Self: + return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) + + +class SilverbackStartupState(BaseModel): + last_block_seen: int + last_block_processed: int + + +def handler_id_block(block_number: Optional[int]) -> str: + """Return a unique handler ID string for a block""" + if block_number is None: + return "block/pending" + return f"block/{block_number}" + + +def handler_id_event(contract_address: Optional[str], event_signature: str) -> str: + """Return a unique handler ID string for an event""" + # TODO: Under what circumstance can address be None? 
+ return f"event/{contract_address or 'unknown'}/{event_signature}" diff --git a/silverback/utils.py b/silverback/utils.py index 57ac9e94..2a6846e2 100644 --- a/silverback/utils.py +++ b/silverback/utils.py @@ -2,6 +2,8 @@ import threading from typing import AsyncIterator, Iterator +from ape.types import HexBytes + def async_wrap_iter(it: Iterator) -> AsyncIterator: """Wrap blocking iterator into an asynchronous one""" @@ -34,3 +36,16 @@ def iter_to_queue(): threading.Thread(target=iter_to_queue).start() return yield_queue_items() + + +def hexbytes_dict(data: dict) -> dict: + """Converts any hex string values in a flat dictionary to HexBytes.""" + fixed_data = {} + + for name, value in data.items(): + if isinstance(value, str) and value.startswith("0x"): + fixed_data[name] = HexBytes(value) + else: + fixed_data[name] = value + + return fixed_data