Skip to content

Commit

Permalink
Internal: improve chain data service
Browse files Browse the repository at this point in the history
Problem: a few todos are still open on the chain data service.

Solution: improve and rename the `get_chaindata` method. The new
`prepare_sync_event_payload` method now always returns an off-chain sync
event (on-chain sync events never occur anyway). It also returns the
result as a Pydantic model for more flexibility. Serialization is left
to the caller. We now use a Pydantic model to translate `MessageDb`
objects into the correct format.
  • Loading branch information
odesenfans committed Oct 20, 2023
1 parent a6710e7 commit fb9ebbe
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 79 deletions.
2 changes: 1 addition & 1 deletion src/aleph/chains/bsc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from aleph_message.models import Chain
from configmanager import Config

from aleph.chains.chaindata import ChainDataService
from aleph.chains.chain_data_service import ChainDataService
from aleph.chains.abc import ChainReader
from aleph.chains.indexer_reader import AlephIndexerReader
from aleph.types.chain_sync import ChainEventType
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
import json
from io import StringIO
from typing import Dict, Optional, List, Any, Mapping, Set, cast, Type, Union

from aleph_message.models import StoreContent, ItemType, Chain, MessageType
Expand All @@ -17,6 +17,12 @@
ContentCurrentlyUnavailable,
)
from aleph.schemas.chains.indexer_response import MessageEvent, GenericMessageEvent
from aleph.schemas.chains.sync_events import (
OffChainSyncEventPayload,
OnChainSyncEventPayload,
OnChainContent,
OnChainMessage,
)
from aleph.schemas.chains.tezos_indexer_response import (
MessageEventPayload as TezosMessageEventPayload,
)
Expand All @@ -27,18 +33,6 @@
from aleph.types.files import FileType
from aleph.utils import get_sha256

INCOMING_MESSAGE_AUTHORIZED_FIELDS = [
"item_hash",
"item_content",
"item_type",
"chain",
"channel",
"sender",
"type",
"time",
"signature",
]


class ChainDataService:
def __init__(
Expand All @@ -47,52 +41,39 @@ def __init__(
self.session_factory = session_factory
self.storage_service = storage_service

# TODO: split this function in severa
async def get_chaindata(
self, session: DbSession, messages: List[MessageDb], bulk_threshold: int = 2000
):
"""Returns content ready to be broadcast on-chain (aka chaindata).
async def prepare_sync_event_payload(
self, session: DbSession, messages: List[MessageDb]
) -> OffChainSyncEventPayload:
"""
Returns the payload of a sync event to be published on chain.
We publish message archives on-chain at regular intervals. This function prepares the data
before the node emits a transaction on-chain:
1. Pack all messages as a JSON file
2. Add this file to IPFS and get its CID
3. Return the CID + some metadata.
If message length is over bulk_threshold (default 2000 chars), store list
in IPFS and store the object hash instead of raw list.
Note that the archive file is pinned on IPFS but not inserted in the `file_pins` table
here. This is left upon the caller once the event is successfully emitted on chain to avoid
persisting unused archives.
"""
# In previous versions, it was envisioned to store messages on-chain. This proved to be
# too expensive. The archive uses the same format as these "on-chain" data.
archive = OnChainSyncEventPayload(
protocol=ChainSyncProtocol.ON_CHAIN_SYNC,
version=1,
content=OnChainContent(
messages=[OnChainMessage.from_orm(message) for message in messages]
),
)
archive_content = archive.json()

# TODO: this function is used to guarantee that the chain sync protocol is not broken
# while shifting to Postgres.
# * exclude the useless fields in the DB query directly and get rid of
# INCOMING_MESSAGE_AUTHORIZED_FIELDS
# * use a Pydantic model to enforce the output format
def message_to_dict(_message: MessageDb) -> Mapping[str, Any]:
message_dict = {
k: v
for k, v in _message.to_dict().items()
if k in INCOMING_MESSAGE_AUTHORIZED_FIELDS
}
# Convert the time field to epoch
message_dict["time"] = message_dict["time"].timestamp()
return message_dict

message_dicts = [message_to_dict(message) for message in messages]

chaindata = {
"protocol": ChainSyncProtocol.ON_CHAIN_SYNC,
"version": 1,
"content": {"messages": message_dicts},
}
content = json.dumps(chaindata)
if len(content) > bulk_threshold:
ipfs_id = await self.storage_service.add_json(
session=session, value=chaindata
)
return json.dumps(
{
"protocol": ChainSyncProtocol.OFF_CHAIN_SYNC,
"version": 1,
"content": ipfs_id,
}
)
else:
return content
ipfs_cid = await self.storage_service.add_file(
session=session, fileobject=StringIO(archive_content), engine=ItemType.ipfs
)
return OffChainSyncEventPayload(
protocol=ChainSyncProtocol.OFF_CHAIN_SYNC, version=1, content=ipfs_cid
)

@staticmethod
def _get_sync_messages(tx_content: Mapping[str, Any]):
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/chains/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from aleph.storage import StorageService
from aleph.types.db_session import DbSessionFactory
from .bsc import BscConnector
from .chaindata import ChainDataService
from .chain_data_service import ChainDataService
from .abc import ChainReader, ChainWriter
from .ethereum import EthereumConnector
from .nuls2 import Nuls2Connector
Expand Down
8 changes: 4 additions & 4 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from aleph.utils import run_in_executor
from .chaindata import ChainDataService
from .chain_data_service import ChainDataService
from .abc import ChainWriter, Verifier, ChainReader
from .indexer_reader import AlephIndexerReader
from ..db.models import ChainTxDb
Expand Down Expand Up @@ -346,8 +346,8 @@ async def packer(self, config: Config):
LOGGER.info("Chain sync: %d unconfirmed messages")

# This function prepares a chain data file and makes it downloadable from the node.
content = await self.chain_data_service.get_chaindata(
session=session, messages=messages, bulk_threshold=200
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
)
# Required to apply update to the files table in get_chaindata
session.commit()
Expand All @@ -360,7 +360,7 @@ async def packer(self, config: Config):
account,
int(gas_price * 1.1),
nonce,
content,
sync_event_payload.json(),
)
LOGGER.info("Broadcast %r on %s" % (response, CHAIN_NAME))

Expand Down
2 changes: 1 addition & 1 deletion src/aleph/chains/indexer_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pydantic import BaseModel

import aleph.toolkit.json as aleph_json
from aleph.chains.chaindata import ChainDataService
from aleph.chains.chain_data_service import ChainDataService
from aleph.db.accessors.chains import (
get_missing_indexer_datetime_multirange,
add_indexer_range,
Expand Down
5 changes: 3 additions & 2 deletions src/aleph/chains/nuls2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from aleph.utils import run_in_executor
from .chaindata import ChainDataService
from .chain_data_service import ChainDataService
from .abc import Verifier, ChainWriter
from aleph.schemas.chains.tx_context import TxContext
from ..db.models import ChainTxDb
Expand Down Expand Up @@ -197,12 +197,13 @@ async def packer(self, config: Config):

if len(messages):
# This function prepares a chain data file and makes it downloadable from the node.
content = await self.chain_data_service.get_chaindata(
sync_event_payload = await self.chain_data_service.prepare_sync_event_payload(
session=session, messages=messages
)
# Required to apply update to the files table in get_chaindata
session.commit()

content = sync_event_payload.json()
tx = await prepare_transfer_tx(
address,
[(target_addr, CHEAP_UNIT_FEE)],
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/chains/tezos.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from nacl.exceptions import BadSignatureError

import aleph.toolkit.json as aleph_json
from aleph.chains.chaindata import ChainDataService
from aleph.chains.chain_data_service import ChainDataService
from aleph.chains.common import get_verification_buffer
from aleph.chains.abc import Verifier, ChainReader
from aleph.db.accessors.chains import get_last_height, upsert_chain_sync_status
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/jobs/process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sqlalchemy import delete

from ..chains.signature_verifier import SignatureVerifier
from aleph.chains.chaindata import ChainDataService
from aleph.chains.chain_data_service import ChainDataService
from aleph.db.accessors.pending_txs import get_pending_txs
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.models.pending_txs import PendingTxDb
Expand Down
52 changes: 52 additions & 0 deletions src/aleph/schemas/chains/sync_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Literal, Optional, List, Union, Annotated

from aleph_message.models import ItemHash, ItemType, Chain, MessageType
from pydantic import BaseModel, Field, validator

from aleph.types.chain_sync import ChainSyncProtocol
from aleph.types.channel import Channel
import datetime as dt


class OnChainMessage(BaseModel):
    """A single message as serialized inside a sync-event archive.

    Instances are built straight from ``MessageDb`` rows via ``from_orm``
    (hence ``orm_mode``); only the fields that are published on chain are
    included here.
    """

    class Config:
        orm_mode = True

    sender: str
    chain: Chain
    signature: Optional[str]
    type: MessageType
    item_content: Optional[str]
    item_type: ItemType
    item_hash: ItemHash
    time: float
    channel: Optional[Channel] = None

    @validator("time", pre=True)
    def check_time(cls, v, values):
        """Coerce DB ``datetime`` values to the epoch floats used on chain."""
        return v.timestamp() if isinstance(v, dt.datetime) else v


class OnChainContent(BaseModel):
    """Content section of an on-chain sync event payload: the archived messages."""

    messages: List[OnChainMessage]


class OnChainSyncEventPayload(BaseModel):
    """Sync event whose message content is embedded directly in the payload.

    Note: this format is also used for the archive files pinned on IPFS by
    off-chain sync events (see `ChainDataService.prepare_sync_event_payload`).
    """

    protocol: Literal[ChainSyncProtocol.ON_CHAIN_SYNC]
    version: int
    content: OnChainContent


class OffChainSyncEventPayload(BaseModel):
    """Sync event that references a message archive stored off-chain.

    `content` is the IPFS CID of the archive file rather than the messages
    themselves.
    """

    protocol: Literal[ChainSyncProtocol.OFF_CHAIN_SYNC]
    version: int
    content: str


# Discriminated union: Pydantic selects the concrete payload class based on
# the value of the `protocol` field when parsing.
SyncEventPayload = Annotated[
    Union[OnChainSyncEventPayload, OffChainSyncEventPayload],
    Field(discriminator="protocol"),
]
9 changes: 3 additions & 6 deletions src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
from hashlib import sha256
from typing import Any, IO, Optional, cast, Final
from aiohttp import web

from aleph_message.models import ItemType

Expand All @@ -20,13 +19,9 @@
from aleph.services.ipfs.common import get_cid_version
from aleph.services.p2p.http import request_hash as p2p_http_request_hash
from aleph.services.storage.engine import StorageEngine
from aleph.toolkit.constants import MiB
from aleph.types.db_session import DbSession
from aleph.types.files import FileType
from aleph.utils import get_sha256
from aleph.schemas.pending_messages import (
parse_message,
)

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,7 +139,9 @@ async def _verify_content_hash(
) -> None:
"""
Checks that the hash of a content we fetched from the network matches the expected hash.
:return: True if the hashes match, False otherwise.
Raises an exception if the content does not match the expected hash.
:raises InvalidContent: The computed hash does not match.
:raises ContentCurrentlyUnavailable: The hash cannot be computed at this time.
"""
config = get_config()
ipfs_enabled = config.ipfs.enabled.value
Expand Down
68 changes: 63 additions & 5 deletions tests/chains/test_chain_data_service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,71 @@
import pytest
from aleph_message.models import Chain, StoreContent, MessageType, ItemType, PostContent
from typing import IO

from aleph.chains.chaindata import ChainDataService
from aleph.db.models import ChainTxDb
import pytest
from aleph_message.models import (
Chain,
StoreContent,
MessageType,
ItemType,
PostContent,
ItemHash,
)

from aleph.chains.chain_data_service import ChainDataService
from aleph.db.models import ChainTxDb, MessageDb
from aleph.schemas.chains.sync_events import OnChainSyncEventPayload
from aleph.schemas.chains.tezos_indexer_response import MessageEventPayload
from aleph.schemas.pending_messages import parse_message
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.chain_sync import ChainSyncProtocol
from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import DbSessionFactory, DbSession
import datetime as dt


@pytest.mark.asyncio
async def test_prepare_sync_event_payload(mocker):
    """Check that prepare_sync_event_payload archives messages and returns an
    off-chain payload carrying the archive's IPFS CID."""

    # CID the mocked storage service will return; the resulting payload
    # must carry it as its `content`.
    archive_cid = "Qmsomething"

    messages = [
        MessageDb(
            item_hash=ItemHash(
                "abe22332402a5c45f20491b719b091fd0d7eab65ca1bcf4746840b787dee874b"
            ),
            type=MessageType.store,
            chain=Chain.ETH,
            sender="0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B",
            signature="0x813f0be4ddd852e7f0c723ac95333be762d80690fe0fc0705ec0e1b7df7fa92d5cdbfba8ab0321aee8769c93f7bc5dc9d1268cb66e8cb453a6b8299ba3faac771b",
            item_type=ItemType.inline,
            item_content='{"address":"0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B","time":1697718147.2695966,"item_type":"storage","item_hash":"ecbfcb9f92291b9385772c9b5cd094788f928ccb696ad1ecbf179a4e308e4350","mime_type":"application/octet-stream"}',
            time=dt.datetime(2023, 10, 19, 12, 22, 27, 269707, tzinfo=dt.timezone.utc),
            channel="TEST",
        )
    ]

    # Stand-in for StorageService.add_file: instead of pinning the archive
    # on IPFS, parse it back and verify its contents, then return a fake CID.
    async def mock_add_file(
        session: DbSession, fileobject: IO, engine: ItemType = ItemType.ipfs
    ) -> str:
        content = fileobject.read()
        # The archive itself uses the on-chain payload format.
        archive = OnChainSyncEventPayload.parse_raw(content)

        assert archive.version == 1
        assert len(archive.content.messages) == len(messages)
        # Check that the time field was converted
        assert archive.content.messages[0].time == messages[0].time.timestamp()

        return archive_cid

    storage_service = mocker.AsyncMock()
    storage_service.add_file = mock_add_file
    chain_data_service = ChainDataService(
        session_factory=mocker.MagicMock(), storage_service=storage_service
    )

    sync_event_payload = await chain_data_service.prepare_sync_event_payload(
        session=mocker.MagicMock(), messages=messages
    )
    # The published payload is always the off-chain variant, pointing at the
    # archive by CID.
    assert sync_event_payload.protocol == ChainSyncProtocol.OFF_CHAIN_SYNC
    assert sync_event_payload.version == 1
    assert sync_event_payload.content == archive_cid


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion tests/message_processing/test_process_pending_txs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlalchemy import select

from aleph.chains.signature_verifier import SignatureVerifier
from aleph.chains.chaindata import ChainDataService
from aleph.chains.chain_data_service import ChainDataService
from aleph.db.models import PendingMessageDb, MessageStatusDb
from aleph.db.models.chains import ChainTxDb
from aleph.db.models.pending_txs import PendingTxDb
Expand Down

0 comments on commit fb9ebbe

Please sign in to comment.