Skip to content

Commit

Permalink
feat: adds persistence layer for app state and job results
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeshultz committed Nov 18, 2023
1 parent 8b313b1 commit c46ec08
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 24 deletions.
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
"IPython", # Console for interacting
"ipdb", # Debugger (Must use `export PYTHONBREAKPOINT=ipdb.set_trace`)
],
"mongo": [
"beanie~=1.23.6",
],
}

# NOTE: `pip install -e .[dev]` to install package
Expand All @@ -49,6 +52,7 @@
+ extras_require["doc"]
+ extras_require["release"]
+ extras_require["dev"]
+ extras_require["mongo"]
)

with open("./README.md") as readme:
Expand Down
12 changes: 8 additions & 4 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,33 @@ def __init__(self, settings: Optional[Settings] = None):
f"{signer_str}{start_block_str}{new_block_timeout_str}"
)

def checkpoint(self, last_block_seen: int, last_block_processed: int):
self.broker.state.last_block_seen = last_block_seen
self.broker.state.last_block_processed = last_block_processed

def on_startup(self) -> Callable:
"""
Code to execute on worker startup / restart after an error.
Code to execute on client startup / restart after an error.
Usage example::
@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)
return self.broker.on_event(TaskiqEvents.CLIENT_STARTUP)

def on_shutdown(self) -> Callable:
"""
Code to execute on normal worker shutdown.
Code to execute on client shutdown.
Usage example::
@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
return self.broker.on_event(TaskiqEvents.CLIENT_SHUTDOWN)

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Expand Down
14 changes: 3 additions & 11 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from ape.utils import ManagerAccessMixin
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.utils import hexbytes_dict


class SilverbackMiddleware(TaskiqMiddleware, ManagerAccessMixin):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -47,20 +49,10 @@ def _create_label(self, message: TaskiqMessage) -> str:
return f"{message.task_name}{args}"

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
def fix_dict(data: dict) -> dict:
fixed_data = {}
for name, value in data.items():
if isinstance(value, str) and value.startswith("0x"):
fixed_data[name] = HexBytes(value)
else:
fixed_data[name] = value

return fixed_data

if message.task_name == "block":
# NOTE: Necessary because we don't know the exact block class
message.args[0] = self.provider.network.ecosystem.decode_block(
fix_dict(message.args[0])
hexbytes_dict(message.args[0])
)

elif "event" in message.task_name:
Expand Down
265 changes: 265 additions & 0 deletions silverback/persistence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import pickle
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Annotated, Any, Dict, Optional
from typing_extensions import Self # Introduced 3.11

from ape.logging import logger
from pydantic import BaseModel
from taskiq import TaskiqResult

from .types import SilverbackIdent


class SilverbackState(BaseModel):
ident: SilverbackIdent
# Last block number seen by runner
last_block_seen: int
# Last block number processed by a worker
last_block_processed: int


class HandlerResult(BaseModel):
instance: str
network: str
handler_id: str
block_number: int
log_index: Optional[int]
execution_time: float
# TODO: upcoming feature in taskiq
# labels: Dict[str]
return_value_blob: Optional[bytes] # pickled data
created: datetime

@classmethod
def from_taskiq(
cls,
ident: SilverbackIdent,
handler_id: str,
block_number: int,
log_index: int | None,
result: TaskiqResult,
) -> Self:
return cls(
instance=ident.identifier,
network=ident.network_choice,
handler_id=handler_id,
block_number=block_number,
log_index=log_index,
execution_time=result.execution_time,
# labels=result.labels,
return_value_blob=pickle.dumps(result.return_value),
created=datetime.now(timezone.utc),
)

@property
def return_value(self):
if self.return_value_blob is None:
return None
return pickle.loads(self.return_value_blob)

@return_value.setter
def set_return_value(self, v: Any):
self.return_value_blob = pickle.dumps(v)


class BasePersistentStorage(ABC):
@abstractmethod
async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]:
...

@abstractmethod
async def set_instance_state(
self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int
) -> Optional[SilverbackState]:
...

@abstractmethod
async def get_latest_result(
self, instance: SilverbackIdent, handler: Optional[str] = None
) -> HandlerResult:
...

@abstractmethod
async def add_result(self, v: HandlerResult):
...


async def init_mongo(mongo_uri: str) -> Optional[BasePersistentStorage]:
try:
import pymongo
from beanie import Document, Indexed, init_beanie
from beanie.odm.operators.update.general import Set
from motor.motor_asyncio import AsyncIOMotorClient
except ImportError as err:
print(err)
logger.warning("MongoDB was initialized by dependencies are not installed")
return None

class SilverbackStateDoc(Document):
instance: Annotated[str, Indexed(str)]
network: Annotated[str, Indexed(str)]
last_block_seen: int
last_block_processed: int
updated: datetime

class Settings:
name = "state"
indexes = [
[
("instance", pymongo.TEXT),
("network", pymongo.TEXT),
],
]

def to_silberback_state(self) -> SilverbackState:
return SilverbackState(
ident=SilverbackIdent(
identifier=self.instance,
network_choice=self.network,
),
last_block_seen=self.last_block_seen,
last_block_processed=self.last_block_processed,
)

class HandlerResultDoc(HandlerResult, Document):
# NOTE: Redefining these to annotate with indexed type
instance: Annotated[str, Indexed(str)]
network: Annotated[str, Indexed(str)]
handler_id: Annotated[str, Indexed(str)]

class Settings:
name = "result"
indexes = [
[
("instance", pymongo.TEXT),
("network", pymongo.TEXT),
("handler", pymongo.TEXT),
],
]

@classmethod
def from_handler_result(cls, result: HandlerResult) -> Self:
return cls(
instance=result.instance,
network=result.network,
handler_id=result.handler_id,
block_number=result.block_number,
log_index=result.log_index,
execution_time=result.execution_time,
return_value_blob=result.return_value_blob,
created=result.created,
)

def to_handler_result(self) -> HandlerResult:
return HandlerResult(
instance=self.instance,
network=self.network,
handler_id=self.handler,
block_number=self.block_number,
log_index=self.log_index,
execution_time=self.execution_time,
return_value_blob=self.return_value_blob,
created=self.created,
)

class MongoStorage(BasePersistentStorage):
client: AsyncIOMotorClient

async def get_instance_state(self, ident: SilverbackIdent) -> Optional[SilverbackState]:
res = await SilverbackStateDoc.find_one(
SilverbackStateDoc.instance == ident.identifier,
SilverbackStateDoc.network == ident.network_choice,
)

if res is None:
return None

return res.to_silberback_state()

async def set_instance_state(
self, ident: SilverbackIdent, last_block_seen: int, last_block_processed: int
) -> Optional[SilverbackState]:
now_utc = datetime.now(timezone.utc)

state = await SilverbackStateDoc.find_one(
SilverbackStateDoc.instance == ident.identifier,
SilverbackStateDoc.network == ident.network_choice,
)

if state is not None:
await state.set(
# Unreported type error? Confiremd working
{
SilverbackStateDoc.last_block_seen: last_block_seen,
SilverbackStateDoc.last_block_processed: last_block_processed,
SilverbackStateDoc.updated: now_utc,
} # type: ignore
)
else:
state = SilverbackStateDoc(
instance=ident.identifier,
network=ident.network_choice,
last_block_seen=last_block_seen,
last_block_processed=last_block_processed,
updated=now_utc,
)
await state.create()

# TODO: Why no work?
# await SilverbackStateDoc.find_one(
# SilverbackStateDoc.instance == ident.identifier,
# SilverbackStateDoc.network == ident.network_choice,
# ).upsert(
# Set(
# {
# SilverbackStateDoc.last_block_seen: last_block_seen,
# SilverbackStateDoc.last_block_processed: last_block_processed,
# SilverbackStateDoc.updated: now_utc,
# }
# ),
# on_insert=SilverbackStateDoc(
# instance=ident.identifier,
# network=ident.network_choice,
# last_block_seen=last_block_seen,
# last_block_processed=last_block_processed,
# updated=now_utc,
# ),
# )

async def get_latest_result(
self, ident: SilverbackIdent, handler_id: Optional[str] = None
) -> Optional[HandlerResult]:
query = HandlerResultDoc.find(
HandlerResultDoc.instance == ident.identifier,
HandlerResultDoc.network == ident.network_choice,
)

if handler_id:
query.find(HandlerResultDoc.handler_id == handler_id)

res = await query.sort("-created").first_or_none()

if res is None:
return res

return res.to_handler_result()

async def add_result(self, result: HandlerResult):
doc = HandlerResultDoc.from_handler_result(result)
# Type annotation error: https://github.com/roman-right/beanie/issues/679
await doc.insert() # type: ignore

storage = MongoStorage()
client = AsyncIOMotorClient(mongo_uri)

await init_beanie(
database=client.db_name,
# Type annotation error: https://github.com/roman-right/beanie/issues/670
document_models=[
HandlerResultDoc,
SilverbackStateDoc,
], # type: ignore
)

return storage
Loading

0 comments on commit c46ec08

Please sign in to comment.