From c46ec085bb4057c51731b8611836203858eeeedf Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 17 Nov 2023 18:27:58 -0700 Subject: [PATCH 01/47] feat: adds persistence layer for app state and job results --- setup.py | 4 + silverback/application.py | 12 +- silverback/middlewares.py | 14 +- silverback/persistence.py | 265 ++++++++++++++++++++++++++++++++++++++ silverback/runner.py | 111 +++++++++++++++- silverback/settings.py | 13 +- silverback/types.py | 29 +++++ silverback/utils.py | 16 +++ 8 files changed, 440 insertions(+), 24 deletions(-) create mode 100644 silverback/persistence.py create mode 100644 silverback/types.py diff --git a/setup.py b/setup.py index 9c7911ff..81c58b70 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,9 @@ "IPython", # Console for interacting "ipdb", # Debugger (Must use `export PYTHONBREAKPOINT=ipdb.set_trace`) ], + "mongo": [ + "beanie~=1.23.6", + ], } # NOTE: `pip install -e .[dev]` to install package @@ -49,6 +52,7 @@ + extras_require["doc"] + extras_require["release"] + extras_require["dev"] + + extras_require["mongo"] ) with open("./README.md") as readme: diff --git a/silverback/application.py b/silverback/application.py index 4a514802..df755407 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -73,9 +73,13 @@ def __init__(self, settings: Optional[Settings] = None): f"{signer_str}{start_block_str}{new_block_timeout_str}" ) + def checkpoint(self, last_block_seen: int, last_block_processed: int): + self.broker.state.last_block_seen = last_block_seen + self.broker.state.last_block_processed = last_block_processed + def on_startup(self) -> Callable: """ - Code to execute on worker startup / restart after an error. + Code to execute on client startup / restart after an error. Usage example:: @@ -83,11 +87,11 @@ def on_startup(self) -> Callable: def do_something_on_startup(state): ... # Can provision resources, or add things to `state`. """ - return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP) def on_shutdown(self) -> Callable: """ - Code to execute on normal worker shutdown. + Code to execute on client shutdown. Usage example:: @@ -95,7 +99,7 @@ def on_shutdown(self) -> Callable: def do_something_on_shutdown(state): ... # Update some external service, perhaps using information from `state`. """ - return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) + return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN) def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: """ diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 00582050..37af0c2f 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -3,6 +3,8 @@ from ape.utils import ManagerAccessMixin from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult +from silverback.utils import hexbytes_dict + class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin): def __init__(self, *args, **kwargs): @@ -47,20 +49,10 @@ def _create_label(self, message: TaskiqMessage) -> str: return f"{message.task_name}{args}" def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: - def fix_dict(data: dict) -> dict: - fixed_data = {} - for name, value in data.items(): - if isinstance(value, str) and value.startswith("0x"): - fixed_data[name] = HexBytes(value) - else: - fixed_data[name] = value - - return fixed_data - if message.task_name == "block": # NOTE: Necessary because we don't know the exact block class message.args[0] = self.provider.network.ecosystem.decode_block( - fix_dict(message.args[0]) + hexbytes_dict(message.args[0]) ) elif "event" in message.task_name: diff --git a/silverback/persistence.py b/silverback/persistence.py new file mode 100644 index 00000000..9a3e20f1 --- /dev/null +++ b/silverback/persistence.py @@ -0,0 +1,265 @@ +import pickle +from abc import ABC, abstractmethod +from datetime import datetime, timezone +from typing import Annotated, Any, Dict, Optional +from typing_extensions import Self # Introduced 3.11 + +from ape.logging import logger +from pydantic import BaseModel +from taskiq import TaskiqResult + +from .types import SilverbackIdent + + +class SilverbackState(BaseModel): + ident: SilverbackIdent + # Last block number seen by runner + last_block_seen: int + # Last block number processed by a worker + last_block_processed: int + + +class HandlerResult(BaseModel): + instance: str + network: str + handler_id: str + block_number: int + log_index: Optional[int] + execution_time: float + # TODO: upcoming feature in taskiq + # labels: Dict[str] + return_value_blob: Optional[bytes] # pickled data + created: datetime + + @classmethod + def from_taskiq( + cls, + ident: SilverbackIdent, + handler_id: str, + block_number: int, + log_index: int | None, + result: TaskiqResult, + ) -> Self: + return cls( + instance=ident.identifier, + network=ident.network_choice, + handler_id=handler_id, + block_number=block_number, + log_index=log_index, + execution_time=result.execution_time, + # labels=result.labels, + return_value_blob=pickle.dumps(result.return_value), + created=datetime.now(timezone.utc), + ) + + @property + def return_value(self): + if self.return_value_blob is None: + return None + return pickle.loads(self.return_value_blob) + + @return_value.setter + def set_return_value(self, v: Any): + self.return_value_blob = pickle.dumps(v) + + +class BasePersistentStorage(ABC): + @abstractmethod + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + ... + + @abstractmethod + async def set_instance_state( + self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + ... + + @abstractmethod + async def get_latest_result( + self, instance: SilverbackIdent, handler: Optional[str] = None + ) -> HandlerResult: + ... + + @abstractmethod + async def add_result(self, v: HandlerResult): + ... + + +async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]: + try: + import pymongo + from beanie import Document, Indexed, init_beanie + from beanie.odm.operators.update.general import Set + from motor.motor_asyncio import AsyncIOMotorClient + except ImportError as err: + print(err) + logger.warning("MongoDB was initialized by dependencies are not installed") + return None + + class SilverbackStateDoc(Document): + instance: Annotated[str, Indexed(str)] + network: Annotated[str, Indexed(str)] + last_block_seen: int + last_block_processed: int + updated: datetime + + class Settings: + name = "state" + indexes = [ + [ + ("instance", pymongo.TEXT), + ("network", pymongo.TEXT), + ], + ] + + def to_silberback_state(self) -> SilverbackState: + return SilverbackState( + ident=SilverbackIdent( + identifier=self.instance, + network_choice=self.network, + ), + last_block_seen=self.last_block_seen, + last_block_processed=self.last_block_processed, + ) + + class HandlerResultDoc(HandlerResult, Document): + # NOTE: Redefining these to annotate with indexed type + instance: Annotated[str, Indexed(str)] + network: Annotated[str, Indexed(str)] + handler_id: Annotated[str, Indexed(str)] + + class Settings: + name = "result" + indexes = [ + [ + ("instance", pymongo.TEXT), + ("network", pymongo.TEXT), + ("handler", pymongo.TEXT), + ], + ] + + @classmethod + def from_handler_result(cls, result: HandlerResult) -> Self: + return cls( + instance=result.instance, + network=result.network, + handler_id=result.handler_id, + block_number=result.block_number, + log_index=result.log_index, + execution_time=result.execution_time, + return_value_blob=result.return_value_blob, + created=result.created, + ) + + def to_handler_result(self) -> HandlerResult: + return HandlerResult( + instance=self.instance, + network=self.network, + handler_id=self.handler, + block_number=self.block_number, + log_index=self.log_index, + execution_time=self.execution_time, + return_value_blob=self.return_value_blob, + created=self.created, + ) + + class MongoStorage(BasePersistentStorage): + client: AsyncIOMotorClient + + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + res = await SilverbackStateDoc.find_one( + SilverbackStateDoc.instance == ident.identifier, + SilverbackStateDoc.network == ident.network_choice, + ) + + if res is None: + return None + + return res.to_silberback_state() + + async def set_instance_state( + self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + now_utc = datetime.now(timezone.utc) + + state = await SilverbackStateDoc.find_one( + SilverbackStateDoc.instance == ident.identifier, + SilverbackStateDoc.network == ident.network_choice, + ) + + if state is not None: + await state.set( + # Unreported type error? Confiremd working + { + SilverbackStateDoc.last_block_seen: last_block_seen, + SilverbackStateDoc.last_block_processed: last_block_processed, + SilverbackStateDoc.updated: now_utc, + } # type: ignore + ) + else: + state = SilverbackStateDoc( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=last_block_seen, + last_block_processed=last_block_processed, + updated=now_utc, + ) + await state.create() + + # TODO: Why no work? + # await SilverbackStateDoc.find_one( + # SilverbackStateDoc.instance == ident.identifier, + # SilverbackStateDoc.network == ident.network_choice, + # ).upsert( + # Set( + # { + # SilverbackStateDoc.last_block_seen: last_block_seen, + # SilverbackStateDoc.last_block_processed: last_block_processed, + # SilverbackStateDoc.updated: now_utc, + # } + # ), + # on_insert=SilverbackStateDoc( + # instance=ident.identifier, + # network=ident.network_choice, + # last_block_seen=last_block_seen, + # last_block_processed=last_block_processed, + # updated=now_utc, + # ), + # ) + + async def get_latest_result( + self, ident: SilverbackIdent, handler_id: Optional[str] = None + ) -> Optional[HandlerResult]: + query = HandlerResultDoc.find( + HandlerResultDoc.instance == ident.identifier, + HandlerResultDoc.network == ident.network_choice, + ) + + if handler_id: + query.find(HandlerResultDoc.handler_id == handler_id) + + res = await query.sort("-created").first_or_none() + + if res is None: + return res + + return res.to_handler_result() + + async def add_result(self, result: HandlerResult): + doc = HandlerResultDoc.from_handler_result(result) + # Type annotation error: https://github.com/roman-right/beanie/issues/679 + await doc.insert() # type: ignore + + storage = MongoStorage() + client = AsyncIOMotorClient(mongo_uri) + + await init_beanie( + database=client.db_name, + # Type annotation error: https://github.com/roman-right/beanie/issues/670 + document_models=[ + HandlerResultDoc, + SilverbackStateDoc, + ], # type: ignore + ) + + return storage diff --git a/silverback/runner.py b/silverback/runner.py index 68b70f51..5349ebeb 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -7,11 +7,17 @@ from ape.utils import ManagerAccessMixin from ape_ethereum.ecosystem import keccak from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult +from typing import Optional, Tuple from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError +from .persistence import BasePersistentStorage, HandlerResult, init_mongo from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .utils import async_wrap_iter +from .settings import Settings +from .types import SilverbackIdent, handler_id_block, handler_id_event +from .utils import async_wrap_iter, hexbytes_dict + +settings = Settings() class BaseRunner(ABC): @@ -20,8 +26,14 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.max_exceptions = max_exceptions self.exceptions = 0 + self.last_block_seen = 0 + self.last_block_processed = 0 + self.persistence: Optional[BasePersistentStorage] = None + self.ident = SilverbackIdent.from_settings(settings) - def _handle_result(self, result: TaskiqResult): + async def _handle_result( + self, handler_id: str, block_number: int, log_index: int | None, result: TaskiqResult + ): if result.is_err: self.exceptions += 1 @@ -31,6 +43,36 @@ def _handle_result(self, result: TaskiqResult): if self.exceptions > self.max_exceptions: raise Halt() + if self.persistence: + handler_result = HandlerResult.from_taskiq( + self.ident, handler_id, block_number, log_index, result + ) + + await self.persistence.add_result(handler_result) + + async def _checkpoint( + self, last_block_seen: int = 0, last_block_processed: int = 0 + ) -> Tuple[int, int]: + """Set latest checkpoint block number""" + if ( + last_block_seen > self.last_block_seen + or last_block_processed > self.last_block_processed + ): + logger.debug( + f"Checkpoint block [seen={self.last_block_seen}, procssed={self.last_block_processed}]" + ) + self.last_block_seen = max(last_block_seen, self.last_block_seen) + self.last_block_processed = max(last_block_processed, self.last_block_processed) + + self.app.checkpoint(self.last_block_seen, self.last_block_processed) + + if self.persistence: + await self.persistence.set_instance_state( + self.ident, self.last_block_seen, self.last_block_processed + ) + + return self.last_block_seen, self.last_block_processed + @abstractmethod async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): """ @@ -55,6 +97,23 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ + if settings.MONGODB_URI: + self.persistence = await init_mongo(settings.MONGODB_URI) + + if self.persistence: + boot_state = await self.persistence.get_instance_state(self.ident) + if boot_state: + self.last_block_seen = boot_state.last_block_seen + self.last_block_processed = boot_state.last_block_processed + + # NOTE: This will update TaskIQ state before app startup. + # TODO: I have no idea if this will work with a distributed runner/worker + # setup. That may require our own injected startup task, but it's unclear if + # we can do that before Taskiq's startup handlers. We might be able to force + # SilverbackApplication to decorate an internal func to handle it. Need to + # figure that out. + self.app.checkpoint(self.last_block_seen, self.last_block_processed) + await self.app.broker.startup() if block_handler := self.app.get_block_handler(): @@ -95,9 +154,18 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): logger.debug(f"Handling blocks via {sub_id}") async for raw_block in self.subscriptions.get_subscription_data(sub_id): + block = self.provider.network.ecosystem.decode_block(hexbytes_dict(raw_block)) + + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = await block_handler.kiq(raw_block) result = await block_task.wait_result() - self._handle_result(result) + + await self._handle_result(handler_id_block(block.number), block.number, None, result) + + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -120,9 +188,21 @@ async def _event_task( contract_event.abi, ) ) + + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() - self._handle_result(result) + await self._handle_result( + handler_id_event(contract_event.contract.address, contract_event.abi.selector), + event.block_number, + event.log_index, + result, + ) + + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) async def run(self): async with Web3SubscriptionsManager(self.ws_uri) as subscriptions: @@ -157,15 +237,22 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for block in async_wrap_iter( chain.blocks.poll_blocks(start_block=start_block, new_block_timeout=new_block_timeout) ): + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = await block_handler.kiq(block) result = await block_task.wait_result() - self._handle_result(result) + await self._handle_result(handler_id_block(block.number), block.number, None, result) + + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask ): new_block_timeout = None start_block = None + address = None if isinstance(contract_event.contract, ContractInstance): address = contract_event.contract.address if address in self.app.poll_settings: @@ -180,6 +267,18 @@ async def _event_task( async for event in async_wrap_iter( contract_event.poll_logs(start_block=start_block, new_block_timeout=new_block_timeout) ): + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() - self._handle_result(result) + await self._handle_result( + # TODO: Under what circumstance can address be None? + handler_id_event(address, contract_event.abi.selector), + event.block_number, + event.log_index, + result, + ) + + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) diff --git a/silverback/settings.py b/silverback/settings.py index bd486863..87d8ac33 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -17,6 +17,9 @@ class Settings(BaseSettings, ManagerAccessMixin): testing or deployment purposes. Defaults to a working in-memory broker. """ + # A unique identifier for this silverback instance + INSTANCE: str = "default" + BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" @@ -31,6 +34,8 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None + MONGODB_URI: Optional[str] = None + class Config: env_prefix = "SILVERBACK_" case_sensitive = True @@ -59,10 +64,12 @@ def get_broker(self) -> AsyncBroker: return broker + def get_network_choice(self) -> str: + # return self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name + return self.NETWORK_CHOICE or self.network_manager.network.choice + def get_provider_context(self) -> ProviderContextManager: - return self.network_manager.parse_network_choice( - self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name - ) + return self.network_manager.parse_network_choice(self.get_network_choice()) def get_signer(self) -> Optional[AccountAPI]: if self.SIGNER_ALIAS: diff --git a/silverback/types.py b/silverback/types.py new file mode 100644 index 00000000..a25e2ae7 --- /dev/null +++ b/silverback/types.py @@ -0,0 +1,29 @@ +from enum import Enum +from typing import Annotated, Any, Dict, Optional +from typing_extensions import Self # Introduced 3.11 + +from pydantic import BaseModel + +from .settings import Settings + + +class SilverbackIdent(BaseModel): + identifier: str + network_choice: str + + @classmethod + def from_settings(cls, settings_: Settings) -> Self: + return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) + + +def handler_id_block(block_number: Optional[int]) -> str: + """Return a unique handler ID string for a block""" + if block_number is None: + return "block/pending" + return f"block/{block_number}" + + +def handler_id_event(contract_address: str | None, event_signature: str) -> str: + """Return a unique handler ID string for an event""" + # TODO: Under what circumstance can address be None? + return f"event/{contract_address or 'unknown'}/{event_signature}" diff --git a/silverback/utils.py b/silverback/utils.py index 57ac9e94..c47ad38a 100644 --- a/silverback/utils.py +++ b/silverback/utils.py @@ -2,6 +2,10 @@ import threading from typing import AsyncIterator, Iterator +from ape.contracts import ContractEvent, ContractInstance +from ape.managers.chain import BlockContainer +from ape.types import HexBytes + def async_wrap_iter(it: Iterator) -> AsyncIterator: """Wrap blocking iterator into an asynchronous one""" @@ -34,3 +38,15 @@ def iter_to_queue(): threading.Thread(target=iter_to_queue).start() return yield_queue_items() + + +def hexbytes_dict(data: dict) -> dict: + fixed_data = {} + + for name, value in data.items(): + if isinstance(value, str) and value.startswith("0x"): + fixed_data[name] = HexBytes(value) + else: + fixed_data[name] = value + + return fixed_data From b956d480cdda928872af11e9710129b4dfbbb345 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 17 Nov 2023 18:32:32 -0700 Subject: [PATCH 02/47] style(lint): fix imports --- silverback/persistence.py | 2 +- silverback/runner.py | 4 ++-- silverback/types.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 9a3e20f1..51fa1274 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -2,11 +2,11 @@ from abc import ABC, abstractmethod from datetime import datetime, timezone from typing import Annotated, Any, Dict, Optional -from typing_extensions import Self # Introduced 3.11 from ape.logging import logger from pydantic import BaseModel from taskiq import TaskiqResult +from typing_extensions import Self # Introduced 3.11 from .types import SilverbackIdent diff --git a/silverback/runner.py b/silverback/runner.py index 5349ebeb..79a3b2aa 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -1,5 +1,6 @@ import asyncio from abc import ABC, abstractmethod +from typing import Optional, Tuple from ape import chain from ape.contracts import ContractEvent, ContractInstance @@ -7,13 +8,12 @@ from ape.utils import ManagerAccessMixin from ape_ethereum.ecosystem import keccak from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult -from typing import Optional, Tuple from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError from .persistence import BasePersistentStorage, HandlerResult, init_mongo -from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .settings import Settings +from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import SilverbackIdent, handler_id_block, handler_id_event from .utils import async_wrap_iter, hexbytes_dict diff --git a/silverback/types.py b/silverback/types.py index a25e2ae7..0cc027bd 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,8 +1,8 @@ from enum import Enum from typing import Annotated, Any, Dict, Optional -from typing_extensions import Self # Introduced 3.11 from pydantic import BaseModel +from typing_extensions import Self # Introduced 3.11 from .settings import Settings From 7ba2f71b130a1fb9e3a9fd28fd841b055086e51f Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 17 Nov 2023 19:27:07 -0700 Subject: [PATCH 03/47] fix: fix types --- silverback/persistence.py | 53 ++++++++++++++++++--------------------- silverback/runner.py | 6 +++-- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 51fa1274..1e34ffae 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,7 +1,7 @@ import pickle from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import Annotated, Any, Dict, Optional +from typing import TYPE_CHECKING, Annotated, Any, Dict, Optional, Type from ape.logging import logger from pydantic import BaseModel @@ -12,22 +12,25 @@ class SilverbackState(BaseModel): - ident: SilverbackIdent + instance: str + network: str # Last block number seen by runner last_block_seen: int # Last block number processed by a worker last_block_processed: int + updated: datetime class HandlerResult(BaseModel): instance: str network: str handler_id: str - block_number: int + block_number: Optional[int] log_index: Optional[int] execution_time: float # TODO: upcoming feature in taskiq # labels: Dict[str] + # TODO: Use computed field with pydantic v2 return_value_blob: Optional[bytes] # pickled data created: datetime @@ -36,7 +39,7 @@ def from_taskiq( cls, ident: SilverbackIdent, handler_id: str, - block_number: int, + block_number: int | None, log_index: int | None, result: TaskiqResult, ) -> Self: @@ -52,16 +55,6 @@ def from_taskiq( created=datetime.now(timezone.utc), ) - @property - def return_value(self): - if self.return_value_blob is None: - return None - return pickle.loads(self.return_value_blob) - - @return_value.setter - def set_return_value(self, v: Any): - self.return_value_blob = pickle.dumps(v) - class BasePersistentStorage(ABC): @abstractmethod @@ -77,7 +70,7 @@ async def set_instance_state( @abstractmethod async def get_latest_result( self, instance: SilverbackIdent, handler: Optional[str] = None - ) -> HandlerResult: + ) -> Optional[HandlerResult]: ... @abstractmethod @@ -90,13 +83,15 @@ async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]: import pymongo from beanie import Document, Indexed, init_beanie from beanie.odm.operators.update.general import Set + from motor.core import AgnosticClient from motor.motor_asyncio import AsyncIOMotorClient except ImportError as err: print(err) logger.warning("MongoDB was initialized by dependencies are not installed") return None - class SilverbackStateDoc(Document): + # NOTE: Ignoring an inheritence issue with pydantic's Config class. Goes away with v2 + class SilverbackStateDoc(SilverbackState, Document): # type: ignore instance: Annotated[str, Indexed(str)] network: Annotated[str, Indexed(str)] last_block_seen: int @@ -114,15 +109,15 @@ class Settings: def to_silberback_state(self) -> SilverbackState: return SilverbackState( - ident=SilverbackIdent( - identifier=self.instance, - network_choice=self.network, - ), + instance=self.instance, + network=self.network, last_block_seen=self.last_block_seen, last_block_processed=self.last_block_processed, + updated=self.updated, ) - class HandlerResultDoc(HandlerResult, Document): + # NOTE: Ignoring an inheritence issue with pydantic's Config class. Goes away with v2 + class HandlerResultDoc(HandlerResult, Document): # type: ignore # NOTE: Redefining these to annotate with indexed type instance: Annotated[str, Indexed(str)] network: Annotated[str, Indexed(str)] @@ -164,7 +159,7 @@ def to_handler_result(self) -> HandlerResult: ) class MongoStorage(BasePersistentStorage): - client: AsyncIOMotorClient + client: AgnosticClient async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: res = await SilverbackStateDoc.find_one( @@ -191,10 +186,10 @@ async def set_instance_state( await state.set( # Unreported type error? Confiremd working { - SilverbackStateDoc.last_block_seen: last_block_seen, - SilverbackStateDoc.last_block_processed: last_block_processed, - SilverbackStateDoc.updated: now_utc, - } # type: ignore + SilverbackStateDoc.last_block_seen: last_block_seen, # type: ignore + SilverbackStateDoc.last_block_processed: last_block_processed, # type: ignore + SilverbackStateDoc.updated: now_utc, # type: ignore + } ) else: state = SilverbackStateDoc( @@ -227,6 +222,8 @@ async def set_instance_state( # ), # ) + return state + async def get_latest_result( self, ident: SilverbackIdent, handler_id: Optional[str] = None ) -> Optional[HandlerResult]: @@ -241,7 +238,7 @@ async def get_latest_result( res = await query.sort("-created").first_or_none() if res is None: - return res + return None return res.to_handler_result() @@ -251,7 +248,7 @@ async def add_result(self, result: HandlerResult): await doc.insert() # type: ignore storage = MongoStorage() - client = AsyncIOMotorClient(mongo_uri) + client: AgnosticClient = AsyncIOMotorClient(mongo_uri) await init_beanie( database=client.db_name, diff --git a/silverback/runner.py b/silverback/runner.py index 79a3b2aa..8a0020fa 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -32,7 +32,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.ident = SilverbackIdent.from_settings(settings) async def _handle_result( - self, handler_id: str, block_number: int, log_index: int | None, result: TaskiqResult + self, handler_id: str, block_number: int | None, log_index: int | None, result: TaskiqResult ): if result.is_err: self.exceptions += 1 @@ -162,7 +162,9 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): block_task = await block_handler.kiq(raw_block) result = await block_task.wait_result() - await self._handle_result(handler_id_block(block.number), block.number, None, result) + await self._handle_result( + handler_id_block(block.number), block.number or 0, None, result + ) if block.number is not None: await self._checkpoint(last_block_processed=block.number) From 85a5579cdb01276aaac6cf147742a607ab54cd05 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 17 Nov 2023 19:41:23 -0700 Subject: [PATCH 04/47] style(lint): disagreement flake8 didn't like black output --- silverback/persistence.py | 6 +++--- silverback/runner.py | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 1e34ffae..2d7cabde 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -186,9 +186,9 @@ async def set_instance_state( await state.set( # Unreported type error? Confiremd working { - SilverbackStateDoc.last_block_seen: last_block_seen, # type: ignore - SilverbackStateDoc.last_block_processed: last_block_processed, # type: ignore - SilverbackStateDoc.updated: now_utc, # type: ignore + SilverbackStateDoc.last_block_seen: last_block_seen, # type: ignore # noqa: E501 + SilverbackStateDoc.last_block_processed: last_block_processed, # type: ignore # noqa: E501 + SilverbackStateDoc.updated: now_utc, # type: ignore # noqa: E501 } ) else: diff --git a/silverback/runner.py b/silverback/runner.py index 8a0020fa..72bec454 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -59,7 +59,10 @@ async def _checkpoint( or last_block_processed > self.last_block_processed ): logger.debug( - f"Checkpoint block [seen={self.last_block_seen}, procssed={self.last_block_processed}]" + ( + f"Checkpoint block [seen={self.last_block_seen}, " + f"procssed={self.last_block_processed}]" + ) ) self.last_block_seen = max(last_block_seen, self.last_block_seen) self.last_block_processed = max(last_block_processed, self.last_block_processed) From ec436422e2acfffeb268b9e72ab885d85f830963 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Sat, 18 Nov 2023 10:00:48 -0700 Subject: [PATCH 05/47] refactor: adds silverback_startup task, and on_client_* and on_worker_* decorators --- silverback/__init__.py | 2 ++ silverback/application.py | 68 ++++++++++++++++++++++++++++++++++++++- silverback/runner.py | 18 ++++++++++- silverback/types.py | 5 +++ 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/silverback/__init__.py b/silverback/__init__.py index 56ddad86..dd26b077 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,8 +1,10 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException +from .types import SilverbackStartupState __all__ = [ "CircuitBreaker", "SilverbackApp", "SilverbackException", + "SilverbackStartupState", ] diff --git a/silverback/application.py b/silverback/application.py index df755407..0ae38c80 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -78,6 +78,30 @@ def checkpoint(self, last_block_seen: int, last_block_processed: int): self.broker.state.last_block_processed = last_block_processed def on_startup(self) -> Callable: + """ + Code to execute on one worker upon startup / restart after an error. + + Usage example:: + + @app.on_startup() + def do_something_on_startup(state): + ... # Can provision resources, or add things to `state`. + """ + return self.broker.task(task_name="silverback_startup") + + def on_shutdown(self) -> Callable: + """ + Code to execute on one worker at shutdown. + + Usage example:: + + @app.on_shutdown() + def do_something_on_shutdown(state): + ... # Update some external service, perhaps using information from `state`. + """ + return self.broker.task(task_name="silverback_shutdown") + + def on_client_startup(self) -> Callable: """ Code to execute on client startup / restart after an error. @@ -89,7 +113,7 @@ def do_something_on_startup(state): """ return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP) - def on_shutdown(self) -> Callable: + def on_client_shutdown(self) -> Callable: """ Code to execute on client shutdown. @@ -101,6 +125,48 @@ def do_something_on_shutdown(state): """ return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN) + def on_worker_startup(self) -> Callable: + """ + Code to execute on every worker at startup / restart after an error. + + Usage example:: + + @app.on_startup() + def do_something_on_startup(state): + ... # Can provision resources, or add things to `state`. + """ + return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + + def on_worker_shutdown(self) -> Callable: + """ + Code to execute on every worker at shutdown. + + Usage example:: + + @app.on_shutdown() + def do_something_on_shutdown(state): + ... # Update some external service, perhaps using information from `state`. + """ + return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) + + def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_startup` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_startup") + + def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: + """ + Get access to the handler for `silverback_shutdown` events. + + Returns: + Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created. + """ + return self.broker.available_tasks.get("silverback_shutdown") + def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: """ Get access to the handler for `block` events. diff --git a/silverback/runner.py b/silverback/runner.py index 72bec454..3007db58 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,7 @@ from .persistence import BasePersistentStorage, HandlerResult, init_mongo from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackIdent, handler_id_block, handler_id_event +from .types import SilverbackIdent, SilverbackStartupState, handler_id_block, handler_id_event from .utils import async_wrap_iter, hexbytes_dict settings = Settings() @@ -119,6 +119,22 @@ async def run(self): await self.app.broker.startup() + # Execute Silverback startup task before we init the rest + if startup_handler := self.app.get_startup_handler(): + task = await startup_handler.kiq( + SilverbackStartupState( + last_block_seen=self.last_block_seen, + last_block_processed=self.last_block_processed, + ) + ) + result = await task.wait_result() + await self._handle_result( + "silverback_startup", + self.last_block_seen, + -1, + result, + ) + if block_handler := self.app.get_block_handler(): tasks = [self._block_task(block_handler)] else: diff --git a/silverback/types.py b/silverback/types.py index 0cc027bd..893a69c8 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -16,6 +16,11 @@ def from_settings(cls, settings_: Settings) -> Self: return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) +class SilverbackStartupState(BaseModel): + last_block_seen: int + last_block_processed: int + + def handler_id_block(block_number: Optional[int]) -> str: """Return a unique handler ID string for a block""" if block_number is None: From 8d2427c09542cecc469f6a192f304016b77218a2 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Sat, 18 Nov 2023 10:07:39 -0700 Subject: [PATCH 06/47] fix: add missing silverback_shutdown task exec in runner --- silverback/runner.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/silverback/runner.py b/silverback/runner.py index 3007db58..4f9b43d4 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -150,6 +150,17 @@ async def run(self): await asyncio.gather(*tasks) + # Execute Silverback shutdown task before shutting down the broker + if shutdown_handler := self.app.get_shutdown_handler(): + task = await shutdown_handler.kiq() + result = await task.wait_result() + await self._handle_result( + "silverback_shutdown", + self.last_block_seen, + -1, + result, + ) + await self.app.broker.shutdown() From 2564382795dc55ebaf61fa03ff377587cd2d784b Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Sat, 18 Nov 2023 10:10:32 -0700 Subject: [PATCH 07/47] style(lint): lint rolling --- silverback/middlewares.py | 2 +- silverback/persistence.py | 5 +++-- silverback/types.py | 3 +-- silverback/utils.py | 2 -- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 37af0c2f..3b3288fc 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -1,5 +1,5 @@ from ape.logging import logger -from ape.types import ContractLog, HexBytes +from ape.types import ContractLog from ape.utils import ManagerAccessMixin from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult diff --git a/silverback/persistence.py b/silverback/persistence.py index 2d7cabde..1ee7ca65 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,7 +1,7 @@ import pickle from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import TYPE_CHECKING, Annotated, Any, Dict, Optional, Type +from typing import Annotated, Optional from ape.logging import logger from pydantic import BaseModel @@ -82,9 +82,10 @@ async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]: try: import pymongo from beanie import Document, Indexed, init_beanie - from beanie.odm.operators.update.general import Set from motor.core import AgnosticClient from motor.motor_asyncio import AsyncIOMotorClient + + # from beanie.odm.operators.update.general import Set except ImportError as err: print(err) logger.warning("MongoDB was initialized by dependencies are not installed") diff --git a/silverback/types.py b/silverback/types.py index 893a69c8..447f6243 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,5 +1,4 @@ -from enum import Enum -from typing import Annotated, Any, Dict, Optional +from typing import Optional from pydantic import BaseModel from typing_extensions import Self # Introduced 3.11 diff --git a/silverback/utils.py b/silverback/utils.py index c47ad38a..43da7efd 100644 --- a/silverback/utils.py +++ b/silverback/utils.py @@ -2,8 +2,6 @@ import threading from typing import AsyncIterator, Iterator -from ape.contracts import ContractEvent, ContractInstance -from ape.managers.chain import BlockContainer from ape.types import HexBytes From e4caa595471a097528b2b4a77d294098d3d85fb4 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Sat, 18 Nov 2023 10:14:54 -0700 Subject: [PATCH 08/47] refactor: remove unused SilverbackApp.checkpoint() method --- silverback/application.py | 4 ---- silverback/runner.py | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index 0ae38c80..98960dab 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -73,10 +73,6 @@ def __init__(self, settings: Optional[Settings] = None): f"{signer_str}{start_block_str}{new_block_timeout_str}" ) - def checkpoint(self, last_block_seen: int, last_block_processed: int): - self.broker.state.last_block_seen = last_block_seen - self.broker.state.last_block_processed = last_block_processed - def on_startup(self) -> Callable: """ Code to execute on one worker upon startup / restart after an error. diff --git a/silverback/runner.py b/silverback/runner.py index 4f9b43d4..a87f71d0 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -67,8 +67,6 @@ async def _checkpoint( self.last_block_seen = max(last_block_seen, self.last_block_seen) self.last_block_processed = max(last_block_processed, self.last_block_processed) - self.app.checkpoint(self.last_block_seen, self.last_block_processed) - if self.persistence: await self.persistence.set_instance_state( self.ident, self.last_block_seen, self.last_block_processed @@ -109,14 +107,6 @@ async def run(self): self.last_block_seen = boot_state.last_block_seen self.last_block_processed = boot_state.last_block_processed - # NOTE: This will update TaskIQ state before app startup. - # TODO: I have no idea if this will work with a distributed runner/worker - # setup. That may require our own injected startup task, but it's unclear if - # we can do that before Taskiq's startup handlers. We might be able to force - # SilverbackApplication to decorate an internal func to handle it. Need to - # figure that out. - self.app.checkpoint(self.last_block_seen, self.last_block_processed) - await self.app.broker.startup() # Execute Silverback startup task before we init the rest From 8be2d10688f3db4cb9a4a128faaff3b702861530 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 13:50:21 -0700 Subject: [PATCH 09/47] refactor: remove mongo, add sqlite --- setup.py | 4 - silverback/persistence.py | 364 ++++++++++++++++++++------------------ silverback/runner.py | 7 +- silverback/settings.py | 12 +- silverback/types.py | 13 +- 5 files changed, 219 insertions(+), 181 deletions(-) diff --git a/setup.py b/setup.py index 81c58b70..9c7911ff 100644 --- a/setup.py +++ b/setup.py @@ -40,9 +40,6 @@ "IPython", # Console for interacting "ipdb", # Debugger (Must use `export PYTHONBREAKPOINT=ipdb.set_trace`) ], - "mongo": [ - "beanie~=1.23.6", - ], } # NOTE: `pip install -e .[dev]` to install package @@ -52,7 +49,6 @@ + extras_require["doc"] + extras_require["release"] + extras_require["dev"] - + extras_require["mongo"] ) with open("./README.md") as readme: diff --git a/silverback/persistence.py b/silverback/persistence.py index 1ee7ca65..2e640c47 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,14 +1,14 @@ import pickle +import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import Annotated, Optional +from typing import Optional -from ape.logging import logger from pydantic import BaseModel from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import SilverbackIdent +from .types import SilverbackIdent, SilverbackSettings class SilverbackState(BaseModel): @@ -57,207 +57,231 @@ def from_taskiq( class BasePersistentStorage(ABC): + def __init__(self, settings: SilverbackSettings): + self.settings = settings + + @abstractmethod + async def init(self): + """Handle any async initialization from Silverback settings (e.g. migrations).""" + ... + @abstractmethod async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + """Return the stored state for a Silverback instance""" ... @abstractmethod async def set_instance_state( self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: + """Set the stored state for a Silverback instance""" ... @abstractmethod async def get_latest_result( - self, instance: SilverbackIdent, handler: Optional[str] = None + self, ident: SilverbackIdent, handler: Optional[str] = None ) -> Optional[HandlerResult]: + """Return the latest result for a Silverback instance's handler""" ... @abstractmethod async def add_result(self, v: HandlerResult): + """Store a result for a Silverback instance's handler""" ... -async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]: - try: - import pymongo - from beanie import Document, Indexed, init_beanie - from motor.core import AgnosticClient - from motor.motor_asyncio import AsyncIOMotorClient - - # from beanie.odm.operators.update.general import Set - except ImportError as err: - print(err) - logger.warning("MongoDB was initialized by dependencies are not installed") - return None - - # NOTE: Ignoring an inheritence issue with pydantic's Config class. Goes away with v2 - class SilverbackStateDoc(SilverbackState, Document): # type: ignore - instance: Annotated[str, Indexed(str)] - network: Annotated[str, Indexed(str)] - last_block_seen: int - last_block_processed: int - updated: datetime - - class Settings: - name = "state" - indexes = [ - [ - ("instance", pymongo.TEXT), - ("network", pymongo.TEXT), - ], - ] - - def to_silberback_state(self) -> SilverbackState: - return SilverbackState( - instance=self.instance, - network=self.network, - last_block_seen=self.last_block_seen, - last_block_processed=self.last_block_processed, - updated=self.updated, - ) +class SQLitePersistentStorage(BasePersistentStorage): + SQL_GET_STATE = """ + SELECT last_block_seen, last_block_processed, updated + FROM silverback_state + WHERE instance = ? AND network = ?; + """ + SQL_INSERT_STATE = """ + INSERT INTO silverback_state ( + instance, network, last_block_seen, last_block_processed, updated + ) + VALUES (?, ?, ?, ?, ?); + """ + SQL_UPDATE_STATE = """ + UPDATE silverback_state + SET last_block_seen = ?, last_block_processed = ?, updated = ? + WHERE instance = ? AND network = ?; + """ + SQL_GET_RESULT_LATEST = """ + SELECT handler_id, block_number, log_index, execution_time, return_value_blob, created + FROM silverback_result + WHERE instance = ? AND network = ? + ORDER BY created DESC + LIMIT 1; + """ + SQL_GET_HANDLER_LATEST = """ + SELECT handler_id, block_number, log_index, execution_time, return_value_blob, created + FROM silverback_result + WHERE instance = ? AND network = ? AND handler_id = ? + ORDER BY created DESC + LIMIT 1; + """ + SQL_INSERT_RESULT = """ + INSERT INTO silverback_result ( + instance, network, handler_id, block_number, log_index, execution_time, + return_value_blob, created + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?); + """ + + async def init(self): + self.con = sqlite3.connect(self.settings.PERSISTENCE_URI or ":memory:") + + cur = self.con.cursor() + cur.executescript( + """ + BEGIN; + CREATE TABLE IF NOT EXISTS silverback_state ( + instance text, + network text, + last_block_seen int, + last_block_processed int, + updated int + ); + CREATE TABLE IF NOT EXISTS silverback_result ( + instance text, + network text, + handler_id text, + block_number int, + log_index int, + execution_time real, + return_value_blob blob, + created int + ); + CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance + ON silverback_state(instance, network); + CREATE INDEX IF NOT EXISTS silverback_result__instance + ON silverback_result (instance, network); + CREATE INDEX IF NOT EXISTS silverback_result__handler + ON silverback_result (instance, network, handler_id); + COMMIT; + """ + ) + cur.close() - # NOTE: Ignoring an inheritence issue with pydantic's Config class. Goes away with v2 - class HandlerResultDoc(HandlerResult, Document): # type: ignore - # NOTE: Redefining these to annotate with indexed type - instance: Annotated[str, Indexed(str)] - network: Annotated[str, Indexed(str)] - handler_id: Annotated[str, Indexed(str)] - - class Settings: - name = "result" - indexes = [ - [ - ("instance", pymongo.TEXT), - ("network", pymongo.TEXT), - ("handler", pymongo.TEXT), - ], - ] - - @classmethod - def from_handler_result(cls, result: HandlerResult) -> Self: - return cls( - instance=result.instance, - network=result.network, - handler_id=result.handler_id, - block_number=result.block_number, - log_index=result.log_index, - execution_time=result.execution_time, - return_value_blob=result.return_value_blob, - created=result.created, - ) + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + cur = self.con.cursor() + res = cur.execute( + self.SQL_GET_STATE, + (ident.identifier, ident.network_choice), + ) + row = res.fetchone() - def to_handler_result(self) -> HandlerResult: - return HandlerResult( - instance=self.instance, - network=self.network, - handler_id=self.handler, - block_number=self.block_number, - log_index=self.log_index, - execution_time=self.execution_time, - return_value_blob=self.return_value_blob, - created=self.created, - ) + cur.close() + + if row is None: + return None - class MongoStorage(BasePersistentStorage): - client: AgnosticClient + return SilverbackState( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=row[0], + last_block_processed=row[1], + updated=datetime.fromtimestamp(row[2], timezone.utc), + ) - async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: - res = await SilverbackStateDoc.find_one( - SilverbackStateDoc.instance == ident.identifier, - SilverbackStateDoc.network == ident.network_choice, + async def set_instance_state( + self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + cur = self.con.cursor() + res = cur.execute( + self.SQL_GET_STATE, + (ident.identifier, ident.network_choice), + ) + row = res.fetchone() + + now = datetime.now(timezone.utc) + now_stamp = int(now.timestamp()) + + if row is None: + cur.execute( + self.SQL_INSERT_STATE, + ( + ident.identifier, + ident.network_choice, + last_block_seen, + last_block_processed, + now_stamp, + ), + ) + else: + cur.execute( + self.SQL_UPDATE_STATE, + ( + last_block_seen, + last_block_processed, + now_stamp, + ident.identifier, + ident.network_choice, + ), ) - if res is None: - return None + cur.close() + self.con.commit() - return res.to_silberback_state() + return SilverbackState( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=last_block_seen, + last_block_processed=last_block_processed, + updated=now, + ) - async def set_instance_state( - self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int - ) -> Optional[SilverbackState]: - now_utc = datetime.now(timezone.utc) + async def get_latest_result( + self, ident: SilverbackIdent, handler: Optional[str] = None + ) -> Optional[HandlerResult]: + cur = self.con.cursor() - state = await SilverbackStateDoc.find_one( - SilverbackStateDoc.instance == ident.identifier, - SilverbackStateDoc.network == ident.network_choice, + if handler is not None: + res = cur.execute( + self.SQL_GET_HANDLER_LATEST, + (ident.identifier, ident.network_choice, handler), ) - - if state is not None: - await state.set( - # Unreported type error? Confiremd working - { - SilverbackStateDoc.last_block_seen: last_block_seen, # type: ignore # noqa: E501 - SilverbackStateDoc.last_block_processed: last_block_processed, # type: ignore # noqa: E501 - SilverbackStateDoc.updated: now_utc, # type: ignore # noqa: E501 - } - ) - else: - state = SilverbackStateDoc( - instance=ident.identifier, - network=ident.network_choice, - last_block_seen=last_block_seen, - last_block_processed=last_block_processed, - updated=now_utc, - ) - await state.create() - - # TODO: Why no work? - # await SilverbackStateDoc.find_one( - # SilverbackStateDoc.instance == ident.identifier, - # SilverbackStateDoc.network == ident.network_choice, - # ).upsert( - # Set( - # { - # SilverbackStateDoc.last_block_seen: last_block_seen, - # SilverbackStateDoc.last_block_processed: last_block_processed, - # SilverbackStateDoc.updated: now_utc, - # } - # ), - # on_insert=SilverbackStateDoc( - # instance=ident.identifier, - # network=ident.network_choice, - # last_block_seen=last_block_seen, - # last_block_processed=last_block_processed, - # updated=now_utc, - # ), - # ) - - return state - - async def get_latest_result( - self, ident: SilverbackIdent, handler_id: Optional[str] = None - ) -> Optional[HandlerResult]: - query = HandlerResultDoc.find( - HandlerResultDoc.instance == ident.identifier, - HandlerResultDoc.network == ident.network_choice, + else: + res = cur.execute( + self.SQL_GET_RESULT_LATEST, + (ident.identifier, ident.network_choice), ) - if handler_id: - query.find(HandlerResultDoc.handler_id == handler_id) - - res = await query.sort("-created").first_or_none() + row = res.fetchone() - if res is None: - return None + cur.close() - return res.to_handler_result() + if row is None: + return None - async def add_result(self, result: HandlerResult): - doc = HandlerResultDoc.from_handler_result(result) - # Type annotation error: https://github.com/roman-right/beanie/issues/679 - await doc.insert() # type: ignore - - storage = MongoStorage() - client: AgnosticClient = AsyncIOMotorClient(mongo_uri) + return HandlerResult( + instance=ident.identifier, + network=ident.network_choice, + handler_id=row[0], + block_number=row[1], + log_index=row[2], + execution_time=row[3], + return_value_blob=row[4], + created=datetime.fromtimestamp(row[5], timezone.utc), + ) - await init_beanie( - database=client.db_name, - # Type annotation error: https://github.com/roman-right/beanie/issues/670 - document_models=[ - HandlerResultDoc, - SilverbackStateDoc, - ], # type: ignore - ) + async def add_result(self, v: HandlerResult): + cur = self.con.cursor() + + cur.execute( + self.SQL_INSERT_RESULT, + ( + v.instance, + v.network, + v.handler_id, + v.block_number, + v.log_index, + v.execution_time, + v.return_value_blob, + v.created, + ), + ) - return storage + cur.close() + self.con.commit() diff --git a/silverback/runner.py b/silverback/runner.py index a87f71d0..a7c51e2f 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,7 +11,7 @@ from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError -from .persistence import BasePersistentStorage, HandlerResult, init_mongo +from .persistence import BasePersistentStorage, HandlerResult from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import SilverbackIdent, SilverbackStartupState, handler_id_block, handler_id_event @@ -98,10 +98,11 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ - if settings.MONGODB_URI: - self.persistence = await init_mongo(settings.MONGODB_URI) + self.persistence = settings.get_persistent_storage() if self.persistence: + await self.persistence.init() + boot_state = await self.persistence.get_instance_state(self.ident) if boot_state: self.last_block_seen = boot_state.last_block_seen diff --git a/silverback/settings.py b/silverback/settings.py index 87d8ac33..ef1ca9f0 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -7,6 +7,7 @@ from ._importer import import_from_string from .middlewares import SilverbackMiddleware +from .persistence import BasePersistentStorage class Settings(BaseSettings, ManagerAccessMixin): @@ -34,7 +35,9 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None - MONGODB_URI: Optional[str] = None + # Used for persistent storage + PERSISTENCE_CLASS: Optional[str] = None + PERSISTENCE_URI: Optional[str] = None class Config: env_prefix = "SILVERBACK_" @@ -68,6 +71,13 @@ def get_network_choice(self) -> str: # return self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name return self.NETWORK_CHOICE or self.network_manager.network.choice + def get_persistent_storage(self) -> Optional[BasePersistentStorage]: + if not self.PERSISTENCE_CLASS: + return None + + persistence_class = import_from_string(self.PERSISTENCE_CLASS) + return persistence_class(settings=self) + def get_provider_context(self) -> ProviderContextManager: return self.network_manager.parse_network_choice(self.get_network_choice()) diff --git a/silverback/types.py b/silverback/types.py index 447f6243..17be0040 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,9 +1,16 @@ -from typing import Optional +from typing import Optional, Protocol from pydantic import BaseModel from typing_extensions import Self # Introduced 3.11 -from .settings import Settings + +class SilverbackSettings(Protocol): + INSTANCE: str + PERSISTENCE_CLASS: Optional[str] + PERSISTENCE_URI: Optional[str] + + def get_network_choice(self) -> str: + ... class SilverbackIdent(BaseModel): @@ -11,7 +18,7 @@ class SilverbackIdent(BaseModel): network_choice: str @classmethod - def from_settings(cls, settings_: Settings) -> Self: + def from_settings(cls, settings_: SilverbackSettings) -> Self: return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) From 82cb68ef720f48884f3209fa90e09739bcbd6ff2 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 14:03:48 -0700 Subject: [PATCH 10/47] fix: type | None is new in 3.10 --- silverback/persistence.py | 6 +++--- silverback/runner.py | 10 ++++++++-- silverback/types.py | 6 ++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 2e640c47..48b1cb60 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -8,7 +8,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import SilverbackIdent, SilverbackSettings +from .types import IntOrNone, SilverbackIdent, SilverbackSettings class SilverbackState(BaseModel): @@ -39,8 +39,8 @@ def from_taskiq( cls, ident: SilverbackIdent, handler_id: str, - block_number: int | None, - log_index: int | None, + block_number: IntOrNone, + log_index: IntOrNone, result: TaskiqResult, ) -> Self: return cls( diff --git a/silverback/runner.py b/silverback/runner.py index a7c51e2f..d48e00fa 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,13 @@ from .persistence import BasePersistentStorage, HandlerResult from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackIdent, SilverbackStartupState, handler_id_block, handler_id_event +from .types import ( + IntOrNone, + SilverbackIdent, + SilverbackStartupState, + handler_id_block, + handler_id_event, +) from .utils import async_wrap_iter, hexbytes_dict settings = Settings() @@ -32,7 +38,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.ident = SilverbackIdent.from_settings(settings) async def _handle_result( - self, handler_id: str, block_number: int | None, log_index: int | None, result: TaskiqResult + self, handler_id: str, block_number: IntOrNone, log_index: IntOrNone, result: TaskiqResult ): if result.is_err: self.exceptions += 1 diff --git a/silverback/types.py b/silverback/types.py index 17be0040..5db10ccb 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,8 +1,10 @@ -from typing import Optional, Protocol +from typing import Optional, Protocol, Union from pydantic import BaseModel from typing_extensions import Self # Introduced 3.11 +StrOrNone = Union[str, None] # For Python < 3.10 + class SilverbackSettings(Protocol): INSTANCE: str @@ -34,7 +36,7 @@ def handler_id_block(block_number: Optional[int]) -> str: return f"block/{block_number}" -def handler_id_event(contract_address: str | None, event_signature: str) -> str: +def handler_id_event(contract_address: StrOrNone, event_signature: str) -> str: """Return a unique handler ID string for an event""" # TODO: Under what circumstance can address be None? return f"event/{contract_address or 'unknown'}/{event_signature}" From c420f49aa9ee81ed39239dc6ff1bcb01e6792d4e Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 14:07:07 -0700 Subject: [PATCH 11/47] fix: forgot to save... --- silverback/types.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/silverback/types.py b/silverback/types.py index 5db10ccb..ba162b9f 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -3,7 +3,9 @@ from pydantic import BaseModel from typing_extensions import Self # Introduced 3.11 -StrOrNone = Union[str, None] # For Python < 3.10 +# NOTE: 'type | None' introduced in 3.10 +StrOrNone = Union[str, None] +IntOrNone = Union[int, None] class SilverbackSettings(Protocol): From 7e9e67ada5f58123f71be03b1ead309aba6687ed Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 18:32:41 -0700 Subject: [PATCH 12/47] style: minor cleanup and docstrings --- silverback/application.py | 8 ++++---- silverback/persistence.py | 4 ++-- silverback/settings.py | 1 - silverback/types.py | 7 +++++-- silverback/utils.py | 1 + 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index 98960dab..ed32b6de 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -80,8 +80,8 @@ def on_startup(self) -> Callable: Usage example:: @app.on_startup() - def do_something_on_startup(state): - ... # Can provision resources, or add things to `state`. + def do_something_on_startup(startup_state): + ... # Reprocess missed events or blocks """ return self.broker.task(task_name="silverback_startup") @@ -92,8 +92,8 @@ def on_shutdown(self) -> Callable: Usage example:: @app.on_shutdown() - def do_something_on_shutdown(state): - ... # Update some external service, perhaps using information from `state`. + def do_something_on_shutdown(): + ... # Record final state of app """ return self.broker.task(task_name="silverback_shutdown") diff --git a/silverback/persistence.py b/silverback/persistence.py index 48b1cb60..254b7317 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -8,7 +8,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import IntOrNone, SilverbackIdent, SilverbackSettings +from .types import IntOrNone, SilverbackIdent, ISilverbackSettings class SilverbackState(BaseModel): @@ -57,7 +57,7 @@ def from_taskiq( class BasePersistentStorage(ABC): - def __init__(self, settings: SilverbackSettings): + def __init__(self, settings: ISilverbackSettings): self.settings = settings @abstractmethod diff --git a/silverback/settings.py b/silverback/settings.py index ef1ca9f0..96694e60 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -68,7 +68,6 @@ def get_broker(self) -> AsyncBroker: return broker def get_network_choice(self) -> str: - # return self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name return self.NETWORK_CHOICE or self.network_manager.network.choice def get_persistent_storage(self) -> Optional[BasePersistentStorage]: diff --git a/silverback/types.py b/silverback/types.py index ba162b9f..76e77d05 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -8,7 +8,10 @@ IntOrNone = Union[int, None] -class SilverbackSettings(Protocol): +class ISilverbackSettings(Protocol): + """Loose approximation of silverback.settings.Settings. If you can, use the class as + a type reference.""" + INSTANCE: str PERSISTENCE_CLASS: Optional[str] PERSISTENCE_URI: Optional[str] @@ -22,7 +25,7 @@ class SilverbackIdent(BaseModel): network_choice: str @classmethod - def from_settings(cls, settings_: SilverbackSettings) -> Self: + def from_settings(cls, settings_: ISilverbackSettings) -> Self: return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) diff --git a/silverback/utils.py b/silverback/utils.py index 43da7efd..2a6846e2 100644 --- a/silverback/utils.py +++ b/silverback/utils.py @@ -39,6 +39,7 @@ def iter_to_queue(): def hexbytes_dict(data: dict) -> dict: + """Converts any hex string values in a flat dictionary to HexBytes.""" fixed_data = {} for name, value in data.items(): From 134748d070a557c4a7b8a1e942c8e5ce4b3e1d62 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 19:05:30 -0700 Subject: [PATCH 13/47] style(lint): where that pre-commit hook when you need it --- silverback/persistence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 254b7317..76cddb9a 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -8,7 +8,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import IntOrNone, SilverbackIdent, ISilverbackSettings +from .types import IntOrNone, ISilverbackSettings, SilverbackIdent class SilverbackState(BaseModel): From f58911ad698ae36115e77db119b7ddebd2db5fbd Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 19:54:01 -0700 Subject: [PATCH 14/47] fix: call to self.network.__enter__ should be before we try and touch network settings --- example.py => examples/memory/main.py | 0 silverback/application.py | 11 +++++------ 2 files changed, 5 insertions(+), 6 deletions(-) rename example.py => examples/memory/main.py (100%) diff --git a/example.py b/examples/memory/main.py similarity index 100% rename from example.py rename to examples/memory/main.py diff --git a/silverback/application.py b/silverback/application.py index ed32b6de..2d443ff1 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -38,10 +38,13 @@ def __init__(self, settings: Optional[Settings] = None): if not settings: settings = Settings() + self.network = settings.get_provider_context() + # NOTE: This allows using connected ape methods e.g. `Contract` + provider = self.network.__enter__() + # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( - self.chain_manager.provider.network.name.endswith("-fork") - or self.chain_manager.provider.network.name == LOCAL_NETWORK_NAME + provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME ): settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds()) @@ -52,10 +55,6 @@ def __init__(self, settings: Optional[Settings] = None): self.contract_events: Dict[AddressType, Dict[str, ContractEvent]] = {} self.poll_settings: Dict[str, Dict] = {} - self.network = settings.get_provider_context() - # NOTE: This allows using connected ape methods e.g. `Contract` - provider = self.network.__enter__() - atexit.register(self.network.__exit__) self.signer = settings.get_signer() From 0506ad51113d3ecd449241d6cc0d2f8ddef2d0cf Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 19:54:49 -0700 Subject: [PATCH 15/47] feat(cli): adds simple `worker` command --- silverback/_cli.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/silverback/_cli.py b/silverback/_cli.py index f1d1e6e5..c7336435 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -3,9 +3,12 @@ import click from ape.cli import AccountAliasPromptChoice, ape_cli_context, network_option, verbosity_option +from taskiq.cli.worker.args import WorkerArgs +from taskiq.cli.worker.run import run_worker from silverback._importer import import_from_string from silverback.runner import PollingRunner +from silverback.settings import Settings @click.group() @@ -57,3 +60,14 @@ def run(cli_ctx, network, account, runner, max_exceptions, path): app = import_from_string(path) runner = runner(app, max_exceptions=max_exceptions) asyncio.run(runner.run()) + + +@cli.command() +@ape_cli_context() +@verbosity_option() +@network_option(default=None, callback=_network_callback) +@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback) +@click.option("-w", "--workers", type=int, default=2) +@click.argument("broker") +def worker(cli_ctx, network, account, workers, broker): + run_worker(WorkerArgs(broker=broker, modules=[], workers=workers)) From 2d67e49f0339f554a9f06044e79e13789e61a264 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 19:55:25 -0700 Subject: [PATCH 16/47] docs: move example, create redis example, update README --- README.md | 6 ++-- examples/memory/main.py | 40 +++++++++++++++++------ examples/redis/main.py | 72 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 examples/redis/main.py diff --git a/README.md b/README.md index d547cd7a..c6bb679b 100644 --- a/README.md +++ b/README.md @@ -43,18 +43,18 @@ python3 setup.py install ## Quick Usage -Checkout [the example](./example.py) to see how to use the library. +Checkout [the example](./examples/memory/main.py) to see how to use the library. To run your bot against a live network, this SDK includes a simple runner you can use via: ```sh -$ silverback run "example:app" --network :mainnet:alchemy +$ silverback run "examples.memory.main:app" --network :mainnet:alchemy ``` ## Docker Usage ```sh -$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "example:app" --network :mainnet:alchemy +$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "examples.memory.main:app" --network :mainnet:alchemy ``` ## Development diff --git a/examples/memory/main.py b/examples/memory/main.py index 8548ff70..7c370b56 100644 --- a/examples/memory/main.py +++ b/examples/memory/main.py @@ -3,7 +3,7 @@ from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] -from silverback import CircuitBreaker, SilverbackApp +from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState # Do this to initialize your app app = SilverbackApp() @@ -13,36 +13,58 @@ YFI = tokens["YFI"] -# Can handle some stuff on startup, like loading a heavy model or something @app.on_startup() -def startup(state): - return {"message": "Starting..."} +def app_startup(startup_state: SilverbackStartupState): + return {"message": "Starting...", "block_number": startup_state.last_block_seen} + + +@app.on_client_startup() +def client_startup(state): + return {"message": "Client started."} + + +# Can handle some initialization on startup, like models or network connections +@app.on_worker_startup() +def worker_startup(state): + state.block_count = 0 + # state.db = MyDB() + return {"message": "Worker started."} # This is how we trigger off of new blocks @app.on_(chain.blocks) -def exec_block(block: BlockAPI): +def exec_block(block: BlockAPI, context): + context.state.block_count += 1 return len(block.transactions) # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=17793100, new_block_timeout=25) +@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) # NOTE: Typing isn't required def exec_event1(log): if log.log_index % 7 == 3: # If you ever want the app to shutdown under some scenario, call this exception raise CircuitBreaker("Oopsie!") - return log.amount + return {"amount": log.amount} @app.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - return log.value + return log.amount # Just in case you need to release some resources or something +@app.on_worker_shutdown() +def worker_shutdown(state): + return { + "message": f"Worker stopped after handling {state.block_count} blocks.", + "block_count": state.block_count, + } + + +# A final job to execute on Silverback shutdown @app.on_shutdown() -def shutdown(state): +def app_shutdown(state): return {"message": "Stopping..."} diff --git a/examples/redis/main.py b/examples/redis/main.py new file mode 100644 index 00000000..8ead8af0 --- /dev/null +++ b/examples/redis/main.py @@ -0,0 +1,72 @@ +from ape import chain +from ape.api import BlockAPI +from ape.types import ContractLog +from ape_tokens import tokens # type: ignore[import] + +from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState + +# Do this to initialize your app +app = SilverbackApp() +# TODO: broker must be exposed as a module-level var for taskiq to resolve it... +broker = app.broker + +# NOTE: Don't do any networking until after initializing app +USDC = tokens["USDC"] +YFI = tokens["YFI"] + + +# Can handle some stuff on startup, like loading a heavy model or something +@app.on_startup() +def app_startup(startup_state: SilverbackStartupState): + return {"message": "Starting...", "block_number": startup_state.last_block_seen} + + +@app.on_client_startup() +def client_startup(state): + return {"message": "Client started."} + + +@app.on_worker_startup() +def worker_startup(state): + state.block_count = 0 + # state.db = MyDB() + return {"message": "Worker started."} + + +# This is how we trigger off of new blocks +@app.on_(chain.blocks) +def exec_block(block: BlockAPI, context): + context.state.block_count += 1 + return len(block.transactions) + + +# This is how we trigger off of events +# Set new_block_timeout to adjust the expected block time. +@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) +# NOTE: Typing isn't required +def exec_event1(log): + if log.log_index % 7 == 3: + # If you ever want the app to shutdown under some scenario, call this exception + raise CircuitBreaker("Oopsie!") + return {"amount": log.amount} + + +@app.on_(YFI.Approval) +# Any handler function can be async too +async def exec_event2(log: ContractLog): + return log.amount + + +# Just in case you need to release some resources or something +@app.on_worker_shutdown() +def worker_shutdown(state): + return { + "message": f"Worker stopped after handling {state.block_count} blocks.", + "block_count": state.block_count, + } + + +# A final job to execute on Silverback shutdown +@app.on_shutdown() +def app_shutdown(state): + return {"message": "Stopping..."} From 98a7adbdc6672dda95c80a0b7b1b91fb180da447 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 19:56:26 -0700 Subject: [PATCH 17/47] fix(style): remove unused import --- silverback/_cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/silverback/_cli.py b/silverback/_cli.py index c7336435..cc76295d 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -8,7 +8,6 @@ from silverback._importer import import_from_string from silverback.runner import PollingRunner -from silverback.settings import Settings @click.group() From 19a458b7e118a03b99c2b7229c20ab214780bd79 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 20:09:54 -0700 Subject: [PATCH 18/47] style(lint): chill, mypy --- examples/memory/__init__.py | 0 examples/redis/__init__.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 examples/memory/__init__.py create mode 100644 examples/redis/__init__.py diff --git a/examples/memory/__init__.py b/examples/memory/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/redis/__init__.py b/examples/redis/__init__.py new file mode 100644 index 00000000..e69de29b From 4bb23c821aba80c6e18f8603d80aafa3028ab3e5 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 20:23:55 -0700 Subject: [PATCH 19/47] fix: update examples to fix taskiq issue --- examples/memory/main.py | 5 ++++- examples/redis/main.py | 7 +++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/examples/memory/main.py b/examples/memory/main.py index 7c370b56..448bce03 100644 --- a/examples/memory/main.py +++ b/examples/memory/main.py @@ -2,8 +2,10 @@ from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] +from typing import Annotated from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState +from taskiq import Context, TaskiqDepends, TaskiqState # Do this to initialize your app app = SilverbackApp() @@ -33,7 +35,8 @@ def worker_startup(state): # This is how we trigger off of new blocks @app.on_(chain.blocks) -def exec_block(block: BlockAPI, context): +# context must be a type annotated kwarg to be provided to the task +def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): context.state.block_count += 1 return len(block.transactions) diff --git a/examples/redis/main.py b/examples/redis/main.py index 8ead8af0..7e9be56f 100644 --- a/examples/redis/main.py +++ b/examples/redis/main.py @@ -2,8 +2,10 @@ from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] +from typing import Annotated from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState +from taskiq import Context, TaskiqDepends, TaskiqState # Do this to initialize your app app = SilverbackApp() @@ -27,7 +29,7 @@ def client_startup(state): @app.on_worker_startup() -def worker_startup(state): +def worker_startup(state: TaskiqState): state.block_count = 0 # state.db = MyDB() return {"message": "Worker started."} @@ -35,7 +37,8 @@ def worker_startup(state): # This is how we trigger off of new blocks @app.on_(chain.blocks) -def exec_block(block: BlockAPI, context): +# context must be a type annotated kwarg to be provided to the task +def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): context.state.block_count += 1 return len(block.transactions) From 4d50d4c0b08b19baba3b6932ac85ad558a01ac7e Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 20 Nov 2023 20:29:10 -0700 Subject: [PATCH 20/47] style(lint): b0rk b0rk b0rk b0rk --- examples/memory/main.py | 7 ++++--- examples/redis/main.py | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/memory/main.py b/examples/memory/main.py index 448bce03..d8953737 100644 --- a/examples/memory/main.py +++ b/examples/memory/main.py @@ -1,11 +1,12 @@ +from typing import Annotated + from ape import chain from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] -from typing import Annotated +from taskiq import Context, TaskiqDepends, TaskiqState from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState -from taskiq import Context, TaskiqDepends, TaskiqState # Do this to initialize your app app = SilverbackApp() @@ -27,7 +28,7 @@ def client_startup(state): # Can handle some initialization on startup, like models or network connections @app.on_worker_startup() -def worker_startup(state): +def worker_startup(state: TaskiqState): state.block_count = 0 # state.db = MyDB() return {"message": "Worker started."} diff --git a/examples/redis/main.py b/examples/redis/main.py index 7e9be56f..70ba35e1 100644 --- a/examples/redis/main.py +++ b/examples/redis/main.py @@ -1,11 +1,12 @@ +from typing import Annotated + from ape import chain from ape.api import BlockAPI from ape.types import ContractLog from ape_tokens import tokens # type: ignore[import] -from typing import Annotated +from taskiq import Context, TaskiqDepends, TaskiqState from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState -from taskiq import Context, TaskiqDepends, TaskiqState # Do this to initialize your app app = SilverbackApp() From 2d219b1e924eed1d5e992c6a15a8bf1109c040d3 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 21 Nov 2023 17:53:32 -0700 Subject: [PATCH 21/47] fix: worker command now uses broker app uses --- examples/redis/main.py | 2 -- silverback/_cli.py | 35 ++++++++++++++++++++++++++++------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/examples/redis/main.py b/examples/redis/main.py index 70ba35e1..43928d2c 100644 --- a/examples/redis/main.py +++ b/examples/redis/main.py @@ -10,8 +10,6 @@ # Do this to initialize your app app = SilverbackApp() -# TODO: broker must be exposed as a module-level var for taskiq to resolve it... -broker = app.broker # NOTE: Don't do any networking until after initializing app USDC = tokens["USDC"] diff --git a/silverback/_cli.py b/silverback/_cli.py index cc76295d..0a653dc8 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -1,10 +1,12 @@ import asyncio import os +from concurrent.futures import ThreadPoolExecutor import click from ape.cli import AccountAliasPromptChoice, ape_cli_context, network_option, verbosity_option -from taskiq.cli.worker.args import WorkerArgs -from taskiq.cli.worker.run import run_worker +from taskiq import AsyncBroker +from taskiq.cli.worker.run import shutdown_broker +from taskiq.receiver import Receiver from silverback._importer import import_from_string from silverback.runner import PollingRunner @@ -42,7 +44,23 @@ def _network_callback(ctx, param, val): return val -@cli.command() +async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): + try: + with ThreadPoolExecutor(max_workers=worker_count) as pool: + receiver = Receiver( + broker=broker, + executor=pool, + validate_params=True, + max_async_tasks=1, + max_prefetch=0, + ) + broker.is_worker_process = True + await receiver.listen() + finally: + await shutdown_broker(broker, shutdown_timeout) + + +@cli.command(help="Run Silverback application") @ape_cli_context() @verbosity_option() @network_option(default=None, callback=_network_callback) @@ -61,12 +79,15 @@ def run(cli_ctx, network, account, runner, max_exceptions, path): asyncio.run(runner.run()) -@cli.command() +@cli.command(help="Run distributed worker only") @ape_cli_context() @verbosity_option() @network_option(default=None, callback=_network_callback) @click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback) @click.option("-w", "--workers", type=int, default=2) -@click.argument("broker") -def worker(cli_ctx, network, account, workers, broker): - run_worker(WorkerArgs(broker=broker, modules=[], workers=workers)) +@click.option("-x", "--max-exceptions", type=int, default=3) +@click.option("-s", "--shutdown_timeout", type=int, default=90) +@click.argument("path") +def worker(cli_ctx, network, account, workers, max_exceptions, shutdown_timeout, path): + app = import_from_string(path) + asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout)) From 9273cb02c5c8d76e773e4ff0cb4a9a0d7b304da3 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 21 Nov 2023 18:41:38 -0700 Subject: [PATCH 22/47] refactor: simplify app strings for examples --- README.md | 4 ++-- examples/memory/__init__.py | 3 +++ examples/redis/__init__.py | 3 +++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c6bb679b..7b3e4050 100644 --- a/README.md +++ b/README.md @@ -48,13 +48,13 @@ Checkout [the example](./examples/memory/main.py) to see how to use the library. To run your bot against a live network, this SDK includes a simple runner you can use via: ```sh -$ silverback run "examples.memory.main:app" --network :mainnet:alchemy +$ silverback run "examples.memory:app" --network :mainnet:alchemy ``` ## Docker Usage ```sh -$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "examples.memory.main:app" --network :mainnet:alchemy +$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "examples.memory:app" --network :mainnet:alchemy ``` ## Development diff --git a/examples/memory/__init__.py b/examples/memory/__init__.py index e69de29b..de1660bd 100644 --- a/examples/memory/__init__.py +++ b/examples/memory/__init__.py @@ -0,0 +1,3 @@ +from .main import app + +__all__ = ["app"] diff --git a/examples/redis/__init__.py b/examples/redis/__init__.py index e69de29b..de1660bd 100644 --- a/examples/redis/__init__.py +++ b/examples/redis/__init__.py @@ -0,0 +1,3 @@ +from .main import app + +__all__ = ["app"] From f7059c4eb643cd903ac8058518c974e441978599 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 21 Nov 2023 18:42:29 -0700 Subject: [PATCH 23/47] fix: drop pickle for security, use generic types for HandlerResult return_type --- silverback/persistence.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 76cddb9a..8fd0f9fe 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,8 +1,8 @@ -import pickle +import json import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import Optional +from typing import Generic, Optional, TypeVar from pydantic import BaseModel from taskiq import TaskiqResult @@ -10,6 +10,8 @@ from .types import IntOrNone, ISilverbackSettings, SilverbackIdent +_HandlerReturnType = TypeVar("_HandlerReturnType") + class SilverbackState(BaseModel): instance: str @@ -21,7 +23,7 @@ class SilverbackState(BaseModel): updated: datetime -class HandlerResult(BaseModel): +class HandlerResult(BaseModel, Generic[_HandlerReturnType]): instance: str network: str handler_id: str @@ -30,8 +32,7 @@ class HandlerResult(BaseModel): execution_time: float # TODO: upcoming feature in taskiq # labels: Dict[str] - # TODO: Use computed field with pydantic v2 - return_value_blob: Optional[bytes] # pickled data + return_value: _HandlerReturnType created: datetime @classmethod @@ -51,7 +52,7 @@ def from_taskiq( log_index=log_index, execution_time=result.execution_time, # labels=result.labels, - return_value_blob=pickle.dumps(result.return_value), + return_value=result.return_value, created=datetime.now(timezone.utc), ) @@ -262,7 +263,7 @@ async def get_latest_result( block_number=row[1], log_index=row[2], execution_time=row[3], - return_value_blob=row[4], + return_value=json.loads(row[4]), created=datetime.fromtimestamp(row[5], timezone.utc), ) @@ -278,7 +279,7 @@ async def add_result(self, v: HandlerResult): v.block_number, v.log_index, v.execution_time, - v.return_value_blob, + json.dumps(v.return_value), v.created, ), ) From 7dc5bd0bc3bf24689df6b58e77ece851b034c0c8 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 21 Nov 2023 18:51:18 -0700 Subject: [PATCH 24/47] refactor: HandlerResult should subclass TaskiqResult --- silverback/persistence.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 8fd0f9fe..f0f078ff 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -2,7 +2,7 @@ import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone -from typing import Generic, Optional, TypeVar +from typing import Optional, TypeVar from pydantic import BaseModel from taskiq import TaskiqResult @@ -23,16 +23,12 @@ class SilverbackState(BaseModel): updated: datetime -class HandlerResult(BaseModel, Generic[_HandlerReturnType]): +class HandlerResult(TaskiqResult): instance: str network: str handler_id: str block_number: Optional[int] log_index: Optional[int] - execution_time: float - # TODO: upcoming feature in taskiq - # labels: Dict[str] - return_value: _HandlerReturnType created: datetime @classmethod @@ -50,10 +46,8 @@ def from_taskiq( handler_id=handler_id, block_number=block_number, log_index=log_index, - execution_time=result.execution_time, - # labels=result.labels, - return_value=result.return_value, created=datetime.now(timezone.utc), + **result.dict(), ) @@ -109,14 +103,16 @@ class SQLitePersistentStorage(BasePersistentStorage): WHERE instance = ? AND network = ?; """ SQL_GET_RESULT_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, return_value_blob, created + SELECT handler_id, block_number, log_index, execution_time, is_err, created, + return_value_blob FROM silverback_result WHERE instance = ? AND network = ? ORDER BY created DESC LIMIT 1; """ SQL_GET_HANDLER_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, return_value_blob, created + SELECT handler_id, block_number, log_index, execution_time, is_err, created, + return_value_blob FROM silverback_result WHERE instance = ? AND network = ? AND handler_id = ? ORDER BY created DESC @@ -125,9 +121,9 @@ class SQLitePersistentStorage(BasePersistentStorage): SQL_INSERT_RESULT = """ INSERT INTO silverback_result ( instance, network, handler_id, block_number, log_index, execution_time, - return_value_blob, created + is_err, created, return_value_blob ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?); + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); """ async def init(self): @@ -151,8 +147,9 @@ async def init(self): block_number int, log_index int, execution_time real, - return_value_blob blob, - created int + is_err bool, + created int, + return_value_blob blob ); CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance ON silverback_state(instance, network); @@ -160,6 +157,8 @@ async def init(self): ON silverback_result (instance, network); CREATE INDEX IF NOT EXISTS silverback_result__handler ON silverback_result (instance, network, handler_id); + CREATE INDEX IF NOT EXISTS silverback_result__is_err + ON silverback_result (is_err); COMMIT; """ ) @@ -263,8 +262,9 @@ async def get_latest_result( block_number=row[1], log_index=row[2], execution_time=row[3], - return_value=json.loads(row[4]), + is_err=row[4], created=datetime.fromtimestamp(row[5], timezone.utc), + return_value=json.loads(row[6]), ) async def add_result(self, v: HandlerResult): @@ -279,8 +279,9 @@ async def add_result(self, v: HandlerResult): v.block_number, v.log_index, v.execution_time, - json.dumps(v.return_value), + v.is_err, v.created, + json.dumps(v.return_value), ), ) From c300fb6e4cf23b141943a4f3f3259cb7c1dc2cb9 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 21 Nov 2023 19:27:50 -0700 Subject: [PATCH 25/47] docs: update docs for worker events, new on_startup behavior, nad elaborate on state a bit --- docs/userguides/development.md | 48 ++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 6d3a5305..ddf1d1cf 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -61,19 +61,57 @@ Any errors you raise during this function will get captured by the client, and r ## Startup and Shutdown -If you have heavier resources you want to load during startup, or otherwise perform some data collection prior to starting the bot, you can add a startup function like so: +### Worker Events + +If you have heavier resources you want to load during startup, or want to initialize things like database connections, you can add a worker startup function like so: ```py -@app.on_startup() +@app.on_worker_startup() def handle_on_worker_startup(state): + # Connect to DB, set initial state, etc + ... + +@app.on_worker_shutdown() +def handle_on_worker_shutdown(state): + # cleanup resources, close connections cleanly, etc ... ``` This function comes a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned. -It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. -The `state` variable is also useful as this gets made available to each handler method so other stateful quantities can be maintained for other uses. -TODO: Add more information about `state` +It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. In a distributed environment, **this function will run on every worker**. + +#### Worker State + +The `state` variable is also useful as this can be made available to each handler method so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state. + +To access the state from a handler, you must annotate `context` as a dependency like so: + +```py +from typing import Annotated +from taskiq import Context, TaskiqDepends + +@app.on_(chain.blocks) +def block_handler(block, context: Annotated[Context, TaskiqDepends()]): + # Access state via context.state + ... +``` + +### Application Events + +You can also add an application startup and shutdown handler that will be **executed by one worker upon every application startup**. This may be useful for things like processing historical events since the application was shutdown or other one-time actions to perform at startup. + +```py +@app.on_startup() +def handle_on_startup(state): + # Process missed events, etc + ... + +@app.on_shutdown() +def handle_on_startup(state): + # Record final state, etc + ... +``` ## Running your Application From e96e4f202d01ce83ec9f96413d85647ad4dc54bb Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 24 Nov 2023 13:15:47 -0700 Subject: [PATCH 26/47] refactor: use position arg for reasons --- silverback/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/settings.py b/silverback/settings.py index 96694e60..1b5a4655 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -75,7 +75,7 @@ def get_persistent_storage(self) -> Optional[BasePersistentStorage]: return None persistence_class = import_from_string(self.PERSISTENCE_CLASS) - return persistence_class(settings=self) + return persistence_class(self) def get_provider_context(self) -> ProviderContextManager: return self.network_manager.parse_network_choice(self.get_network_choice()) From 29fa6cebcdc563b1aeb3ac4dbec623234f17e969 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 24 Nov 2023 13:16:18 -0700 Subject: [PATCH 27/47] fix: surface errors in persistence when saving results --- silverback/runner.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/silverback/runner.py b/silverback/runner.py index d48e00fa..1e724614 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -54,7 +54,10 @@ async def _handle_result( self.ident, handler_id, block_number, log_index, result ) - await self.persistence.add_result(handler_result) + try: + await self.persistence.add_result(handler_result) + except Exception as err: + logger.error(f"Error storing result: {err}") async def _checkpoint( self, last_block_seen: int = 0, last_block_processed: int = 0 @@ -74,9 +77,12 @@ async def _checkpoint( self.last_block_processed = max(last_block_processed, self.last_block_processed) if self.persistence: - await self.persistence.set_instance_state( - self.ident, self.last_block_seen, self.last_block_processed - ) + try: + await self.persistence.set_instance_state( + self.ident, self.last_block_seen, self.last_block_processed + ) + except Exception as err: + logger.error(f"Error settings state: {err}") return self.last_block_seen, self.last_block_processed From a544e331f461c81a1a6d8a7f93fcfb9fcc236b27 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 24 Nov 2023 14:50:24 -0700 Subject: [PATCH 28/47] refactor: move results persistence calls to middleware --- silverback/middlewares.py | 39 ++++++++++++++++++++++++++ silverback/runner.py | 59 ++++++--------------------------------- silverback/settings.py | 2 +- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 3b3288fc..331d320e 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -1,11 +1,31 @@ +from typing import Optional, Tuple + from ape.logging import logger from ape.types import ContractLog from ape.utils import ManagerAccessMixin from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult +from silverback.persistence import HandlerResult +from silverback.types import SilverbackIdent, handler_id_block, handler_id_event from silverback.utils import hexbytes_dict +def resolve_task(message: TaskiqMessage) -> Tuple[str, Optional[int], Optional[int]]: + block_number = None + log_index = None + task_id = message.task_name + + if task_id == "block": + block_number = message.args[0].number + task_id = handler_id_block(block_number) + elif task_id == "event": + block_number = message.args[0].block_number + log_index = message.args[0].log_index + task_id = handler_id_event(message.args[0].address, message.args[0].abi.name) + + return task_id, block_number, log_index + + class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin): def __init__(self, *args, **kwargs): def compute_block_time() -> int: @@ -17,7 +37,11 @@ def compute_block_time() -> int: return int((head.timestamp - genesis.timestamp) / head.number) + settings = kwargs.pop("silverback_settings") + self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() + self.ident = SilverbackIdent.from_settings(settings) + self.persistence = settings.get_persistent_storage() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary until https://github.com/ApeWorX/ape/issues/1465 is resolved @@ -69,6 +93,21 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult): f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)" ) + async def post_save(self, message: TaskiqMessage, result: TaskiqResult): + if not self.persistence: + return + + handler_id, block_number, log_index = resolve_task(message) + + handler_result = HandlerResult.from_taskiq( + self.ident, handler_id, block_number, log_index, result + ) + + try: + await self.persistence.add_result(handler_result) + except Exception as err: + logger.error(f"Error storing result: {err}") + async def on_error( self, message: TaskiqMessage, diff --git a/silverback/runner.py b/silverback/runner.py index 1e724614..2f8b9f9f 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,16 +11,10 @@ from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError -from .persistence import BasePersistentStorage, HandlerResult +from .persistence import BasePersistentStorage from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import ( - IntOrNone, - SilverbackIdent, - SilverbackStartupState, - handler_id_block, - handler_id_event, -) +from .types import SilverbackIdent, SilverbackStartupState from .utils import async_wrap_iter, hexbytes_dict settings = Settings() @@ -37,9 +31,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.persistence: Optional[BasePersistentStorage] = None self.ident = SilverbackIdent.from_settings(settings) - async def _handle_result( - self, handler_id: str, block_number: IntOrNone, log_index: IntOrNone, result: TaskiqResult - ): + def _handle_result(self, result: TaskiqResult): if result.is_err: self.exceptions += 1 @@ -49,16 +41,6 @@ async def _handle_result( if self.exceptions > self.max_exceptions: raise Halt() - if self.persistence: - handler_result = HandlerResult.from_taskiq( - self.ident, handler_id, block_number, log_index, result - ) - - try: - await self.persistence.add_result(handler_result) - except Exception as err: - logger.error(f"Error storing result: {err}") - async def _checkpoint( self, last_block_seen: int = 0, last_block_processed: int = 0 ) -> Tuple[int, int]: @@ -131,12 +113,7 @@ async def run(self): ) ) result = await task.wait_result() - await self._handle_result( - "silverback_startup", - self.last_block_seen, - -1, - result, - ) + self._handle_result(result) if block_handler := self.app.get_block_handler(): tasks = [self._block_task(block_handler)] @@ -157,12 +134,7 @@ async def run(self): if shutdown_handler := self.app.get_shutdown_handler(): task = await shutdown_handler.kiq() result = await task.wait_result() - await self._handle_result( - "silverback_shutdown", - self.last_block_seen, - -1, - result, - ) + self._handle_result(result) await self.app.broker.shutdown() @@ -195,9 +167,7 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): block_task = await block_handler.kiq(raw_block) result = await block_task.wait_result() - await self._handle_result( - handler_id_block(block.number), block.number or 0, None, result - ) + self._handle_result(result) if block.number is not None: await self._checkpoint(last_block_processed=block.number) @@ -229,12 +199,7 @@ async def _event_task( event_task = await event_handler.kiq(event) result = await event_task.wait_result() - await self._handle_result( - handler_id_event(contract_event.contract.address, contract_event.abi.selector), - event.block_number, - event.log_index, - result, - ) + self._handle_result(result) if event.block_number is not None: await self._checkpoint(last_block_processed=event.block_number) @@ -277,7 +242,7 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): block_task = await block_handler.kiq(block) result = await block_task.wait_result() - await self._handle_result(handler_id_block(block.number), block.number, None, result) + self._handle_result(result) if block.number is not None: await self._checkpoint(last_block_processed=block.number) @@ -307,13 +272,7 @@ async def _event_task( event_task = await event_handler.kiq(event) result = await event_task.wait_result() - await self._handle_result( - # TODO: Under what circumstance can address be None? - handler_id_event(address, contract_event.abi.selector), - event.block_number, - event.log_index, - result, - ) + self._handle_result(result) if event.block_number is not None: await self._checkpoint(last_block_processed=event.block_number) diff --git a/silverback/settings.py b/silverback/settings.py index 1b5a4655..4ede1610 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -51,7 +51,7 @@ def get_broker(self) -> AsyncBroker: else: broker = broker_class(self.BROKER_URI) - middlewares: List[TaskiqMiddleware] = [SilverbackMiddleware()] + middlewares: List[TaskiqMiddleware] = [SilverbackMiddleware(silverback_settings=self)] if self.ENABLE_METRICS: middlewares.append( From e8dcda3142fa0438d86c5f72cb95851d04c833b1 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 11:41:04 -0700 Subject: [PATCH 29/47] Update silverback/_cli.py Co-authored-by: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com> --- silverback/_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/_cli.py b/silverback/_cli.py index 0a653dc8..29a48007 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -79,7 +79,7 @@ def run(cli_ctx, network, account, runner, max_exceptions, path): asyncio.run(runner.run()) -@cli.command(help="Run distributed worker only") +@cli.command(help="Run Silverback application task workers") @ape_cli_context() @verbosity_option() @network_option(default=None, callback=_network_callback) From ee7627351188529b262cbc15d5d8a6817bef5860 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 13:05:35 -0700 Subject: [PATCH 30/47] refactor: use lazy init and not require runner or whatever to have to call init() --- silverback/persistence.py | 29 +++++++++++++++++++++++++++++ silverback/runner.py | 2 -- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index f0f078ff..f3efe88d 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,4 +1,5 @@ import json +from os import initgroups import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone @@ -126,6 +127,9 @@ class SQLitePersistentStorage(BasePersistentStorage): VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); """ + con: Optional[sqlite3.Connection] + initialized: bool = False + async def init(self): self.con = sqlite3.connect(self.settings.PERSISTENCE_URI or ":memory:") @@ -164,7 +168,17 @@ async def init(self): ) cur.close() + if not self.con: + raise Exception("Failed to setup SQLite connection") + + self.initialized = True + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + if not self.initialized: + await self.init() + + assert self.con is not None + cur = self.con.cursor() res = cur.execute( self.SQL_GET_STATE, @@ -188,6 +202,11 @@ async def get_instance_state(self, ident: SilverbackIdent) -> Optional[Silverbac async def set_instance_state( self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: + if not self.initialized: + await self.init() + + assert self.con is not None + cur = self.con.cursor() res = cur.execute( self.SQL_GET_STATE, @@ -235,6 +254,11 @@ async def set_instance_state( async def get_latest_result( self, ident: SilverbackIdent, handler: Optional[str] = None ) -> Optional[HandlerResult]: + if not self.initialized: + await self.init() + + assert self.con is not None + cur = self.con.cursor() if handler is not None: @@ -268,6 +292,11 @@ async def get_latest_result( ) async def add_result(self, v: HandlerResult): + if not self.initialized: + await self.init() + + assert self.con is not None + cur = self.con.cursor() cur.execute( diff --git a/silverback/runner.py b/silverback/runner.py index 2f8b9f9f..cba5cd00 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -95,8 +95,6 @@ async def run(self): self.persistence = settings.get_persistent_storage() if self.persistence: - await self.persistence.init() - boot_state = await self.persistence.get_instance_state(self.ident) if boot_state: self.last_block_seen = boot_state.last_block_seen From 0f1f7f220e2559359ff736a547172ed17342279e Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 13:40:07 -0700 Subject: [PATCH 31/47] fix: run all requested workers --- examples/redis/main.py | 5 +++-- silverback/_cli.py | 22 +++++++++++++--------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/examples/redis/main.py b/examples/redis/main.py index 43928d2c..52064e83 100644 --- a/examples/redis/main.py +++ b/examples/redis/main.py @@ -62,9 +62,10 @@ async def exec_event2(log: ContractLog): # Just in case you need to release some resources or something @app.on_worker_shutdown() def worker_shutdown(state): + block_count = state.block_count if hasattr(state, "block_count") else 0 return { - "message": f"Worker stopped after handling {state.block_count} blocks.", - "block_count": state.block_count, + "message": f"Worker stopped after handling {block_count} blocks.", + "block_count": block_count, } diff --git a/silverback/_cli.py b/silverback/_cli.py index 29a48007..a0a6c098 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -46,16 +46,20 @@ def _network_callback(ctx, param, val): async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): try: + tasks = [] with ThreadPoolExecutor(max_workers=worker_count) as pool: - receiver = Receiver( - broker=broker, - executor=pool, - validate_params=True, - max_async_tasks=1, - max_prefetch=0, - ) - broker.is_worker_process = True - await receiver.listen() + for _ in range(worker_count): + receiver = Receiver( + broker=broker, + executor=pool, + validate_params=True, + max_async_tasks=1, + max_prefetch=0, + ) + broker.is_worker_process = True + tasks.append(receiver.listen()) + + await asyncio.gather(*tasks) finally: await shutdown_broker(broker, shutdown_timeout) From 8dfb17c440cd56780aa8d22fc830e4035b96d110 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 13:41:29 -0700 Subject: [PATCH 32/47] fix: wut --- silverback/persistence.py | 1 - 1 file changed, 1 deletion(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index f3efe88d..4599fd5a 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,5 +1,4 @@ import json -from os import initgroups import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone From a0d1c2043979e0a2678a3d4cebb8d0aca2d16af1 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 14:04:03 -0700 Subject: [PATCH 33/47] docs: add section covering distributed configuration --- docs/userguides/development.md | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index ddf1d1cf..144bef2f 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -79,7 +79,7 @@ def handle_on_worker_shutdown(state): This function comes a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned. -It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. In a distributed environment, **this function will run on every worker**. +It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. **This function will run on every worker process**. #### Worker State @@ -99,7 +99,7 @@ def block_handler(block, context: Annotated[Context, TaskiqDepends()]): ### Application Events -You can also add an application startup and shutdown handler that will be **executed by one worker upon every application startup**. This may be useful for things like processing historical events since the application was shutdown or other one-time actions to perform at startup. +You can also add an application startup and shutdown handler that will be **executed once upon every application startup**. This may be useful for things like processing historical events since the application was shutdown or other one-time actions to perform at startup. ```py @app.on_startup() @@ -139,6 +139,34 @@ If you configure your application to use a signer, and that signer signs anythin Always test your applications throughly before deploying. ``` +### Distributed Execution + +Using only the `silverback run ...` command in a defualt configuration executes everything in one process and the job queue is completely in-memory with a shared state. In some high volume environments, you may want to deploy your Silverback application in a distributed configuration. + +The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner. + +For this to work, you must configure a [TaskIQ broker](https://taskiq-python.github.io/guide/architecture-overview.html#broker) capable of distributed processing. For instance, with [`taskiq_redis`](https://github.com/taskiq-python/taskiq-redis) you could do something like this for the client: + +```bash +export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" +export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" + +silverback run "examples.redis.main:app" \ + --network :mainnet:alchemy \ + --runner "silverback.runner:WebsocketRunner" +``` + +And then the worker process with 2 worker subprocesses: + +```bash +export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" +export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" + +silverback worker -w2 "examples.redis.main:app" +``` + +This will run one client and 2 workers and all queue data will be go through Redis. + ## Testing your Application TODO: Add backtesting mode w/ `silverback test` From e32297d2147946fa3a1d7d81e3721eb72510d978 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 14:22:32 -0700 Subject: [PATCH 34/47] refactor: remove PERSISTENCE_URI, use first-class SQLITE_PATH --- silverback/persistence.py | 15 ++++++++++++++- silverback/settings.py | 1 - 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 4599fd5a..25ebf4fd 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -1,4 +1,5 @@ import json +import os import sqlite3 from abc import ABC, abstractmethod from datetime import datetime, timezone @@ -86,6 +87,18 @@ async def add_result(self, v: HandlerResult): class SQLitePersistentStorage(BasePersistentStorage): + """ + SQLite implementation of BasePersistentStorage used to store application state and handler + result data. + + Usage: + + To use SQLite persistent storage, you must configure the following env vars: + + - `PERSISTENCE_CLASS`: `silverback.persistence.SQLitePersistentStorage` + - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. + """ + SQL_GET_STATE = """ SELECT last_block_seen, last_block_processed, updated FROM silverback_state @@ -130,7 +143,7 @@ class SQLitePersistentStorage(BasePersistentStorage): initialized: bool = False async def init(self): - self.con = sqlite3.connect(self.settings.PERSISTENCE_URI or ":memory:") + self.con = sqlite3.connect(os.environ.get("SQLITE_PATH", ":memory:")) cur = self.con.cursor() cur.executescript( diff --git a/silverback/settings.py b/silverback/settings.py index 4ede1610..a4a6dc7c 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -37,7 +37,6 @@ class Settings(BaseSettings, ManagerAccessMixin): # Used for persistent storage PERSISTENCE_CLASS: Optional[str] = None - PERSISTENCE_URI: Optional[str] = None class Config: env_prefix = "SILVERBACK_" From a7b071903b4134592a22ab8f186c4f5a0e1b4826 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 14:33:11 -0700 Subject: [PATCH 35/47] fix: leftover PERSISTENCE_URI in settings type --- silverback/types.py | 1 - 1 file changed, 1 deletion(-) diff --git a/silverback/types.py b/silverback/types.py index 76e77d05..c2435ee4 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -14,7 +14,6 @@ class ISilverbackSettings(Protocol): INSTANCE: str PERSISTENCE_CLASS: Optional[str] - PERSISTENCE_URI: Optional[str] def get_network_choice(self) -> str: ... From 3892e136334b33ad3850282582c406cde2d3bcda Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 14:48:20 -0700 Subject: [PATCH 36/47] refactor: minor cleanup of unused settings arg in BasePersistentStorage --- silverback/persistence.py | 5 +---- silverback/settings.py | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 25ebf4fd..bc91a683 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -9,7 +9,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import IntOrNone, ISilverbackSettings, SilverbackIdent +from .types import IntOrNone, SilverbackIdent _HandlerReturnType = TypeVar("_HandlerReturnType") @@ -53,9 +53,6 @@ def from_taskiq( class BasePersistentStorage(ABC): - def __init__(self, settings: ISilverbackSettings): - self.settings = settings - @abstractmethod async def init(self): """Handle any async initialization from Silverback settings (e.g. migrations).""" diff --git a/silverback/settings.py b/silverback/settings.py index a4a6dc7c..98fcf55c 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -74,7 +74,7 @@ def get_persistent_storage(self) -> Optional[BasePersistentStorage]: return None persistence_class = import_from_string(self.PERSISTENCE_CLASS) - return persistence_class(self) + return persistence_class() def get_provider_context(self) -> ProviderContextManager: return self.network_manager.parse_network_choice(self.get_network_choice()) From db7cce6e547216e56a11aa28eaa7fd235fd66fb0 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 15:18:41 -0700 Subject: [PATCH 37/47] docs: help string update Co-authored-by: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com> --- silverback/_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silverback/_cli.py b/silverback/_cli.py index a0a6c098..d5cf5356 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -64,7 +64,7 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): await shutdown_broker(broker, shutdown_timeout) -@cli.command(help="Run Silverback application") +@cli.command(help="Run Silverback application client") @ape_cli_context() @verbosity_option() @network_option(default=None, callback=_network_callback) From 60c4fd1708d173417b2b44b1c552a6b2ee7ccacc Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:03:22 -0700 Subject: [PATCH 38/47] refactor: remove on_client_* silverback decorators --- silverback/application.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/silverback/application.py b/silverback/application.py index f13f9bcc..527b85f5 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -96,30 +96,6 @@ def do_something_on_shutdown(): """ return self.broker.task(task_name="silverback_shutdown") - def on_client_startup(self) -> Callable: - """ - Code to execute on client startup / restart after an error. - - Usage example:: - - @app.on_startup() - def do_something_on_startup(state): - ... # Can provision resources, or add things to `state`. - """ - return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP) - - def on_client_shutdown(self) -> Callable: - """ - Code to execute on client shutdown. - - Usage example:: - - @app.on_shutdown() - def do_something_on_shutdown(state): - ... # Update some external service, perhaps using information from `state`. - """ - return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN) - def on_worker_startup(self) -> Callable: """ Code to execute on every worker at startup / restart after an error. From f35048c507ee26f7c012b4341e3f416d85143581 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:13:43 -0700 Subject: [PATCH 39/47] refactor: storage->store --- silverback/middlewares.py | 2 +- silverback/persistence.py | 10 +++++----- silverback/runner.py | 6 +++--- silverback/settings.py | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 331d320e..31215653 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -41,7 +41,7 @@ def compute_block_time() -> int: self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() self.ident = SilverbackIdent.from_settings(settings) - self.persistence = settings.get_persistent_storage() + self.persistence = settings.get_persistent_store() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary until https://github.com/ApeWorX/ape/issues/1465 is resolved diff --git a/silverback/persistence.py b/silverback/persistence.py index bc91a683..5bc87a36 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -52,7 +52,7 @@ def from_taskiq( ) -class BasePersistentStorage(ABC): +class BasePersistentStore(ABC): @abstractmethod async def init(self): """Handle any async initialization from Silverback settings (e.g. migrations).""" @@ -83,16 +83,16 @@ async def add_result(self, v: HandlerResult): ... -class SQLitePersistentStorage(BasePersistentStorage): +class SQLitePersistentStore(BasePersistentStore): """ - SQLite implementation of BasePersistentStorage used to store application state and handler + SQLite implementation of BasePersistentStore used to store application state and handler result data. Usage: - To use SQLite persistent storage, you must configure the following env vars: + To use SQLite persistent store, you must configure the following env vars: - - `PERSISTENCE_CLASS`: `silverback.persistence.SQLitePersistentStorage` + - `PERSISTENCE_CLASS`: `silverback.persistence.SQLitePersistentStore` - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. """ diff --git a/silverback/runner.py b/silverback/runner.py index cba5cd00..03cb8c98 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,7 +11,7 @@ from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError -from .persistence import BasePersistentStorage +from .persistence import BasePersistentStore from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import SilverbackIdent, SilverbackStartupState @@ -28,7 +28,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.exceptions = 0 self.last_block_seen = 0 self.last_block_processed = 0 - self.persistence: Optional[BasePersistentStorage] = None + self.persistence: Optional[BasePersistentStore] = None self.ident = SilverbackIdent.from_settings(settings) def _handle_result(self, result: TaskiqResult): @@ -92,7 +92,7 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ - self.persistence = settings.get_persistent_storage() + self.persistence = settings.get_persistent_store() if self.persistence: boot_state = await self.persistence.get_instance_state(self.ident) diff --git a/silverback/settings.py b/silverback/settings.py index 98fcf55c..73d19e06 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -7,7 +7,7 @@ from ._importer import import_from_string from .middlewares import SilverbackMiddleware -from .persistence import BasePersistentStorage +from .persistence import BasePersistentStore class Settings(BaseSettings, ManagerAccessMixin): @@ -35,7 +35,7 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None - # Used for persistent storage + # Used for persistent store PERSISTENCE_CLASS: Optional[str] = None class Config: @@ -69,7 +69,7 @@ def get_broker(self) -> AsyncBroker: def get_network_choice(self) -> str: return self.NETWORK_CHOICE or self.network_manager.network.choice - def get_persistent_storage(self) -> Optional[BasePersistentStorage]: + def get_persistent_store(self) -> Optional[BasePersistentStore]: if not self.PERSISTENCE_CLASS: return None From 40a2041f2f8de04c163f2f49e5d883689b95b817 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:15:10 -0700 Subject: [PATCH 40/47] refactor: instance_state -> state --- silverback/persistence.py | 8 ++++---- silverback/runner.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 5bc87a36..800b0546 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -59,12 +59,12 @@ async def init(self): ... @abstractmethod - async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + async def get_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: """Return the stored state for a Silverback instance""" ... @abstractmethod - async def set_instance_state( + async def set_state( self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: """Set the stored state for a Silverback instance""" @@ -182,7 +182,7 @@ async def init(self): self.initialized = True - async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + async def get_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: if not self.initialized: await self.init() @@ -208,7 +208,7 @@ async def get_instance_state(self, ident: SilverbackIdent) -> Optional[Silverbac updated=datetime.fromtimestamp(row[2], timezone.utc), ) - async def set_instance_state( + async def set_state( self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: if not self.initialized: diff --git a/silverback/runner.py b/silverback/runner.py index 03cb8c98..da5a93f1 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -60,7 +60,7 @@ async def _checkpoint( if self.persistence: try: - await self.persistence.set_instance_state( + await self.persistence.set_state( self.ident, self.last_block_seen, self.last_block_processed ) except Exception as err: @@ -95,7 +95,7 @@ async def run(self): self.persistence = settings.get_persistent_store() if self.persistence: - boot_state = await self.persistence.get_instance_state(self.ident) + boot_state = await self.persistence.get_state(self.ident) if boot_state: self.last_block_seen = boot_state.last_block_seen self.last_block_processed = boot_state.last_block_processed From 0f630b06b1535caf5c194b05d6cb78bc61df1017 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:23:10 -0700 Subject: [PATCH 41/47] fix: remove unnecessary Union types --- silverback/persistence.py | 6 +++--- silverback/types.py | 8 ++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/silverback/persistence.py b/silverback/persistence.py index 800b0546..a673d34e 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -9,7 +9,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import IntOrNone, SilverbackIdent +from .types import SilverbackIdent _HandlerReturnType = TypeVar("_HandlerReturnType") @@ -37,8 +37,8 @@ def from_taskiq( cls, ident: SilverbackIdent, handler_id: str, - block_number: IntOrNone, - log_index: IntOrNone, + block_number: Optional[int], + log_index: Optional[int], result: TaskiqResult, ) -> Self: return cls( diff --git a/silverback/types.py b/silverback/types.py index c2435ee4..954e4134 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -1,12 +1,8 @@ -from typing import Optional, Protocol, Union +from typing import Optional, Protocol from pydantic import BaseModel from typing_extensions import Self # Introduced 3.11 -# NOTE: 'type | None' introduced in 3.10 -StrOrNone = Union[str, None] -IntOrNone = Union[int, None] - class ISilverbackSettings(Protocol): """Loose approximation of silverback.settings.Settings. If you can, use the class as @@ -40,7 +36,7 @@ def handler_id_block(block_number: Optional[int]) -> str: return f"block/{block_number}" -def handler_id_event(contract_address: StrOrNone, event_signature: str) -> str: +def handler_id_event(contract_address: Optional[str], event_signature: str) -> str: """Return a unique handler ID string for an event""" # TODO: Under what circumstance can address be None? return f"event/{contract_address or 'unknown'}/{event_signature}" From ccc96d89a757901f3067aee8d327dbf1d9ffead3 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:28:07 -0700 Subject: [PATCH 42/47] refactor(docs): move back to unified example script --- examples/memory/main.py => example.py | 5 -- examples/memory/__init__.py | 3 -- examples/redis/__init__.py | 3 -- examples/redis/main.py | 75 --------------------------- 4 files changed, 86 deletions(-) rename examples/memory/main.py => example.py (95%) delete mode 100644 examples/memory/__init__.py delete mode 100644 examples/redis/__init__.py delete mode 100644 examples/redis/main.py diff --git a/examples/memory/main.py b/example.py similarity index 95% rename from examples/memory/main.py rename to example.py index d8953737..60d13456 100644 --- a/examples/memory/main.py +++ b/example.py @@ -21,11 +21,6 @@ def app_startup(startup_state: SilverbackStartupState): return {"message": "Starting...", "block_number": startup_state.last_block_seen} -@app.on_client_startup() -def client_startup(state): - return {"message": "Client started."} - - # Can handle some initialization on startup, like models or network connections @app.on_worker_startup() def worker_startup(state: TaskiqState): diff --git a/examples/memory/__init__.py b/examples/memory/__init__.py deleted file mode 100644 index de1660bd..00000000 --- a/examples/memory/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import app - -__all__ = ["app"] diff --git a/examples/redis/__init__.py b/examples/redis/__init__.py deleted file mode 100644 index de1660bd..00000000 --- a/examples/redis/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import app - -__all__ = ["app"] diff --git a/examples/redis/main.py b/examples/redis/main.py deleted file mode 100644 index 52064e83..00000000 --- a/examples/redis/main.py +++ /dev/null @@ -1,75 +0,0 @@ -from typing import Annotated - -from ape import chain -from ape.api import BlockAPI -from ape.types import ContractLog -from ape_tokens import tokens # type: ignore[import] -from taskiq import Context, TaskiqDepends, TaskiqState - -from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState - -# Do this to initialize your app -app = SilverbackApp() - -# NOTE: Don't do any networking until after initializing app -USDC = tokens["USDC"] -YFI = tokens["YFI"] - - -# Can handle some stuff on startup, like loading a heavy model or something -@app.on_startup() -def app_startup(startup_state: SilverbackStartupState): - return {"message": "Starting...", "block_number": startup_state.last_block_seen} - - -@app.on_client_startup() -def client_startup(state): - return {"message": "Client started."} - - -@app.on_worker_startup() -def worker_startup(state: TaskiqState): - state.block_count = 0 - # state.db = MyDB() - return {"message": "Worker started."} - - -# This is how we trigger off of new blocks -@app.on_(chain.blocks) -# context must be a type annotated kwarg to be provided to the task -def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): - context.state.block_count += 1 - return len(block.transactions) - - -# This is how we trigger off of events -# Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) -# NOTE: Typing isn't required -def exec_event1(log): - if log.log_index % 7 == 3: - # If you ever want the app to shutdown under some scenario, call this exception - raise CircuitBreaker("Oopsie!") - return {"amount": log.amount} - - -@app.on_(YFI.Approval) -# Any handler function can be async too -async def exec_event2(log: ContractLog): - return log.amount - - -# Just in case you need to release some resources or something -@app.on_worker_shutdown() -def worker_shutdown(state): - block_count = state.block_count if hasattr(state, "block_count") else 0 - return { - "message": f"Worker stopped after handling {block_count} blocks.", - "block_count": block_count, - } - - -# A final job to execute on Silverback shutdown -@app.on_shutdown() -def app_shutdown(state): - return {"message": "Stopping..."} From ee51bec6cfba7c6a78b631f09433ac2275d93cc8 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Mon, 27 Nov 2023 16:31:00 -0700 Subject: [PATCH 43/47] docs(refactor): fix example reference --- README.md | 6 +++--- docs/userguides/development.md | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 7b3e4050..d547cd7a 100644 --- a/README.md +++ b/README.md @@ -43,18 +43,18 @@ python3 setup.py install ## Quick Usage -Checkout [the example](./examples/memory/main.py) to see how to use the library. +Checkout [the example](./example.py) to see how to use the library. To run your bot against a live network, this SDK includes a simple runner you can use via: ```sh -$ silverback run "examples.memory:app" --network :mainnet:alchemy +$ silverback run "example:app" --network :mainnet:alchemy ``` ## Docker Usage ```sh -$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "examples.memory:app" --network :mainnet:alchemy +$ docker run --volume $PWD:/home/harambe/project --volume ~/.tokenlists:/home/harambe/.tokenlists apeworx/silverback:latest run "example:app" --network :mainnet:alchemy ``` ## Development diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 144bef2f..ac28e9fd 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -151,7 +151,7 @@ For this to work, you must configure a [TaskIQ broker](https://taskiq-python.git export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" -silverback run "examples.redis.main:app" \ +silverback run "example:app" \ --network :mainnet:alchemy \ --runner "silverback.runner:WebsocketRunner" ``` @@ -162,7 +162,7 @@ And then the worker process with 2 worker subprocesses: export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" -silverback worker -w2 "examples.redis.main:app" +silverback worker -w2 "example:app" ``` This will run one client and 2 workers and all queue data will be go through Redis. From 463f8f611bbdfa15b1a4b24a1db2d20bbf261a23 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 28 Nov 2023 13:11:10 -0700 Subject: [PATCH 44/47] docs: add 0.2.0 notes on event handler decorator behavior changes --- docs/userguides/development.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index ac28e9fd..4c3f2817 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -81,6 +81,8 @@ This function comes a parameter `state` that you can use for storing the results It's import to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as connecting to the Telegram API, an SQL or NoSQL database connection, or something else. **This function will run on every worker process**. +*New in 0.2.0*: These events moved from `on_startup()` and `on_shutdown()` for clarity. + #### Worker State The `state` variable is also useful as this can be made available to each handler method so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state. @@ -103,16 +105,21 @@ You can also add an application startup and shutdown handler that will be **exec ```py @app.on_startup() -def handle_on_startup(state): +def handle_on_startup(startup_state): # Process missed events, etc + # process_history(start_block=startup_state.last_block_seen) + # ...or startup_state.last_block_processed ... + @app.on_shutdown() -def handle_on_startup(state): +def handle_on_shutdown(): # Record final state, etc ... ``` +*Changed in 0.2.0*: The behavior of the `@app.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved on `@app.on_worker_startup()`. + ## Running your Application Once you have programmed your bot, it's really useful to be able to run it locally and validate that it does what you expect it to do. From f7e44c5057f2b08bb8662927862d8c98b49fc79f Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 28 Nov 2023 16:51:35 -0700 Subject: [PATCH 45/47] docs: update for multi-process configuration Co-authored-by: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com> --- docs/userguides/development.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 4c3f2817..1f4ab8d0 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -148,7 +148,7 @@ Always test your applications throughly before deploying. ### Distributed Execution -Using only the `silverback run ...` command in a defualt configuration executes everything in one process and the job queue is completely in-memory with a shared state. In some high volume environments, you may want to deploy your Silverback application in a distributed configuration. +Using only the `silverback run ...` command in a defualt configuration executes everything in one process and the job queue is completely in-memory with a shared state. In some high volume environments, you may want to deploy your Silverback application in a distributed configuration using multiple processes to handle the messages at a higher rate. The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner. From 4f0b11b85f3ce95f6bce2aaab37174beddaf7a52 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 28 Nov 2023 16:51:57 -0700 Subject: [PATCH 46/47] docs: arg clarity Co-authored-by: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com> --- docs/userguides/development.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 1f4ab8d0..2ae7a8f7 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -169,7 +169,7 @@ And then the worker process with 2 worker subprocesses: export SILVERBACK_BROKER_CLASS="taskiq_redis:ListQueueBroker" export SILVERBACK_BROKER_URI="redis://127.0.0.1:6379" -silverback worker -w2 "example:app" +silverback worker -w 2 "example:app" ``` This will run one client and 2 workers and all queue data will be go through Redis. From 5bf000cd09152650cb6e3cbdc5f02c020681d2a2 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Tue, 28 Nov 2023 17:00:29 -0700 Subject: [PATCH 47/47] refactor: SilverbackIdent -> SilverbackID --- silverback/middlewares.py | 4 ++-- silverback/persistence.py | 16 ++++++++-------- silverback/runner.py | 4 ++-- silverback/types.py | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 31215653..b25efdb3 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -6,7 +6,7 @@ from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult from silverback.persistence import HandlerResult -from silverback.types import SilverbackIdent, handler_id_block, handler_id_event +from silverback.types import SilverbackID, handler_id_block, handler_id_event from silverback.utils import hexbytes_dict @@ -40,7 +40,7 @@ def compute_block_time() -> int: settings = kwargs.pop("silverback_settings") self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() - self.ident = SilverbackIdent.from_settings(settings) + self.ident = SilverbackID.from_settings(settings) self.persistence = settings.get_persistent_store() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: diff --git a/silverback/persistence.py b/silverback/persistence.py index a673d34e..2c2b4e80 100644 --- a/silverback/persistence.py +++ b/silverback/persistence.py @@ -9,7 +9,7 @@ from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import SilverbackIdent +from .types import SilverbackID _HandlerReturnType = TypeVar("_HandlerReturnType") @@ -35,7 +35,7 @@ class HandlerResult(TaskiqResult): @classmethod def from_taskiq( cls, - ident: SilverbackIdent, + ident: SilverbackID, handler_id: str, block_number: Optional[int], log_index: Optional[int], @@ -59,20 +59,20 @@ async def init(self): ... @abstractmethod - async def get_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]: """Return the stored state for a Silverback instance""" ... @abstractmethod async def set_state( - self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + self, ident: SilverbackID, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: """Set the stored state for a Silverback instance""" ... @abstractmethod async def get_latest_result( - self, ident: SilverbackIdent, handler: Optional[str] = None + self, ident: SilverbackID, handler: Optional[str] = None ) -> Optional[HandlerResult]: """Return the latest result for a Silverback instance's handler""" ... @@ -182,7 +182,7 @@ async def init(self): self.initialized = True - async def get_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + async def get_state(self, ident: SilverbackID) -> Optional[SilverbackState]: if not self.initialized: await self.init() @@ -209,7 +209,7 @@ async def get_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: ) async def set_state( - self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + self, ident: SilverbackID, last_block_seen: int, last_block_processed: int ) -> Optional[SilverbackState]: if not self.initialized: await self.init() @@ -261,7 +261,7 @@ async def set_state( ) async def get_latest_result( - self, ident: SilverbackIdent, handler: Optional[str] = None + self, ident: SilverbackID, handler: Optional[str] = None ) -> Optional[HandlerResult]: if not self.initialized: await self.init() diff --git a/silverback/runner.py b/silverback/runner.py index da5a93f1..49580586 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,7 @@ from .persistence import BasePersistentStore from .settings import Settings from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackIdent, SilverbackStartupState +from .types import SilverbackID, SilverbackStartupState from .utils import async_wrap_iter, hexbytes_dict settings = Settings() @@ -29,7 +29,7 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.last_block_seen = 0 self.last_block_processed = 0 self.persistence: Optional[BasePersistentStore] = None - self.ident = SilverbackIdent.from_settings(settings) + self.ident = SilverbackID.from_settings(settings) def _handle_result(self, result: TaskiqResult): if result.is_err: diff --git a/silverback/types.py b/silverback/types.py index 954e4134..571a5c87 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -15,7 +15,7 @@ def get_network_choice(self) -> str: ... -class SilverbackIdent(BaseModel): +class SilverbackID(BaseModel): identifier: str network_choice: str