Skip to content

Commit

Permalink
refactor!: add data acquisition models to recorder
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed Apr 11, 2024
1 parent a66c72d commit c5428e2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 21 deletions.
71 changes: 53 additions & 18 deletions silverback/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Optional, TypeVar
from typing import Any, Optional, TypeVar

from pydantic import BaseModel
from ape.logging import get_logger
from pydantic import BaseModel, Field
from taskiq import TaskiqResult
from typing_extensions import Self # Introduced 3.11

from .types import SilverbackID
from .types import BaseDatapoint, Metrics, ScalarDatapoint, SilverbackID, scalar_types

_HandlerReturnType = TypeVar("_HandlerReturnType")

Expand All @@ -24,13 +25,44 @@ class SilverbackState(BaseModel):
updated: datetime


class HandlerResult(TaskiqResult):
class HandlerResult(BaseModel):
instance: str
network: str
handler_id: str
block_number: Optional[int]
log_index: Optional[int]
created: datetime
labels: dict[str, Any] = Field(default_factory=dict)
execution_time: float
metrics: Metrics
error: Optional[str] = None

@classmethod
def _extract_metrics(cls, result: Any, handler_id: str) -> Metrics:
if isinstance(result, BaseDatapoint):
return {f"{handler_id}_result": result}

elif isinstance(result, scalar_types):
return {f"{handler_id}_result": ScalarDatapoint(data=result)}

elif isinstance(result, dict):
converted_result = {
k: ScalarDatapoint(data=v) if not isinstance(v, BaseDatapoint) else v
for k, v in result.items()
if isinstance(v, (BaseDatapoint, *scalar_types))
}
if len(converted_result) < len(result):
logger = get_logger(handler_id)
logger.warning(f"Unhandled results: {len(result)-len(converted_result)}")

return converted_result

elif result is not None:
logger = get_logger(handler_id)
logger.warning(f"Cannot handle return type '{type(result.metrics)}'.")

# else:
return {}

@classmethod
def from_taskiq(
Expand All @@ -48,7 +80,10 @@ def from_taskiq(
block_number=block_number,
log_index=log_index,
created=datetime.now(timezone.utc),
**result.dict(),
labels=result.labels,
execution_time=result.execution_time,
error=str(result.error),
metrics=cls._extract_metrics(result.return_value, handler_id),
)


Expand Down Expand Up @@ -113,16 +148,16 @@ class SQLiteRecorder(BaseRecorder):
WHERE instance = ? AND network = ?;
"""
SQL_GET_RESULT_LATEST = """
SELECT handler_id, block_number, log_index, execution_time, is_err, created,
return_value_blob
SELECT handler_id, block_number, log_index, execution_time, error, created,
metrics_blob
FROM silverback_result
WHERE instance = ? AND network = ?
ORDER BY created DESC
LIMIT 1;
"""
SQL_GET_HANDLER_LATEST = """
SELECT handler_id, block_number, log_index, execution_time, is_err, created,
return_value_blob
SELECT handler_id, block_number, log_index, execution_time, error, created,
metrics_blob
FROM silverback_result
WHERE instance = ? AND network = ? AND handler_id = ?
ORDER BY created DESC
Expand All @@ -131,7 +166,7 @@ class SQLiteRecorder(BaseRecorder):
SQL_INSERT_RESULT = """
INSERT INTO silverback_result (
instance, network, handler_id, block_number, log_index, execution_time,
is_err, created, return_value_blob
error, created, metrics_blob
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
Expand Down Expand Up @@ -160,18 +195,18 @@ async def init(self):
block_number int,
log_index int,
execution_time real,
is_err bool,
error text,
created int,
return_value_blob blob
metrics_blob blob
);
CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance
ON silverback_state(instance, network);
CREATE INDEX IF NOT EXISTS silverback_result__instance
ON silverback_result (instance, network);
CREATE INDEX IF NOT EXISTS silverback_result__handler
ON silverback_result (instance, network, handler_id);
CREATE INDEX IF NOT EXISTS silverback_result__is_err
ON silverback_result (is_err);
CREATE INDEX IF NOT EXISTS silverback_result__error
ON silverback_result (error);
COMMIT;
"""
)
Expand Down Expand Up @@ -295,9 +330,9 @@ async def get_latest_result(
block_number=row[1],
log_index=row[2],
execution_time=row[3],
is_err=row[4],
error=row[4],
created=datetime.fromtimestamp(row[5], timezone.utc),
return_value=json.loads(row[6]),
metrics=json.loads(row[6]),
)

async def add_result(self, v: HandlerResult):
Expand All @@ -317,9 +352,9 @@ async def add_result(self, v: HandlerResult):
v.block_number,
v.log_index,
v.execution_time,
v.is_err,
v.error,
v.created,
json.dumps(v.return_value),
json.dumps({n: m.model_dump_json() for n, m in v.metrics.items()}),
),
)

Expand Down
3 changes: 2 additions & 1 deletion silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ def _handle_result(self, result: TaskiqResult):
self.exceptions += 1

else:
# NOTE: Reset exception counter
self.exceptions = 0

if self.exceptions > self.max_exceptions:
raise Halt()
raise Halt() from result.error

async def _checkpoint(
self, last_block_seen: int = 0, last_block_processed: int = 0
Expand Down
29 changes: 27 additions & 2 deletions silverback/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum # NOTE: `enum.StrEnum` only in Python 3.11+
from typing import Optional, Protocol
from typing import Literal, Optional, Protocol, get_args

from pydantic import BaseModel
from pydantic import BaseModel, Field
from typing_extensions import Self # Introduced 3.11


Expand Down Expand Up @@ -38,3 +40,26 @@ def from_settings(cls, settings_: ISilverbackSettings) -> Self:
class SilverbackStartupState(BaseModel):
last_block_seen: int
last_block_processed: int


class BaseDatapoint(BaseModel):
type: str # discriminator

# NOTE: default value ensures we don't have to set this manually
time: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


ScalarType = bool | int | float | Decimal
scalar_types = get_args(ScalarType)


class ScalarDatapoint(BaseDatapoint):
type: Literal["scalar"] = "scalar"

# NOTE: app-supported scalar value types:
data: ScalarType


# This is what a Silverback app task must return to integrate properly with our data acq system
Metrics = dict[str, BaseDatapoint]
# Otherwise, log a warning and ignore any unconverted return value(s)

0 comments on commit c5428e2

Please sign in to comment.