diff --git a/setup.py b/setup.py index 9c7911ff..81c58b70 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,9 @@ "IPython", # Console for interacting "ipdb", # Debugger (Must use `export PYTHONBREAKPOINT=ipdb.set_trace`) ], + "mongo": [ + "beanie~=1.23.6", + ], } # NOTE: `pip install -e .[dev]` to install package @@ -49,6 +52,7 @@ + extras_require["doc"] + extras_require["release"] + extras_require["dev"] + + extras_require["mongo"] ) with open("./README.md") as readme: diff --git a/silverback/application.py b/silverback/application.py index 4a514802..df755407 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -73,9 +73,13 @@ def __init__(self, settings: Optional[Settings] = None): f"{signer_str}{start_block_str}{new_block_timeout_str}" ) + def checkpoint(self, last_block_seen: int, last_block_processed: int): + self.broker.state.last_block_seen = last_block_seen + self.broker.state.last_block_processed = last_block_processed + def on_startup(self) -> Callable: """ - Code to execute on worker startup / restart after an error. + Code to execute on client startup / restart after an error. Usage example:: @@ -83,11 +87,11 @@ def on_startup(self) -> Callable: def do_something_on_startup(state): ... # Can provision resources, or add things to `state`. """ - return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP) def on_shutdown(self) -> Callable: """ - Code to execute on normal worker shutdown. + Code to execute on client shutdown. Usage example:: @@ -95,7 +99,7 @@ def on_shutdown(self) -> Callable: def do_something_on_shutdown(state): ... # Update some external service, perhaps using information from `state`. """ - return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) + return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN) def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]: """ diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 00582050..37af0c2f 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -3,6 +3,8 @@ from ape.utils import ManagerAccessMixin from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult +from silverback.utils import hexbytes_dict + class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin): def __init__(self, *args, **kwargs): @@ -47,20 +49,10 @@ def _create_label(self, message: TaskiqMessage) -> str: return f"{message.task_name}{args}" def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: - def fix_dict(data: dict) -> dict: - fixed_data = {} - for name, value in data.items(): - if isinstance(value, str) and value.startswith("0x"): - fixed_data[name] = HexBytes(value) - else: - fixed_data[name] = value - - return fixed_data - if message.task_name == "block": # NOTE: Necessary because we don't know the exact block class message.args[0] = self.provider.network.ecosystem.decode_block( - fix_dict(message.args[0]) + hexbytes_dict(message.args[0]) ) elif "event" in message.task_name: diff --git a/silverback/persistence.py b/silverback/persistence.py new file mode 100644 index 00000000..9a3e20f1 --- /dev/null +++ b/silverback/persistence.py @@ -0,0 +1,265 @@ +import pickle +from abc import ABC, abstractmethod +from datetime import datetime, timezone +from typing import Annotated, Any, Dict, Optional +from typing_extensions import Self # Introduced 3.11 + +from ape.logging import logger +from pydantic import BaseModel +from taskiq import TaskiqResult + +from .types import SilverbackIdent + + +class SilverbackState(BaseModel): + ident: SilverbackIdent + # Last block number seen by runner + last_block_seen: int + # Last block number processed by a worker + last_block_processed: int + + +class HandlerResult(BaseModel): + instance: str + network: str + handler_id: str + block_number: int + log_index: Optional[int] + execution_time: float + # TODO: upcoming feature in taskiq + # labels: Dict[str] + return_value_blob: Optional[bytes] # pickled data + created: datetime + + @classmethod + def from_taskiq( + cls, + ident: SilverbackIdent, + handler_id: str, + block_number: int, + log_index: int | None, + result: TaskiqResult, + ) -> Self: + return cls( + instance=ident.identifier, + network=ident.network_choice, + handler_id=handler_id, + block_number=block_number, + log_index=log_index, + execution_time=result.execution_time, + # labels=result.labels, + return_value_blob=pickle.dumps(result.return_value), + created=datetime.now(timezone.utc), + ) + + @property + def return_value(self): + if self.return_value_blob is None: + return None + return pickle.loads(self.return_value_blob) + + @return_value.setter + def set_return_value(self, v: Any): + self.return_value_blob = pickle.dumps(v) + + +class BasePersistentStorage(ABC): + @abstractmethod + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + ... + + @abstractmethod + async def set_instance_state( + self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + ... + + @abstractmethod + async def get_latest_result( + self, instance: SilverbackIdent, handler: Optional[str] = None + ) -> HandlerResult: + ... + + @abstractmethod + async def add_result(self, v: HandlerResult): + ... + + +async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]: + try: + import pymongo + from beanie import Document, Indexed, init_beanie + from beanie.odm.operators.update.general import Set + from motor.motor_asyncio import AsyncIOMotorClient + except ImportError as err: + print(err) + logger.warning("MongoDB was initialized by dependencies are not installed") + return None + + class SilverbackStateDoc(Document): + instance: Annotated[str, Indexed(str)] + network: Annotated[str, Indexed(str)] + last_block_seen: int + last_block_processed: int + updated: datetime + + class Settings: + name = "state" + indexes = [ + [ + ("instance", pymongo.TEXT), + ("network", pymongo.TEXT), + ], + ] + + def to_silberback_state(self) -> SilverbackState: + return SilverbackState( + ident=SilverbackIdent( + identifier=self.instance, + network_choice=self.network, + ), + last_block_seen=self.last_block_seen, + last_block_processed=self.last_block_processed, + ) + + class HandlerResultDoc(HandlerResult, Document): + # NOTE: Redefining these to annotate with indexed type + instance: Annotated[str, Indexed(str)] + network: Annotated[str, Indexed(str)] + handler_id: Annotated[str, Indexed(str)] + + class Settings: + name = "result" + indexes = [ + [ + ("instance", pymongo.TEXT), + ("network", pymongo.TEXT), + ("handler", pymongo.TEXT), + ], + ] + + @classmethod + def from_handler_result(cls, result: HandlerResult) -> Self: + return cls( + instance=result.instance, + network=result.network, + handler_id=result.handler_id, + block_number=result.block_number, + log_index=result.log_index, + execution_time=result.execution_time, + return_value_blob=result.return_value_blob, + created=result.created, + ) + + def to_handler_result(self) -> HandlerResult: + return HandlerResult( + instance=self.instance, + network=self.network, + handler_id=self.handler, + block_number=self.block_number, + log_index=self.log_index, + execution_time=self.execution_time, + return_value_blob=self.return_value_blob, + created=self.created, + ) + + class MongoStorage(BasePersistentStorage): + client: AsyncIOMotorClient + + async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]: + res = await SilverbackStateDoc.find_one( + SilverbackStateDoc.instance == ident.identifier, + SilverbackStateDoc.network == ident.network_choice, + ) + + if res is None: + return None + + return res.to_silberback_state() + + async def set_instance_state( + self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int + ) -> Optional[SilverbackState]: + now_utc = datetime.now(timezone.utc) + + state = await SilverbackStateDoc.find_one( + SilverbackStateDoc.instance == ident.identifier, + SilverbackStateDoc.network == ident.network_choice, + ) + + if state is not None: + await state.set( + # Unreported type error? Confiremd working + { + SilverbackStateDoc.last_block_seen: last_block_seen, + SilverbackStateDoc.last_block_processed: last_block_processed, + SilverbackStateDoc.updated: now_utc, + } # type: ignore + ) + else: + state = SilverbackStateDoc( + instance=ident.identifier, + network=ident.network_choice, + last_block_seen=last_block_seen, + last_block_processed=last_block_processed, + updated=now_utc, + ) + await state.create() + + # TODO: Why no work? + # await SilverbackStateDoc.find_one( + # SilverbackStateDoc.instance == ident.identifier, + # SilverbackStateDoc.network == ident.network_choice, + # ).upsert( + # Set( + # { + # SilverbackStateDoc.last_block_seen: last_block_seen, + # SilverbackStateDoc.last_block_processed: last_block_processed, + # SilverbackStateDoc.updated: now_utc, + # } + # ), + # on_insert=SilverbackStateDoc( + # instance=ident.identifier, + # network=ident.network_choice, + # last_block_seen=last_block_seen, + # last_block_processed=last_block_processed, + # updated=now_utc, + # ), + # ) + + async def get_latest_result( + self, ident: SilverbackIdent, handler_id: Optional[str] = None + ) -> Optional[HandlerResult]: + query = HandlerResultDoc.find( + HandlerResultDoc.instance == ident.identifier, + HandlerResultDoc.network == ident.network_choice, + ) + + if handler_id: + query.find(HandlerResultDoc.handler_id == handler_id) + + res = await query.sort("-created").first_or_none() + + if res is None: + return res + + return res.to_handler_result() + + async def add_result(self, result: HandlerResult): + doc = HandlerResultDoc.from_handler_result(result) + # Type annotation error: https://github.com/roman-right/beanie/issues/679 + await doc.insert() # type: ignore + + storage = MongoStorage() + client = AsyncIOMotorClient(mongo_uri) + + await init_beanie( + database=client.db_name, + # Type annotation error: https://github.com/roman-right/beanie/issues/670 + document_models=[ + HandlerResultDoc, + SilverbackStateDoc, + ], # type: ignore + ) + + return storage diff --git a/silverback/runner.py b/silverback/runner.py index 68b70f51..5349ebeb 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -7,11 +7,17 @@ from ape.utils import ManagerAccessMixin from ape_ethereum.ecosystem import keccak from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult +from typing import Optional, Tuple from .application import SilverbackApp from .exceptions import Halt, NoWebsocketAvailableError +from .persistence import BasePersistentStorage, HandlerResult, init_mongo from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .utils import async_wrap_iter +from .settings import Settings +from .types import SilverbackIdent, handler_id_block, handler_id_event +from .utils import async_wrap_iter, hexbytes_dict + +settings = Settings() class BaseRunner(ABC): @@ -20,8 +26,14 @@ def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs) self.max_exceptions = max_exceptions self.exceptions = 0 + self.last_block_seen = 0 + self.last_block_processed = 0 + self.persistence: Optional[BasePersistentStorage] = None + self.ident = SilverbackIdent.from_settings(settings) - def _handle_result(self, result: TaskiqResult): + async def _handle_result( + self, handler_id: str, block_number: int, log_index: int | None, result: TaskiqResult + ): if result.is_err: self.exceptions += 1 @@ -31,6 +43,36 @@ def _handle_result(self, result: TaskiqResult): if self.exceptions > self.max_exceptions: raise Halt() + if self.persistence: + handler_result = HandlerResult.from_taskiq( + self.ident, handler_id, block_number, log_index, result + ) + + await self.persistence.add_result(handler_result) + + async def _checkpoint( + self, last_block_seen: int = 0, last_block_processed: int = 0 + ) -> Tuple[int, int]: + """Set latest checkpoint block number""" + if ( + last_block_seen > self.last_block_seen + or last_block_processed > self.last_block_processed + ): + logger.debug( + f"Checkpoint block [seen={self.last_block_seen}, procssed={self.last_block_processed}]" + ) + self.last_block_seen = max(last_block_seen, self.last_block_seen) + self.last_block_processed = max(last_block_processed, self.last_block_processed) + + self.app.checkpoint(self.last_block_seen, self.last_block_processed) + + if self.persistence: + await self.persistence.set_instance_state( + self.ident, self.last_block_seen, self.last_block_processed + ) + + return self.last_block_seen, self.last_block_processed + @abstractmethod async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): """ @@ -55,6 +97,23 @@ async def run(self): Raises: :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. """ + if settings.MONGODB_URI: + self.persistence = await init_mongo(settings.MONGODB_URI) + + if self.persistence: + boot_state = await self.persistence.get_instance_state(self.ident) + if boot_state: + self.last_block_seen = boot_state.last_block_seen + self.last_block_processed = boot_state.last_block_processed + + # NOTE: This will update TaskIQ state before app startup. + # TODO: I have no idea if this will work with a distributed runner/worker + # setup. That may require our own injected startup task, but it's unclear if + # we can do that before Taskiq's startup handlers. We might be able to force + # SilverbackApplication to decorate an internal func to handle it. Need to + # figure that out. + self.app.checkpoint(self.last_block_seen, self.last_block_processed) + await self.app.broker.startup() if block_handler := self.app.get_block_handler(): @@ -95,9 +154,18 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): logger.debug(f"Handling blocks via {sub_id}") async for raw_block in self.subscriptions.get_subscription_data(sub_id): + block = self.provider.network.ecosystem.decode_block(hexbytes_dict(raw_block)) + + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = await block_handler.kiq(raw_block) result = await block_task.wait_result() - self._handle_result(result) + + await self._handle_result(handler_id_block(block.number), block.number, None, result) + + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -120,9 +188,21 @@ async def _event_task( contract_event.abi, ) ) + + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() - self._handle_result(result) + await self._handle_result( + handler_id_event(contract_event.contract.address, contract_event.abi.selector), + event.block_number, + event.log_index, + result, + ) + + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) async def run(self): async with Web3SubscriptionsManager(self.ws_uri) as subscriptions: @@ -157,15 +237,22 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for block in async_wrap_iter( chain.blocks.poll_blocks(start_block=start_block, new_block_timeout=new_block_timeout) ): + if block.number is not None: + await self._checkpoint(last_block_seen=block.number) + block_task = await block_handler.kiq(block) result = await block_task.wait_result() - self._handle_result(result) + await self._handle_result(handler_id_block(block.number), block.number, None, result) + + if block.number is not None: + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask ): new_block_timeout = None start_block = None + address = None if isinstance(contract_event.contract, ContractInstance): address = contract_event.contract.address if address in self.app.poll_settings: @@ -180,6 +267,18 @@ async def _event_task( async for event in async_wrap_iter( contract_event.poll_logs(start_block=start_block, new_block_timeout=new_block_timeout) ): + if event.block_number is not None: + await self._checkpoint(last_block_seen=event.block_number) + event_task = await event_handler.kiq(event) result = await event_task.wait_result() - self._handle_result(result) + await self._handle_result( + # TODO: Under what circumstance can address be None? + handler_id_event(address, contract_event.abi.selector), + event.block_number, + event.log_index, + result, + ) + + if event.block_number is not None: + await self._checkpoint(last_block_processed=event.block_number) diff --git a/silverback/settings.py b/silverback/settings.py index bd486863..87d8ac33 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -17,6 +17,9 @@ class Settings(BaseSettings, ManagerAccessMixin): testing or deployment purposes. Defaults to a working in-memory broker. """ + # A unique identifier for this silverback instance + INSTANCE: str = "default" + BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" @@ -31,6 +34,8 @@ class Settings(BaseSettings, ManagerAccessMixin): NEW_BLOCK_TIMEOUT: Optional[int] = None START_BLOCK: Optional[int] = None + MONGODB_URI: Optional[str] = None + class Config: env_prefix = "SILVERBACK_" case_sensitive = True @@ -59,10 +64,12 @@ def get_broker(self) -> AsyncBroker: return broker + def get_network_choice(self) -> str: + # return self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name + return self.NETWORK_CHOICE or self.network_manager.network.choice + def get_provider_context(self) -> ProviderContextManager: - return self.network_manager.parse_network_choice( - self.NETWORK_CHOICE or self.network_manager.default_ecosystem.name - ) + return self.network_manager.parse_network_choice(self.get_network_choice()) def get_signer(self) -> Optional[AccountAPI]: if self.SIGNER_ALIAS: diff --git a/silverback/types.py b/silverback/types.py new file mode 100644 index 00000000..a25e2ae7 --- /dev/null +++ b/silverback/types.py @@ -0,0 +1,29 @@ +from enum import Enum +from typing import Annotated, Any, Dict, Optional +from typing_extensions import Self # Introduced 3.11 + +from pydantic import BaseModel + +from .settings import Settings + + +class SilverbackIdent(BaseModel): + identifier: str + network_choice: str + + @classmethod + def from_settings(cls, settings_: Settings) -> Self: + return cls(identifier=settings_.INSTANCE, network_choice=settings_.get_network_choice()) + + +def handler_id_block(block_number: Optional[int]) -> str: + """Return a unique handler ID string for a block""" + if block_number is None: + return "block/pending" + return f"block/{block_number}" + + +def handler_id_event(contract_address: str | None, event_signature: str) -> str: + """Return a unique handler ID string for an event""" + # TODO: Under what circumstance can address be None? + return f"event/{contract_address or 'unknown'}/{event_signature}" diff --git a/silverback/utils.py b/silverback/utils.py index 57ac9e94..c47ad38a 100644 --- a/silverback/utils.py +++ b/silverback/utils.py @@ -2,6 +2,10 @@ import threading from typing import AsyncIterator, Iterator +from ape.contracts import ContractEvent, ContractInstance +from ape.managers.chain import BlockContainer +from ape.types import HexBytes + def async_wrap_iter(it: Iterator) -> AsyncIterator: """Wrap blocking iterator into an asynchronous one""" @@ -34,3 +38,15 @@ def iter_to_queue(): threading.Thread(target=iter_to_queue).start() return yield_queue_items() + + +def hexbytes_dict(data: dict) -> dict: + fixed_data = {} + + for name, value in data.items(): + if isinstance(value, str) and value.startswith("0x"): + fixed_data[name] = HexBytes(value) + else: + fixed_data[name] = value + + return fixed_data