Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow multiple handlers #66

Merged
merged 19 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a370d3a
refactor: add TaskType enum to types
fubuloubu Apr 8, 2024
7fd52d9
refactor!: use task collection by task type; use dynamic registration
fubuloubu Apr 8, 2024
144a087
refactor: use new task collections api for fetch task handlers
fubuloubu Apr 8, 2024
8ddb0d3
refactor!: use task type to differentiate tasks in middleware, not name
fubuloubu Apr 8, 2024
64c281e
fix: typing bugs, not using Union for 3.8 support
fubuloubu Apr 8, 2024
42247be
fix: use task type to capture event logs
fubuloubu Apr 9, 2024
e0c3dc7
refactor: use defaultdict instead of custom collection type
fubuloubu Apr 9, 2024
8798669
refactor: use standardized labels, use task_name for task_id
fubuloubu Apr 9, 2024
2a7d62a
refactor: remove `.task_name` from message labels
fubuloubu Apr 9, 2024
6aafc83
refactor: convert to TaskType for better processing
fubuloubu Apr 9, 2024
1bf0702
Merge branch 'main' into refactor/allow-multiple-handlers
fubuloubu Apr 10, 2024
5ad9b7a
refactor: use StrEnum if available
fubuloubu Apr 10, 2024
c177a94
docs: add note to deprecate in breaking change
fubuloubu Apr 10, 2024
eadaea5
refactor: make object type clearer when working with labels in middle…
fubuloubu Apr 10, 2024
9519b30
refactor: use official backport
fubuloubu Apr 10, 2024
184a939
docs: update typing and add docs for dynamic broker task decorator fn
fubuloubu Apr 10, 2024
77a11aa
style: ignore mypy typing issues on <3.11
fubuloubu Apr 11, 2024
9ca7692
refactor: avoid div/0 fault, fix duplicate log entry for results w/errs
fubuloubu Apr 11, 2024
897cf95
refactor: rollback weird typing backport issue
fubuloubu Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 48 additions & 63 deletions silverback/application.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import atexit
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, Dict, Optional, Union

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
from ape.logging import logger
from ape.managers.chain import BlockContainer
from ape.types import AddressType
from ape.utils import ManagerAccessMixin
from taskiq import AsyncTaskiqDecoratedTask, TaskiqEvents

from .exceptions import DuplicateHandlerError, InvalidContainerTypeError
from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .types import TaskType


@dataclass
class TaskData:
container: Union[BlockContainer, ContractEvent, None]
handler: AsyncTaskiqDecoratedTask


class SilverbackApp(ManagerAccessMixin):
Expand Down Expand Up @@ -52,7 +60,8 @@ def __init__(self, settings: Optional[Settings] = None):
logger.info(f"Loading Silverback App with settings:\n {settings_str}")

self.broker = settings.get_broker()
self.contract_events: Dict[AddressType, Dict[str, ContractEvent]] = {}
# NOTE: If no tasks registered yet, defaults to empty list instead of raising KeyError
self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list)
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
self.poll_settings: Dict[str, Dict] = {}

atexit.register(self.network.__exit__, None, None, None)
Expand All @@ -72,6 +81,38 @@ def __init__(self, settings: Optional[Settings] = None):
f"{signer_str}{start_block_str}{new_block_timeout_str}"
)

def broker_task_decorator(
self,
task_type: TaskType,
container: Union[BlockContainer, ContractEvent, None] = None,
):
if (
(task_type is TaskType.NEW_BLOCKS and not isinstance(container, BlockContainer))
or (task_type is TaskType.EVENT_LOG and not isinstance(container, ContractEvent))
or (
task_type
not in (
TaskType.NEW_BLOCKS,
TaskType.EVENT_LOG,
)
and container is not None
)
):
raise ContainerTypeMismatchError(task_type, container)

# Register user function as task handler with our broker
def add_taskiq_task(handler: Callable):
broker_task = self.broker.register_task(
handler,
task_name=handler.__name__,
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
task_type=str(task_type),
)

self.tasks[task_type].append(TaskData(container=container, handler=broker_task))
return broker_task

return add_taskiq_task

def on_startup(self) -> Callable:
"""
Code to execute on one worker upon startup / restart after an error.
Expand All @@ -82,7 +123,7 @@ def on_startup(self) -> Callable:
def do_something_on_startup(startup_state):
... # Reprocess missed events or blocks
"""
return self.broker.task(task_name="silverback_startup")
return self.broker_task_decorator(TaskType.STARTUP)

def on_shutdown(self) -> Callable:
"""
Expand All @@ -94,7 +135,7 @@ def on_shutdown(self) -> Callable:
def do_something_on_shutdown():
... # Record final state of app
"""
return self.broker.task(task_name="silverback_shutdown")
return self.broker_task_decorator(TaskType.SHUTDOWN)

def on_worker_startup(self) -> Callable:
"""
Expand All @@ -120,48 +161,6 @@ def do_something_on_shutdown(state):
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_startup_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_startup` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("silverback_startup")

def get_shutdown_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `silverback_shutdown` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("silverback_shutdown")

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task("block")

def get_event_handler(
self, event_target: AddressType, event_name: str
) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `<event_target>:<event_name>` events.

Args:
event_target (AddressType): The contract address of the target.
event_name: (str): The name of the event emitted by ``event_target``.

Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.find_task(f"{event_target}/event/{event_name}")

def on_(
self,
container: Union[BlockContainer, ContractEvent],
Expand All @@ -183,9 +182,6 @@ def on_(
If the type of `container` is not configurable for the app.
"""
if isinstance(container, BlockContainer):
if self.get_block_handler():
raise DuplicateHandlerError("block")

if new_block_timeout is not None:
if "_blocks_" in self.poll_settings:
self.poll_settings["_blocks_"]["new_block_timeout"] = new_block_timeout
Expand All @@ -198,21 +194,12 @@ def on_(
else:
self.poll_settings["_blocks_"] = {"start_block": start_block}

return self.broker.task(task_name="block")
return self.broker_task_decorator(TaskType.NEW_BLOCKS, container=container)

elif isinstance(container, ContractEvent) and isinstance(
container.contract, ContractInstance
):
if self.get_event_handler(container.contract.address, container.abi.name):
raise DuplicateHandlerError(
f"event {container.contract.address}:{container.abi.name}"
)

key = container.contract.address
if container.contract.address in self.contract_events:
self.contract_events[key][container.abi.name] = container
else:
self.contract_events[key] = {container.abi.name: container}

if new_block_timeout is not None:
if key in self.poll_settings:
Expand All @@ -226,9 +213,7 @@ def on_(
else:
self.poll_settings[key] = {"start_block": start_block}

return self.broker.task(
task_name=f"{container.contract.address}/event/{container.abi.name}"
)
return self.broker_task_decorator(TaskType.EVENT_LOG, container=container)

# TODO: Support account transaction polling
# TODO: Support mempool polling
Expand Down
12 changes: 7 additions & 5 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
from ape.exceptions import ApeException
from ape.logging import logger

from .types import TaskType


class ImportFromStringError(Exception):
pass


class DuplicateHandlerError(Exception):
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, handler_type: str):
super().__init__(f"Only one handler allowed for: {handler_type}")


class InvalidContainerTypeError(Exception):
def __init__(self, container: Any):
super().__init__(f"Invalid container type: {container.__class__}")


class ContainerTypeMismatchError(Exception):
def __init__(self, task_type: TaskType, container: Any):
super().__init__(f"Invalid container type for '{task_type}': {container.__class__}")


class NoWebsocketAvailableError(Exception):
def __init__(self):
super().__init__(
Expand Down
69 changes: 34 additions & 35 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional, Tuple
from typing import Any

from ape.logging import logger
from ape.types import ContractLog
Expand All @@ -7,27 +7,10 @@
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.persistence import HandlerResult
from silverback.types import SilverbackID, handler_id_block, handler_id_event
from silverback.types import SilverbackID, TaskType
from silverback.utils import hexbytes_dict


def resolve_task(message: TaskiqMessage) -> Tuple[str, Optional[int], Optional[int]]:
block_number = None
log_index = None
task_id = message.task_name

if task_id == "block":
block_number = message.args[0].number
task_id = handler_id_block(block_number)
elif "event" in task_id:
block_number = message.args[0].block_number
log_index = message.args[0].log_index
# TODO: Should standardize on event signature here instead of name in case of overloading
task_id = handler_id_event(message.args[0].contract_address, message.args[0].event_name)

return task_id, block_number, log_index


class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin):
def __init__(self, *args, **kwargs):
def compute_block_time() -> int:
Expand Down Expand Up @@ -66,34 +49,43 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict:
return message

def _create_label(self, message: TaskiqMessage) -> str:
if message.task_name == "block":
args = f"[block={message.args[0].hash.hex()}]"

elif "event" in message.task_name:
args = f"[txn={message.args[0].transaction_hash},log_index={message.args[0].log_index}]"
if labels_str := ",".join(f"{k}={v}" for k, v in message.labels.items()):
return f"{message.task_name}[{labels_str}]"

else:
args = ""

return f"{message.task_name}{args}"
return message.task_name

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
if message.task_name == "block":
if not (task_type := message.labels.pop("task_type")):
return message # Not a silverback task

fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
try:
task_type = TaskType(task_type)
except ValueError:
return message # Not a silverback task

# Add extra labels for our task to see what their source was
if task_type is TaskType.NEW_BLOCKS:
# NOTE: Necessary because we don't know the exact block class
message.args[0] = self.provider.network.ecosystem.decode_block(
hexbytes_dict(message.args[0])
)
message.labels["block_number"] = str(message.args[0].number)
message.labels["block_hash"] = message.args[0].hash.hex()
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved

elif "event" in message.task_name:
elif task_type is TaskType.EVENT_LOG:
# NOTE: Just in case the user doesn't specify type as `ContractLog`
message.args[0] = ContractLog.model_validate(message.args[0])
message.labels["block_number"] = str(message.args[0].block_number)
message.labels["transaction_hash"] = message.args[0].transaction_hash
message.labels["log_index"] = str(message.args[0].log_index)

logger.info(f"{self._create_label(message)} - Started")
logger.debug(f"{self._create_label(message)} - Started")
return message

def post_execute(self, message: TaskiqMessage, result: TaskiqResult):
percentage_time = 100 * (result.execution_time / self.block_time)
logger.info(
logger.success(
f"{self._create_label(message)} "
f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)"
)
Expand All @@ -102,10 +94,12 @@ async def post_save(self, message: TaskiqMessage, result: TaskiqResult):
if not self.persistence:
return

handler_id, block_number, log_index = resolve_task(message)

handler_result = HandlerResult.from_taskiq(
self.ident, handler_id, block_number, log_index, result
self.ident,
message.task_name,
message.labels.get("block_number"),
message.labels.get("log_index"),
result,
)

try:
Expand All @@ -119,4 +113,9 @@ async def on_error(
result: TaskiqResult,
exception: BaseException,
):
logger.error(f"{message.task_name} - {type(exception).__name__}: {exception}")
percentage_time = 100 * (result.execution_time / self.block_time)
logger.error(
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
f"{self._create_label(message)} "
f"- {result.execution_time:.3f}s ({percentage_time:.1f}%)"
)
# NOTE: Unless stdout is ignored, error traceback appears in stdout
26 changes: 11 additions & 15 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .persistence import BasePersistentStore
from .settings import Settings
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import SilverbackID, SilverbackStartupState
from .types import SilverbackID, SilverbackStartupState, TaskType
from .utils import async_wrap_iter, hexbytes_dict

settings = Settings()
Expand Down Expand Up @@ -103,8 +103,8 @@ async def run(self):
await self.app.broker.startup()

# Execute Silverback startup task before we init the rest
if startup_handler := self.app.get_startup_handler():
task = await startup_handler.kiq(
for startup_task in self.app.tasks[TaskType.STARTUP]:
task = await startup_task.handler.kiq(
SilverbackStartupState(
last_block_seen=self.last_block_seen,
last_block_processed=self.last_block_processed,
Expand All @@ -113,15 +113,12 @@ async def run(self):
result = await task.wait_result()
self._handle_result(result)

if block_handler := self.app.get_block_handler():
tasks = [self._block_task(block_handler)]
else:
tasks = []
tasks = []
for task in self.app.tasks[TaskType.NEW_BLOCKS]:
tasks.append(self._block_task(task.handler))

for contract_address in self.app.contract_events:
for event_name, contract_event in self.app.contract_events[contract_address].items():
if event_handler := self.app.get_event_handler(contract_address, event_name):
tasks.append(self._event_task(contract_event, event_handler))
for task in self.app.tasks[TaskType.EVENT_LOG]:
tasks.append(self._event_task(task.container, task.handler))

if len(tasks) == 0:
raise Halt("No tasks to execute")
Expand All @@ -132,10 +129,9 @@ async def run(self):
logger.error(f"Fatal error detected, shutting down: '{e}'")

# Execute Silverback shutdown task before shutting down the broker
if shutdown_handler := self.app.get_shutdown_handler():
task = await shutdown_handler.kiq()
result = await task.wait_result()
self._handle_result(result)
for shutdown_task in self.app.tasks[TaskType.SHUTDOWN]:
task = await shutdown_task.handler.kiq()
result = self._handle_result(await task.wait_result())

await self.app.broker.shutdown()

Expand Down
Loading
Loading