From c5428e2ae8c5c570a3240f1f60c8e82c28ca5775 Mon Sep 17 00:00:00 2001
From: fubuloubu <3859395+fubuloubu@users.noreply.github.com>
Date: Tue, 2 Apr 2024 19:22:46 -0400
Subject: [PATCH] refactor!: add data acquisition models to recorder

HandlerResult no longer subclasses TaskiqResult.  It is now a plain pydantic
model that records the task's labels, execution time, an optional error
string, and a `metrics` mapping (name -> datapoint) derived from the task's
return value by `HandlerResult._extract_metrics`:

* a BaseDatapoint is stored under `<handler_id>_result`
* a bool/int/float/Decimal is wrapped in a ScalarDatapoint under that key
* a dict keeps its datapoint/scalar values (scalars are wrapped); any other
  values are dropped and the number dropped is logged as a warning
* any other non-None value is logged as a warning and yields no metrics
* None yields no metrics

The SQLite result table and queries are renamed to match:
`is_err` -> `error` (text) and `return_value_blob` -> `metrics_blob`.

The runner now chains the task's error when raising Halt.

silverback/types.py gains BaseDatapoint, ScalarDatapoint, ScalarType,
scalar_types and the Metrics alias.
---
Review notes (this section is not part of the commit message):

- Layout was rebuilt from a whitespace-collapsed copy of the patch.  Hunk line
  counts were re-checked against every `@@` header and the diffstat, but the
  indentation of context lines is a best-effort reconstruction; apply with
  `git apply --ignore-whitespace` if the context does not match exactly.
- Changes relative to the original revision of this patch (all same-line
  edits, so hunk headers and the diffstat are unchanged; the post-image blob
  ids on the `index` lines still refer to the original revision):
  1. `_extract_metrics`: the fallback warning formatted
     `type(result.metrics)`; `result` is the raw return value there, so the
     attribute access raised instead of logging.  Now `type(result)`.
  2. `from_taskiq`: `str(result.error)` stored the text "None" for tasks
     without an error.  `error` now stays None in that case.
  3. `add_result`: metrics were JSON-encoded twice (`model_dump_json()` inside
     `json.dumps`), so `json.loads` in `get_latest_result` returned strings
     rather than mappings.  Now uses `model_dump(mode="json")`.
- NOTE(review): `Metrics` is `dict[str, BaseDatapoint]`, so rows read back via
  `get_latest_result` are validated against the base model only -- confirm
  whether `ScalarDatapoint.data` is expected to survive a round trip.
- NOTE(review): the column renames are only visible here in the schema
  statements; confirm whether databases created with the old columns need a
  migration.

 silverback/recorder.py | 71 +++++++++++++++++++++++++++++++-----------
 silverback/runner.py   |  3 +-
 silverback/types.py    | 29 +++++++++++++++--
 3 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/silverback/recorder.py b/silverback/recorder.py
index dec03028..d7f5a5cb 100644
--- a/silverback/recorder.py
+++ b/silverback/recorder.py
@@ -3,13 +3,14 @@ import sqlite3
 from abc import ABC, abstractmethod
 from datetime import datetime, timezone
-from typing import Optional, TypeVar
+from typing import Any, Optional, TypeVar
 
-from pydantic import BaseModel
+from ape.logging import get_logger
+from pydantic import BaseModel, Field
 from taskiq import TaskiqResult
 from typing_extensions import Self  # Introduced 3.11
 
-from .types import SilverbackID
+from .types import BaseDatapoint, Metrics, ScalarDatapoint, SilverbackID, scalar_types
 
 _HandlerReturnType = TypeVar("_HandlerReturnType")
 
 
@@ -24,13 +25,44 @@ class SilverbackState(BaseModel):
     updated: datetime
 
 
-class HandlerResult(TaskiqResult):
+class HandlerResult(BaseModel):
     instance: str
     network: str
     handler_id: str
     block_number: Optional[int]
     log_index: Optional[int]
     created: datetime
+    labels: dict[str, Any] = Field(default_factory=dict)
+    execution_time: float
+    metrics: Metrics
+    error: Optional[str] = None
+
+    @classmethod
+    def _extract_metrics(cls, result: Any, handler_id: str) -> Metrics:
+        if isinstance(result, BaseDatapoint):
+            return {f"{handler_id}_result": result}
+
+        elif isinstance(result, scalar_types):
+            return {f"{handler_id}_result": ScalarDatapoint(data=result)}
+
+        elif isinstance(result, dict):
+            converted_result = {
+                k: ScalarDatapoint(data=v) if not isinstance(v, BaseDatapoint) else v
+                for k, v in result.items()
+                if isinstance(v, (BaseDatapoint, *scalar_types))
+            }
+            if len(converted_result) < len(result):
+                logger = get_logger(handler_id)
+                logger.warning(f"Unhandled results: {len(result)-len(converted_result)}")
+
+            return converted_result
+
+        elif result is not None:
+            logger = get_logger(handler_id)
+            logger.warning(f"Cannot handle return type '{type(result)}'.")
+
+        # else:
+        return {}
 
     @classmethod
     def from_taskiq(
@@ -48,7 +80,10 @@ def from_taskiq(
             block_number=block_number,
             log_index=log_index,
             created=datetime.now(timezone.utc),
-            **result.dict(),
+            labels=result.labels,
+            execution_time=result.execution_time,
+            error=str(result.error) if result.error is not None else None,
+            metrics=cls._extract_metrics(result.return_value, handler_id),
         )
 
 
@@ -113,16 +148,16 @@ class SQLiteRecorder(BaseRecorder):
         WHERE instance = ? AND network = ?;
     """
     SQL_GET_RESULT_LATEST = """
-        SELECT handler_id, block_number, log_index, execution_time, is_err, created,
-        return_value_blob
+        SELECT handler_id, block_number, log_index, execution_time, error, created,
+        metrics_blob
         FROM silverback_result
         WHERE instance = ? AND network = ?
         ORDER BY created DESC
         LIMIT 1;
     """
     SQL_GET_HANDLER_LATEST = """
-        SELECT handler_id, block_number, log_index, execution_time, is_err, created,
-        return_value_blob
+        SELECT handler_id, block_number, log_index, execution_time, error, created,
+        metrics_blob
         FROM silverback_result
         WHERE instance = ? AND network = ? AND handler_id = ?
         ORDER BY created DESC
@@ -131,7 +166,7 @@ class SQLiteRecorder(BaseRecorder):
     SQL_INSERT_RESULT = """
         INSERT INTO silverback_result (
             instance, network, handler_id, block_number, log_index, execution_time,
-            is_err, created, return_value_blob
+            error, created, metrics_blob
         ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
     """
 
@@ -160,9 +195,9 @@ async def init(self):
                 block_number int,
                 log_index int,
                 execution_time real,
-                is_err bool,
+                error text,
                 created int,
-                return_value_blob blob
+                metrics_blob blob
             );
             CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance
                 ON silverback_state(instance, network);
@@ -170,8 +205,8 @@ async def init(self):
                 ON silverback_result (instance, network);
             CREATE INDEX IF NOT EXISTS silverback_result__handler
                 ON silverback_result (instance, network, handler_id);
-            CREATE INDEX IF NOT EXISTS silverback_result__is_err
-                ON silverback_result (is_err);
+            CREATE INDEX IF NOT EXISTS silverback_result__error
+                ON silverback_result (error);
             COMMIT;
         """
         )
@@ -295,9 +330,9 @@ async def get_latest_result(
             block_number=row[1],
             log_index=row[2],
             execution_time=row[3],
-            is_err=row[4],
+            error=row[4],
             created=datetime.fromtimestamp(row[5], timezone.utc),
-            return_value=json.loads(row[6]),
+            metrics=json.loads(row[6]),
         )
 
     async def add_result(self, v: HandlerResult):
@@ -317,9 +352,9 @@ async def add_result(self, v: HandlerResult):
                     v.block_number,
                     v.log_index,
                     v.execution_time,
-                    v.is_err,
+                    v.error,
                     v.created,
-                    json.dumps(v.return_value),
+                    json.dumps({n: m.model_dump(mode="json") for n, m in v.metrics.items()}),
                 ),
             )
 
diff --git a/silverback/runner.py b/silverback/runner.py
index 4f947895..7ba5e738 100644
--- a/silverback/runner.py
+++ b/silverback/runner.py
@@ -36,10 +36,11 @@ def _handle_result(self, result: TaskiqResult):
             self.exceptions += 1
 
         else:
+            # NOTE: Reset exception counter
             self.exceptions = 0
 
         if self.exceptions > self.max_exceptions:
-            raise Halt()
+            raise Halt() from result.error
 
     async def _checkpoint(
         self, last_block_seen: int = 0, last_block_processed: int = 0
diff --git a/silverback/types.py b/silverback/types.py
index 10aadc3c..9e9a28a1 100644
--- a/silverback/types.py
+++ b/silverback/types.py
@@ -1,7 +1,9 @@
+from datetime import datetime, timezone
+from decimal import Decimal
 from enum import Enum  # NOTE: `enum.StrEnum` only in Python 3.11+
-from typing import Optional, Protocol
+from typing import Literal, Optional, Protocol, get_args
 
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from typing_extensions import Self  # Introduced 3.11
 
 
@@ -38,3 +40,26 @@ def from_settings(cls, settings_: ISilverbackSettings) -> Self:
 class SilverbackStartupState(BaseModel):
     last_block_seen: int
     last_block_processed: int
+
+
+class BaseDatapoint(BaseModel):
+    type: str  # discriminator
+
+    # NOTE: default value ensures we don't have to set this manually
+    time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+
+
+ScalarType = bool | int | float | Decimal
+scalar_types = get_args(ScalarType)
+
+
+class ScalarDatapoint(BaseDatapoint):
+    type: Literal["scalar"] = "scalar"
+
+    # NOTE: app-supported scalar value types:
+    data: ScalarType
+
+
+# This is what a Silverback app task must return to integrate properly with our data acq system
+Metrics = dict[str, BaseDatapoint]
+# Otherwise, log a warning and ignore any unconverted return value(s)