From fb9ebbec182eb875cf6f29e1ebb378f42d1cb894 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Thu, 19 Oct 2023 17:05:36 +0200 Subject: [PATCH] Internal: improve chain data service Problem: a few TODOs are still open on the chain data service. Solution: improve and rename the `get_chaindata` method. The new `prepare_sync_event_payload` method now always returns an off-chain sync event (on-chain sync events never occur anyway). It also returns the result as a Pydantic model for more flexibility. Serialization is left to the caller. We now use a Pydantic model to translate `MessageDb` objects into the correct format. --- src/aleph/chains/bsc.py | 2 +- .../{chaindata.py => chain_data_service.py} | 93 ++++++++----------- src/aleph/chains/connector.py | 2 +- src/aleph/chains/ethereum.py | 8 +- src/aleph/chains/indexer_reader.py | 2 +- src/aleph/chains/nuls2.py | 5 +- src/aleph/chains/tezos.py | 2 +- src/aleph/jobs/process_pending_txs.py | 2 +- src/aleph/schemas/chains/sync_events.py | 52 +++++++++++ src/aleph/storage.py | 9 +- tests/chains/test_chain_data_service.py | 68 +++++++++++++- .../test_process_pending_txs.py | 2 +- 12 files changed, 168 insertions(+), 79 deletions(-) rename src/aleph/chains/{chaindata.py => chain_data_service.py} (78%) create mode 100644 src/aleph/schemas/chains/sync_events.py diff --git a/src/aleph/chains/bsc.py b/src/aleph/chains/bsc.py index 1e8b14158..1a0a06086 100644 --- a/src/aleph/chains/bsc.py +++ b/src/aleph/chains/bsc.py @@ -1,7 +1,7 @@ from aleph_message.models import Chain from configmanager import Config -from aleph.chains.chaindata import ChainDataService +from aleph.chains.chain_data_service import ChainDataService from aleph.chains.abc import ChainReader from aleph.chains.indexer_reader import AlephIndexerReader from aleph.types.chain_sync import ChainEventType diff --git a/src/aleph/chains/chaindata.py b/src/aleph/chains/chain_data_service.py similarity index 78% rename from 
src/aleph/chains/chain_data_service.py index 1af14a4f6..498b472ea 100644 --- a/src/aleph/chains/chaindata.py +++ b/src/aleph/chains/chain_data_service.py @@ -1,5 +1,5 @@ import asyncio -import json +from io import StringIO from typing import Dict, Optional, List, Any, Mapping, Set, cast, Type, Union from aleph_message.models import StoreContent, ItemType, Chain, MessageType @@ -17,6 +17,12 @@ ContentCurrentlyUnavailable, ) from aleph.schemas.chains.indexer_response import MessageEvent, GenericMessageEvent +from aleph.schemas.chains.sync_events import ( + OffChainSyncEventPayload, + OnChainSyncEventPayload, + OnChainContent, + OnChainMessage, +) from aleph.schemas.chains.tezos_indexer_response import ( MessageEventPayload as TezosMessageEventPayload, ) @@ -27,18 +33,6 @@ from aleph.types.files import FileType from aleph.utils import get_sha256 -INCOMING_MESSAGE_AUTHORIZED_FIELDS = [ - "item_hash", - "item_content", - "item_type", - "chain", - "channel", - "sender", - "type", - "time", - "signature", -] - class ChainDataService: def __init__( @@ -47,52 +41,39 @@ def __init__( self.session_factory = session_factory self.storage_service = storage_service - # TODO: split this function in severa - async def get_chaindata( - self, session: DbSession, messages: List[MessageDb], bulk_threshold: int = 2000 - ): - """Returns content ready to be broadcast on-chain (aka chaindata). + async def prepare_sync_event_payload( + self, session: DbSession, messages: List[MessageDb] + ) -> OffChainSyncEventPayload: + """ + Returns the payload of a sync event to be published on chain. + + We publish message archives on-chain at regular intervals. This function prepares the data + before the node emits a transaction on-chain: + 1. Pack all messages as a JSON file + 2. Add this file to IPFS and get its CID + 3. Return the CID + some metadata. - If message length is over bulk_threshold (default 2000 chars), store list - in IPFS and store the object hash instead of raw list. 
+ Note that the archive file is pinned on IPFS but not inserted in the `file_pins` table + here. This is left upon the caller once the event is successfully emitted on chain to avoid + persisting unused archives. """ + # In previous versions, it was envisioned to store messages on-chain. This proved to be + # too expensive. The archive uses the same format as these "on-chain" data. + archive = OnChainSyncEventPayload( + protocol=ChainSyncProtocol.ON_CHAIN_SYNC, + version=1, + content=OnChainContent( + messages=[OnChainMessage.from_orm(message) for message in messages] + ), + ) + archive_content = archive.json() - # TODO: this function is used to guarantee that the chain sync protocol is not broken - # while shifting to Postgres. - # * exclude the useless fields in the DB query directly and get rid of - # INCOMING_MESSAGE_AUTHORIZED_FIELDS - # * use a Pydantic model to enforce the output format - def message_to_dict(_message: MessageDb) -> Mapping[str, Any]: - message_dict = { - k: v - for k, v in _message.to_dict().items() - if k in INCOMING_MESSAGE_AUTHORIZED_FIELDS - } - # Convert the time field to epoch - message_dict["time"] = message_dict["time"].timestamp() - return message_dict - - message_dicts = [message_to_dict(message) for message in messages] - - chaindata = { - "protocol": ChainSyncProtocol.ON_CHAIN_SYNC, - "version": 1, - "content": {"messages": message_dicts}, - } - content = json.dumps(chaindata) - if len(content) > bulk_threshold: - ipfs_id = await self.storage_service.add_json( - session=session, value=chaindata - ) - return json.dumps( - { - "protocol": ChainSyncProtocol.OFF_CHAIN_SYNC, - "version": 1, - "content": ipfs_id, - } - ) - else: - return content + ipfs_cid = await self.storage_service.add_file( + session=session, fileobject=StringIO(archive_content), engine=ItemType.ipfs + ) + return OffChainSyncEventPayload( + protocol=ChainSyncProtocol.OFF_CHAIN_SYNC, version=1, content=ipfs_cid + ) @staticmethod def _get_sync_messages(tx_content: 
Mapping[str, Any]): diff --git a/src/aleph/chains/connector.py b/src/aleph/chains/connector.py index 7612d90d1..4bb091875 100644 --- a/src/aleph/chains/connector.py +++ b/src/aleph/chains/connector.py @@ -8,7 +8,7 @@ from aleph.storage import StorageService from aleph.types.db_session import DbSessionFactory from .bsc import BscConnector -from .chaindata import ChainDataService +from .chain_data_service import ChainDataService from .abc import ChainReader, ChainWriter from .ethereum import EthereumConnector from .nuls2 import Nuls2Connector diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index eaa53b0b4..27d29ad87 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -27,7 +27,7 @@ from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSessionFactory from aleph.utils import run_in_executor -from .chaindata import ChainDataService +from .chain_data_service import ChainDataService from .abc import ChainWriter, Verifier, ChainReader from .indexer_reader import AlephIndexerReader from ..db.models import ChainTxDb @@ -346,8 +346,8 @@ async def packer(self, config: Config): LOGGER.info("Chain sync: %d unconfirmed messages") # This function prepares a chain data file and makes it downloadable from the node. 
- content = await self.chain_data_service.get_chaindata( - session=session, messages=messages, bulk_threshold=200 + sync_event_payload = await self.chain_data_service.prepare_sync_event_payload( + session=session, messages=messages ) # Required to apply update to the files table in get_chaindata session.commit() @@ -360,7 +360,7 @@ async def packer(self, config: Config): account, int(gas_price * 1.1), nonce, - content, + sync_event_payload.json(), ) LOGGER.info("Broadcast %r on %s" % (response, CHAIN_NAME)) diff --git a/src/aleph/chains/indexer_reader.py b/src/aleph/chains/indexer_reader.py index 0fbfd81a7..933a4b1f0 100644 --- a/src/aleph/chains/indexer_reader.py +++ b/src/aleph/chains/indexer_reader.py @@ -21,7 +21,7 @@ from pydantic import BaseModel import aleph.toolkit.json as aleph_json -from aleph.chains.chaindata import ChainDataService +from aleph.chains.chain_data_service import ChainDataService from aleph.db.accessors.chains import ( get_missing_indexer_datetime_multirange, add_indexer_range, diff --git a/src/aleph/chains/nuls2.py b/src/aleph/chains/nuls2.py index 9387ee7f3..8e08d3a0b 100644 --- a/src/aleph/chains/nuls2.py +++ b/src/aleph/chains/nuls2.py @@ -30,7 +30,7 @@ from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSessionFactory from aleph.utils import run_in_executor -from .chaindata import ChainDataService +from .chain_data_service import ChainDataService from .abc import Verifier, ChainWriter from aleph.schemas.chains.tx_context import TxContext from ..db.models import ChainTxDb @@ -197,12 +197,13 @@ async def packer(self, config: Config): if len(messages): # This function prepares a chain data file and makes it downloadable from the node. 
- content = await self.chain_data_service.get_chaindata( + sync_event_payload = await self.chain_data_service.prepare_sync_event_payload( session=session, messages=messages ) # Required to apply update to the files table in get_chaindata session.commit() + content = sync_event_payload.json() tx = await prepare_transfer_tx( address, [(target_addr, CHEAP_UNIT_FEE)], diff --git a/src/aleph/chains/tezos.py b/src/aleph/chains/tezos.py index 74ca543a4..5e29dd197 100644 --- a/src/aleph/chains/tezos.py +++ b/src/aleph/chains/tezos.py @@ -11,7 +11,7 @@ from nacl.exceptions import BadSignatureError import aleph.toolkit.json as aleph_json -from aleph.chains.chaindata import ChainDataService +from aleph.chains.chain_data_service import ChainDataService from aleph.chains.common import get_verification_buffer from aleph.chains.abc import Verifier, ChainReader from aleph.db.accessors.chains import get_last_height, upsert_chain_sync_status diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 76d2912cf..e1f7e9e2d 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -11,7 +11,7 @@ from sqlalchemy import delete from ..chains.signature_verifier import SignatureVerifier -from aleph.chains.chaindata import ChainDataService +from aleph.chains.chain_data_service import ChainDataService from aleph.db.accessors.pending_txs import get_pending_txs from aleph.db.connection import make_engine, make_session_factory from aleph.db.models.pending_txs import PendingTxDb diff --git a/src/aleph/schemas/chains/sync_events.py b/src/aleph/schemas/chains/sync_events.py new file mode 100644 index 000000000..1a25b17de --- /dev/null +++ b/src/aleph/schemas/chains/sync_events.py @@ -0,0 +1,52 @@ +from typing import Literal, Optional, List, Union, Annotated + +from aleph_message.models import ItemHash, ItemType, Chain, MessageType +from pydantic import BaseModel, Field, validator + +from aleph.types.chain_sync import 
ChainSyncProtocol +from aleph.types.channel import Channel +import datetime as dt + + +class OnChainMessage(BaseModel): + class Config: + orm_mode = True + + sender: str + chain: Chain + signature: Optional[str] + type: MessageType + item_content: Optional[str] + item_type: ItemType + item_hash: ItemHash + time: float + channel: Optional[Channel] = None + + @validator("time", pre=True) + def check_time(cls, v, values): + if isinstance(v, dt.datetime): + return v.timestamp() + + return v + + +class OnChainContent(BaseModel): + messages: List[OnChainMessage] + + +class OnChainSyncEventPayload(BaseModel): + protocol: Literal[ChainSyncProtocol.ON_CHAIN_SYNC] + version: int + content: OnChainContent + + +class OffChainSyncEventPayload(BaseModel): + protocol: Literal[ChainSyncProtocol.OFF_CHAIN_SYNC] + version: int + content: str + + +SyncEventPayload = Annotated[ + Union[OnChainSyncEventPayload, OffChainSyncEventPayload], + Field(discriminator="protocol"), +] diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 983d94366..f183fa9c4 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -5,7 +5,6 @@ import logging from hashlib import sha256 from typing import Any, IO, Optional, cast, Final -from aiohttp import web from aleph_message.models import ItemType @@ -20,13 +19,9 @@ from aleph.services.ipfs.common import get_cid_version from aleph.services.p2p.http import request_hash as p2p_http_request_hash from aleph.services.storage.engine import StorageEngine -from aleph.toolkit.constants import MiB from aleph.types.db_session import DbSession from aleph.types.files import FileType from aleph.utils import get_sha256 -from aleph.schemas.pending_messages import ( - parse_message, -) LOGGER = logging.getLogger(__name__) @@ -144,7 +139,9 @@ async def _verify_content_hash( ) -> None: """ Checks that the hash of a content we fetched from the network matches the expected hash. - :return: True if the hashes match, False otherwise. 
+ Raises an exception if the content does not match the expected hash. + :raises InvalidContent: The computed hash does not match. + :raises ContentCurrentlyUnavailable: The hash cannot be computed at this time. """ config = get_config() ipfs_enabled = config.ipfs.enabled.value diff --git a/tests/chains/test_chain_data_service.py b/tests/chains/test_chain_data_service.py index a1d0feef1..8073e231c 100644 --- a/tests/chains/test_chain_data_service.py +++ b/tests/chains/test_chain_data_service.py @@ -1,13 +1,71 @@ -import pytest -from aleph_message.models import Chain, StoreContent, MessageType, ItemType, PostContent +from typing import IO -from aleph.chains.chaindata import ChainDataService -from aleph.db.models import ChainTxDb +import pytest +from aleph_message.models import ( + Chain, + StoreContent, + MessageType, + ItemType, + PostContent, + ItemHash, +) + +from aleph.chains.chain_data_service import ChainDataService +from aleph.db.models import ChainTxDb, MessageDb +from aleph.schemas.chains.sync_events import OnChainSyncEventPayload from aleph.schemas.chains.tezos_indexer_response import MessageEventPayload from aleph.schemas.pending_messages import parse_message from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.chain_sync import ChainSyncProtocol -from aleph.types.db_session import DbSessionFactory +from aleph.types.db_session import DbSessionFactory, DbSession +import datetime as dt + + +@pytest.mark.asyncio +async def test_prepare_sync_event_payload(mocker): + archive_cid = "Qmsomething" + + messages = [ + MessageDb( + item_hash=ItemHash( + "abe22332402a5c45f20491b719b091fd0d7eab65ca1bcf4746840b787dee874b" + ), + type=MessageType.store, + chain=Chain.ETH, + sender="0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B", + signature="0x813f0be4ddd852e7f0c723ac95333be762d80690fe0fc0705ec0e1b7df7fa92d5cdbfba8ab0321aee8769c93f7bc5dc9d1268cb66e8cb453a6b8299ba3faac771b", + item_type=ItemType.inline, + 
item_content='{"address":"0x0dAd142fDD76A817CD52a700EaCA2D9D3491086B","time":1697718147.2695966,"item_type":"storage","item_hash":"ecbfcb9f92291b9385772c9b5cd094788f928ccb696ad1ecbf179a4e308e4350","mime_type":"application/octet-stream"}', + time=dt.datetime(2023, 10, 19, 12, 22, 27, 269707, tzinfo=dt.timezone.utc), + channel="TEST", + ) + ] + + async def mock_add_file( + session: DbSession, fileobject: IO, engine: ItemType = ItemType.ipfs + ) -> str: + content = fileobject.read() + archive = OnChainSyncEventPayload.parse_raw(content) + + assert archive.version == 1 + assert len(archive.content.messages) == len(messages) + # Check that the time field was converted + assert archive.content.messages[0].time == messages[0].time.timestamp() + + return archive_cid + + storage_service = mocker.AsyncMock() + storage_service.add_file = mock_add_file + chain_data_service = ChainDataService( + session_factory=mocker.MagicMock(), storage_service=storage_service + ) + + sync_event_payload = await chain_data_service.prepare_sync_event_payload( + session=mocker.MagicMock(), messages=messages + ) + assert sync_event_payload.protocol == ChainSyncProtocol.OFF_CHAIN_SYNC + assert sync_event_payload.version == 1 + assert sync_event_payload.content == archive_cid @pytest.mark.asyncio diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py index de0a258c8..602a3c0a3 100644 --- a/tests/message_processing/test_process_pending_txs.py +++ b/tests/message_processing/test_process_pending_txs.py @@ -8,7 +8,7 @@ from sqlalchemy import select from aleph.chains.signature_verifier import SignatureVerifier -from aleph.chains.chaindata import ChainDataService +from aleph.chains.chain_data_service import ChainDataService from aleph.db.models import PendingMessageDb, MessageStatusDb from aleph.db.models.chains import ChainTxDb from aleph.db.models.pending_txs import PendingTxDb