From 88057a4dc19570ebc47980225dba3cb558c9c37c Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Thu, 19 Oct 2023 12:14:23 +0200 Subject: [PATCH] Internal: split chain service Problem: the chain service is in charge of providing all chain-related functionalities, namely: verify signature and read/write on chain. It turns out that most use cases of the chain service only care about signature verification, meaning that the dependencies required for the chain service are not necessary most of the time. As we plan on adding more dependencies for chain readers, the chain service is getting too complex for its own good. Solution: split the chain service into a signature verifier class and a chain connector class. This way, the signature verifier can be instantiated without dependencies in all the places where it is required and the chain connector can get a bit more complex without impacting the rest. Other changes: * Removed the conditional import of chain connectors as they must all be installed on each node. --- src/aleph/api_entrypoint.py | 11 +- src/aleph/chains/abc.py | 24 +++ src/aleph/chains/avalanche.py | 2 +- src/aleph/chains/bsc.py | 2 +- src/aleph/chains/chain_service.py | 189 ------------------ src/aleph/chains/connector.py | 119 +++++++++-- src/aleph/chains/cosmos.py | 2 +- src/aleph/chains/ethereum.py | 34 ++-- src/aleph/chains/nuls.py | 2 +- src/aleph/chains/nuls2.py | 30 ++- src/aleph/chains/signature_verifier.py | 44 ++++ src/aleph/chains/solana.py | 2 +- src/aleph/chains/substrate.py | 2 +- src/aleph/chains/tezos.py | 21 +- src/aleph/commands.py | 6 +- src/aleph/handlers/message_handler.py | 9 +- src/aleph/jobs/fetch_pending_messages.py | 8 +- src/aleph/jobs/process_pending_messages.py | 8 +- src/aleph/jobs/process_pending_txs.py | 9 +- src/aleph/network.py | 8 +- .../web/controllers/app_state_getters.py | 9 +- src/aleph/web/controllers/storage.py | 16 +- tests/api/conftest.py | 11 +- tests/api/test_storage.py | 11 +- tests/chains/test_common.py | 4 +- tests/chains/test_confirmation.py | 12 +- tests/chains/test_nuls2.py | 9 +- tests/chains/test_tezos.py | 20 +- tests/message_processing/conftest.py | 10 +- .../test_process_aggregates.py | 5 +- .../test_process_pending_txs.py | 7 +- .../message_processing/test_process_stores.py | 6 +- tests/test_network.py | 27 +-- 33 files changed, 314 insertions(+), 365 deletions(-) create mode 100644 src/aleph/chains/abc.py delete mode 100644 src/aleph/chains/chain_service.py create mode 100644 src/aleph/chains/signature_verifier.py diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py index 2582bf206..394dedba5 100644 --- a/src/aleph/api_entrypoint.py +++ b/src/aleph/api_entrypoint.py @@ -6,7 +6,7 @@ from configmanager import Config import aleph.config -from aleph.chains.chain_service import ChainService +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.connection import make_engine, make_session_factory from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService @@ -25,7 +25,7 @@ APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, APP_STATE_MQ_WS_CHANNEL, - APP_STATE_CHAIN_SERVICE, + APP_STATE_SIGNATURE_VERIFIER, ) @@ -53,9 +53,7 @@ async def configure_aiohttp_app( ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( - storage_service=storage_service, session_factory=session_factory - ) + signature_verifier = SignatureVerifier() app = create_aiohttp_app() @@ -74,7 +72,7 @@ async def configure_aiohttp_app( app[APP_STATE_NODE_CACHE] = node_cache app[APP_STATE_STORAGE_SERVICE] = storage_service app[APP_STATE_SESSION_FACTORY] = session_factory - app[APP_STATE_CHAIN_SERVICE] = chain_service + app[APP_STATE_SIGNATURE_VERIFIER] = signature_verifier return app @@ -95,5 +93,4 @@ async def create_app() -> web.Application: if __name__ == "__main__": - import asyncio web.run_app(create_app(), host="127.0.0.1", port=8000) diff --git a/src/aleph/chains/abc.py b/src/aleph/chains/abc.py new file mode 100644 index 000000000..949d25536 --- /dev/null +++ b/src/aleph/chains/abc.py @@ -0,0 +1,24 @@ +import abc + +from configmanager import Config + +from aleph.schemas.pending_messages import BasePendingMessage +from aleph.types.chain_sync import ChainEventType + + +class Verifier(abc.ABC): + @abc.abstractmethod + async def verify_signature(self, message: BasePendingMessage) -> bool: + ... + + +class ChainReader(abc.ABC): + @abc.abstractmethod + async def fetcher(self, config: Config): + ... + + +class ChainWriter(ChainReader): + @abc.abstractmethod + async def packer(self, config: Config): + ... diff --git a/src/aleph/chains/avalanche.py b/src/aleph/chains/avalanche.py index 2baed7260..a872cef1e 100644 --- a/src/aleph/chains/avalanche.py +++ b/src/aleph/chains/avalanche.py @@ -8,7 +8,7 @@ from aleph.chains.common import get_verification_buffer from aleph.schemas.pending_messages import BasePendingMessage -from .connector import Verifier +from .abc import Verifier LOGGER = logging.getLogger("chains.avalanche") CHAIN_NAME = "AVAX" diff --git a/src/aleph/chains/bsc.py b/src/aleph/chains/bsc.py index 111a8cbd8..1e8b14158 100644 --- a/src/aleph/chains/bsc.py +++ b/src/aleph/chains/bsc.py @@ -2,7 +2,7 @@ from configmanager import Config from aleph.chains.chaindata import ChainDataService -from aleph.chains.connector import ChainReader +from aleph.chains.abc import ChainReader from aleph.chains.indexer_reader import AlephIndexerReader from aleph.types.chain_sync import ChainEventType from aleph.types.db_session import DbSessionFactory diff --git a/src/aleph/chains/chain_service.py b/src/aleph/chains/chain_service.py deleted file mode 100644 index a2b78d991..000000000 --- a/src/aleph/chains/chain_service.py +++ /dev/null @@ -1,189 +0,0 @@ -import asyncio -import logging -from typing import Dict - -from aleph_message.models import Chain -from configmanager import Config - -from aleph.schemas.pending_messages import BasePendingMessage -from aleph.storage import StorageService -from aleph.types.db_session import DbSessionFactory -from aleph.types.message_status import InvalidMessageFormat, InvalidSignature -from .chaindata import ChainDataService -from .connector import ChainConnector, ChainReader, ChainWriter, Verifier - -LOGGER = logging.getLogger(__name__) - - -class ChainService: - connectors: Dict[Chain, ChainConnector] - verifiers: Dict[Chain, Verifier] - readers: Dict[Chain, ChainReader] - writers: Dict[Chain, ChainWriter] - - def __init__( - self, session_factory: DbSessionFactory, storage_service: StorageService - ): - - self._session_factory = session_factory - - self.connectors = {} - self.verifiers = {} - self.readers = {} - self.writers = {} - - self._chain_data_service = ChainDataService( - session_factory=session_factory, storage_service=storage_service - ) - - self._register_chains() - - async def verify_signature(self, message: BasePendingMessage) -> None: - try: - verifier = self.verifiers[message.chain] - except KeyError: - raise InvalidMessageFormat(f"Unknown chain for validation: {message.chain}") - - try: - if await verifier.verify_signature(message): - return - else: - raise InvalidSignature("The signature of the message is invalid") - - except ValueError as e: - raise InvalidSignature(f"Signature validation error: {str(e)}") - - async def chain_reader_task(self, chain: Chain, config: Config): - connector = self.readers[chain] - - while True: - try: - LOGGER.info("Fetching on-chain data...") - await connector.fetcher(config) - except Exception: - LOGGER.exception( - "Chain reader task for %s failed, retrying in 60 seconds.", chain - ) - - await asyncio.sleep(60) - - async def chain_writer_task(self, chain: Chain, config: Config): - connector = self.writers[chain] - - while True: - try: - await connector.packer(config) - except Exception: - LOGGER.exception( - "Chain writer task for %s failed, relaunching in 10 seconds.", chain - ) - await asyncio.sleep(10) - - async def chain_event_loop(self, config: Config): - listener_tasks = [] - publisher_tasks = [] - - if config.bsc.enabled.value: - listener_tasks.append(self.chain_reader_task(Chain.BSC, config)) - - if config.ethereum.enabled.value: - listener_tasks.append(self.chain_reader_task(Chain.ETH, config)) - if config.ethereum.packing_node.value: - publisher_tasks.append(self.chain_writer_task(Chain.ETH, config)) - - if config.tezos.enabled.value: - listener_tasks.append(self.chain_reader_task(Chain.TEZOS, config)) - - await asyncio.gather(*(listener_tasks + publisher_tasks)) - - def _add_chain(self, chain: Chain, connector: ChainConnector): - self.connectors[chain] = connector - - if isinstance(connector, Verifier): - self.verifiers[chain] = connector - if isinstance(connector, ChainReader): - self.readers[chain] = connector - if isinstance(connector, ChainWriter): - self.writers[chain] = connector - - def _register_chains(self): - try: - from .avalanche import AvalancheConnector - - self._add_chain(Chain.AVAX, AvalancheConnector()) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load AVAX: %s", error.msg) - - try: - from .bsc import BscConnector - - self._add_chain( - Chain.BSC, - BscConnector( - session_factory=self._session_factory, - chain_data_service=self._chain_data_service, - ), - ) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load: BSC: %s", error.msg) - - try: - from .nuls import NulsConnector - - self._add_chain(Chain.NULS, NulsConnector()) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load NULS: %s", error.msg) - try: - from .nuls2 import Nuls2Connector - - self._add_chain( - Chain.NULS2, - Nuls2Connector( - session_factory=self._session_factory, - chain_data_service=self._chain_data_service, - ), - ) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load NULS2: %s", error.msg) - try: - from .ethereum import EthereumConnector - - self._add_chain( - Chain.ETH, - EthereumConnector( - session_factory=self._session_factory, - chain_data_service=self._chain_data_service, - ), - ) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load ETH: %s", error.msg) - try: - from .substrate import SubstrateConnector - - self._add_chain(Chain.DOT, SubstrateConnector()) - except (ModuleNotFoundError, ImportError) as error: - LOGGER.warning("Can't load DOT: %s", error.msg) - try: - from .cosmos import CosmosConnector - - self._add_chain(Chain.CSDK, CosmosConnector()) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load CSDK: %s", error.msg) - try: - from .solana import SolanaConnector - - self._add_chain(Chain.SOL, SolanaConnector()) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load SOL: %s", error.msg) - try: - from .tezos import TezosConnector - - self._add_chain( - Chain.TEZOS, - TezosConnector( - session_factory=self._session_factory, - chain_data_service=self._chain_data_service, - ), - ) - except ModuleNotFoundError as error: - LOGGER.warning("Can't load Tezos: %s", error.msg) diff --git a/src/aleph/chains/connector.py b/src/aleph/chains/connector.py index bddcd1080..7612d90d1 100644 --- a/src/aleph/chains/connector.py +++ b/src/aleph/chains/connector.py @@ -1,28 +1,115 @@ -import abc +import asyncio +import logging +from typing import Dict, Union +from aleph_message.models import Chain from configmanager import Config -from aleph.schemas.pending_messages import BasePendingMessage -from aleph.types.chain_sync import ChainEventType +from aleph.storage import StorageService +from aleph.types.db_session import DbSessionFactory +from .bsc import BscConnector +from .chaindata import ChainDataService +from .abc import ChainReader, ChainWriter +from .ethereum import EthereumConnector +from .nuls2 import Nuls2Connector +from .tezos import TezosConnector + +LOGGER = logging.getLogger(__name__) class ChainConnector: - ... + readers: Dict[Chain, ChainReader] + writers: Dict[Chain, ChainWriter] + + def __init__( + self, session_factory: DbSessionFactory, storage_service: StorageService + ): + self._session_factory = session_factory + + self.readers = {} + self.writers = {} + + self._chain_data_service = ChainDataService( + session_factory=session_factory, storage_service=storage_service + ) + + self._register_chains() + + async def chain_reader_task(self, chain: Chain, config: Config): + connector = self.readers[chain] + + while True: + try: + LOGGER.info("Fetching on-chain data...") + await connector.fetcher(config) + except Exception: + LOGGER.exception( + "Chain reader task for %s failed, retrying in 60 seconds.", chain + ) + + await asyncio.sleep(60) + + async def chain_writer_task(self, chain: Chain, config: Config): + connector = self.writers[chain] + + while True: + try: + await connector.packer(config) + except Exception: + LOGGER.exception( + "Chain writer task for %s failed, relaunching in 10 seconds.", chain + ) + await asyncio.sleep(10) + + async def chain_event_loop(self, config: Config): + listener_tasks = [] + publisher_tasks = [] + if config.bsc.enabled.value: + listener_tasks.append(self.chain_reader_task(Chain.BSC, config)) -class Verifier(abc.ABC, ChainConnector): - @abc.abstractmethod - async def verify_signature(self, message: BasePendingMessage) -> bool: - ... + if config.ethereum.enabled.value: + listener_tasks.append(self.chain_reader_task(Chain.ETH, config)) + if config.ethereum.packing_node.value: + publisher_tasks.append(self.chain_writer_task(Chain.ETH, config)) + if config.tezos.enabled.value: + listener_tasks.append(self.chain_reader_task(Chain.TEZOS, config)) -class ChainReader(abc.ABC, ChainConnector): - @abc.abstractmethod - async def fetcher(self, config: Config): - ... + await asyncio.gather(*(listener_tasks + publisher_tasks)) + def _add_chain(self, chain: Chain, connector: Union[ChainReader, ChainWriter]): + if isinstance(connector, ChainReader): + self.readers[chain] = connector + if isinstance(connector, ChainWriter): + self.writers[chain] = connector -class ChainWriter(ChainReader): - @abc.abstractmethod - async def packer(self, config: Config): - ... + def _register_chains(self): + self._add_chain( + Chain.BSC, + BscConnector( + session_factory=self._session_factory, + chain_data_service=self._chain_data_service, + ), + ) + self._add_chain( + Chain.NULS2, + Nuls2Connector( + session_factory=self._session_factory, + chain_data_service=self._chain_data_service, + ), + ) + self._add_chain( + Chain.ETH, + EthereumConnector( + session_factory=self._session_factory, + chain_data_service=self._chain_data_service, + ), + ) + self._add_chain( + Chain.TEZOS, + TezosConnector( + session_factory=self._session_factory, + chain_data_service=self._chain_data_service, + ), + ) diff --git a/src/aleph/chains/cosmos.py b/src/aleph/chains/cosmos.py index 0010a549c..9be11b1b3 100644 --- a/src/aleph/chains/cosmos.py +++ b/src/aleph/chains/cosmos.py @@ -10,7 +10,7 @@ from aleph.chains.common import get_verification_buffer from aleph.schemas.pending_messages import BasePendingMessage -from .connector import Verifier +from .abc import Verifier LOGGER = logging.getLogger("chains.cosmos") CHAIN_NAME = "CSDK" diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 757233b32..eaa53b0b4 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -28,7 +28,7 @@ from aleph.types.db_session import DbSessionFactory from aleph.utils import run_in_executor from .chaindata import ChainDataService -from .connector import ChainWriter, Verifier +from .abc import ChainWriter, Verifier, ChainReader from .indexer_reader import AlephIndexerReader from ..db.models import ChainTxDb from ..types.chain_sync import ChainEventType @@ -67,21 +67,7 @@ def get_logs_query(web3: Web3, contract, start_height, end_height): ) -class EthereumConnector(Verifier, ChainWriter): - def __init__( - self, - session_factory: DbSessionFactory, - chain_data_service: ChainDataService, - ): - self.session_factory = session_factory - self.chain_data_service = chain_data_service - - self.indexer_reader = AlephIndexerReader( - chain=Chain.ETH, - session_factory=session_factory, - chain_data_service=chain_data_service, - ) - +class EthereumVerifier(Verifier): async def verify_signature(self, message: BasePendingMessage) -> bool: """Verifies a signature of a message, return True if verified, false if not""" @@ -114,6 +100,22 @@ async def verify_signature(self, message: BasePendingMessage) -> bool: return verified + +class EthereumConnector(ChainWriter): + def __init__( + self, + session_factory: DbSessionFactory, + chain_data_service: ChainDataService, + ): + self.session_factory = session_factory + self.chain_data_service = chain_data_service + + self.indexer_reader = AlephIndexerReader( + chain=Chain.ETH, + session_factory=session_factory, + chain_data_service=chain_data_service, + ) + async def get_last_height(self, sync_type: ChainEventType) -> int: """Returns the last height for which we already have the ethereum data.""" with self.session_factory() as session: diff --git a/src/aleph/chains/nuls.py b/src/aleph/chains/nuls.py index f48e8a192..8cb0744f7 100644 --- a/src/aleph/chains/nuls.py +++ b/src/aleph/chains/nuls.py @@ -11,7 +11,7 @@ from aleph.chains.common import get_verification_buffer from aleph.schemas.pending_messages import BasePendingMessage from aleph.utils import run_in_executor -from .connector import Verifier +from .abc import Verifier LOGGER = logging.getLogger("chains.nuls") CHAIN_NAME = "NULS" diff --git a/src/aleph/chains/nuls2.py b/src/aleph/chains/nuls2.py index 020ab8efd..9387ee7f3 100644 --- a/src/aleph/chains/nuls2.py +++ b/src/aleph/chains/nuls2.py @@ -31,7 +31,7 @@ from aleph.types.db_session import DbSessionFactory from aleph.utils import run_in_executor from .chaindata import ChainDataService -from .connector import Verifier, ChainWriter +from .abc import Verifier, ChainWriter from aleph.schemas.chains.tx_context import TxContext from ..db.models import ChainTxDb from ..types.chain_sync import ChainEventType @@ -40,13 +40,7 @@ CHAIN_NAME = "NULS2" -class Nuls2Connector(Verifier, ChainWriter): - def __init__( - self, session_factory: DbSessionFactory, chain_data_service: ChainDataService - ): - self.session_factory = session_factory - self.chain_data_service = chain_data_service - +class Nuls2Verifier(Verifier): async def verify_signature(self, message: BasePendingMessage) -> bool: """Verifies a signature of a message, return True if verified, false if not""" @@ -81,10 +75,20 @@ async def verify_signature(self, message: BasePendingMessage) -> bool: else: return True + +class Nuls2Connector(ChainWriter): + def __init__( + self, session_factory: DbSessionFactory, chain_data_service: ChainDataService + ): + self.session_factory = session_factory + self.chain_data_service = chain_data_service + async def get_last_height(self, sync_type: ChainEventType) -> int: """Returns the last height for which we already have the nuls data.""" with self.session_factory() as session: - last_height = get_last_height(session=session, chain=Chain.NULS2, sync_type=sync_type) + last_height = get_last_height( + session=session, chain=Chain.NULS2, sync_type=sync_type + ) if last_height is None: last_height = -1 @@ -140,7 +144,9 @@ async def fetcher(self, config: Config): LOGGER.info("Last block is #%d" % last_stored_height) async with aiohttp.ClientSession() as http_session: while True: - last_stored_height = await self.get_last_height(sync_type=ChainEventType.SYNC) + last_stored_height = await self.get_last_height( + sync_type=ChainEventType.SYNC + ) async for jdata, context in self._request_transactions( config, http_session, last_stored_height + 1 ): @@ -191,7 +197,9 @@ async def packer(self, config: Config): if len(messages): # This function prepares a chain data file and makes it downloadable from the node. - content = await self.chain_data_service.get_chaindata(session=session, messages=messages) + content = await self.chain_data_service.get_chaindata( + session=session, messages=messages + ) # Required to apply update to the files table in get_chaindata session.commit() diff --git a/src/aleph/chains/signature_verifier.py b/src/aleph/chains/signature_verifier.py new file mode 100644 index 000000000..6cbba9887 --- /dev/null +++ b/src/aleph/chains/signature_verifier.py @@ -0,0 +1,44 @@ +from typing import Dict + +from aleph_message.models import Chain + +from aleph.chains.avalanche import AvalancheConnector +from aleph.chains.abc import Verifier +from aleph.chains.ethereum import EthereumVerifier +from aleph.chains.nuls import NulsConnector +from aleph.chains.nuls2 import Nuls2Verifier +from aleph.chains.solana import SolanaConnector +from aleph.chains.substrate import SubstrateConnector +from aleph.chains.tezos import TezosVerifier +from aleph.schemas.pending_messages import BasePendingMessage +from aleph.types.message_status import InvalidMessageFormat, InvalidSignature + + +class SignatureVerifier: + verifiers: Dict[Chain, Verifier] + + def __init__(self): + self.verifiers = { + Chain.AVAX: AvalancheConnector(), + Chain.DOT: SubstrateConnector(), + Chain.ETH: EthereumVerifier(), + Chain.NULS: NulsConnector(), + Chain.NULS2: Nuls2Verifier(), + Chain.SOL: SolanaConnector(), + Chain.TEZOS: TezosVerifier(), + } + + async def verify_signature(self, message: BasePendingMessage) -> None: + try: + verifier = self.verifiers[message.chain] + except KeyError: + raise InvalidMessageFormat(f"Unknown chain for validation: {message.chain}") + + try: + if await verifier.verify_signature(message): + return + else: + raise InvalidSignature("The signature of the message is invalid") + + except ValueError as e: + raise InvalidSignature(f"Signature validation error: {str(e)}") diff --git a/src/aleph/chains/solana.py b/src/aleph/chains/solana.py index 9812d37ee..2fad61e6f 100644 --- a/src/aleph/chains/solana.py +++ b/src/aleph/chains/solana.py @@ -7,7 +7,7 @@ from aleph.chains.common import get_verification_buffer from aleph.schemas.pending_messages import BasePendingMessage -from .connector import Verifier +from .abc import Verifier LOGGER = logging.getLogger("chains.solana") CHAIN_NAME = "SOL" diff --git a/src/aleph/chains/substrate.py b/src/aleph/chains/substrate.py index 0cd63c0c0..76288844d 100644 --- a/src/aleph/chains/substrate.py +++ b/src/aleph/chains/substrate.py @@ -5,7 +5,7 @@ from aleph.chains.common import get_verification_buffer from aleph.schemas.pending_messages import BasePendingMessage -from .connector import Verifier +from .abc import Verifier LOGGER = logging.getLogger("chains.substrate") diff --git a/src/aleph/chains/tezos.py b/src/aleph/chains/tezos.py index d09dc424e..74ca543a4 100644 --- a/src/aleph/chains/tezos.py +++ b/src/aleph/chains/tezos.py @@ -13,7 +13,7 @@ import aleph.toolkit.json as aleph_json from aleph.chains.chaindata import ChainDataService from aleph.chains.common import get_verification_buffer -from aleph.chains.connector import Verifier, ChainReader +from aleph.chains.abc import Verifier, ChainReader from aleph.db.accessors.chains import get_last_height, upsert_chain_sync_status from aleph.db.models import PendingMessageDb, ChainTxDb from aleph.schemas.chains.tezos_indexer_response import ( @@ -151,7 +151,6 @@ async def fetch_messages( limit: int, skip: int, ) -> IndexerResponse[IndexerMessageEvent]: - query = make_graphql_query( limit=limit, skip=skip, @@ -169,7 +168,6 @@ async def fetch_messages( def indexer_event_to_chain_tx( indexer_event: IndexerMessageEvent, ) -> ChainTxDb: - chain_tx = ChainTxDb( hash=indexer_event.operation_hash, chain=Chain.TEZOS, @@ -187,18 +185,11 @@ def indexer_event_to_chain_tx( async def extract_aleph_messages_from_indexer_response( indexer_response: IndexerResponse[IndexerMessageEvent], ) -> List[ChainTxDb]: - events = indexer_response.data.events return [indexer_event_to_chain_tx(event) for event in events] -class TezosConnector(Verifier, ChainReader): - def __init__( - self, session_factory: DbSessionFactory, chain_data_service: ChainDataService - ): - self.session_factory = session_factory - self.chain_data_service = chain_data_service - +class TezosVerifier(Verifier): async def verify_signature(self, message: BasePendingMessage) -> bool: """ Verifies the cryptographic signature of a message signed with a Tezos key. @@ -254,6 +245,14 @@ async def verify_signature(self, message: BasePendingMessage) -> bool: return True + +class TezosConnector(ChainReader): + def __init__( + self, session_factory: DbSessionFactory, chain_data_service: ChainDataService + ): + self.session_factory = session_factory + self.chain_data_service = chain_data_service + async def get_last_height(self, sync_type: ChainEventType) -> int: """Returns the last height for which we already have the ethereum data.""" with self.session_factory() as session: diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 99ccdd4fd..182a87f0b 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -23,7 +23,7 @@ from configmanager import Config import aleph.config -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector from aleph.cli.args import parse_args from aleph.db.connection import make_engine, make_session_factory, make_db_url from aleph.exceptions import InvalidConfigException, KeyNotFoundException @@ -143,7 +143,7 @@ async def main(args: List[str]) -> None: ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( + chain_connector = ChainConnector( session_factory=session_factory, storage_service=storage_service ) @@ -178,7 +178,7 @@ async def main(args: List[str]) -> None: node_cache=node_cache, p2p_client=p2p_client, ) - tasks.append(chain_service.chain_event_loop(config)) + tasks.append(chain_connector.chain_event_loop(config)) LOGGER.debug("Initialized listeners") LOGGER.debug("Initializing cache tasks") diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 084773620..ea214d7c8 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -9,7 +9,8 @@ from pydantic import ValidationError from sqlalchemy import insert -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.files import insert_content_file_pin, upsert_file from aleph.db.accessors.messages import ( get_message_by_item_hash, @@ -58,12 +59,12 @@ class MessageHandler: def __init__( self, session_factory: DbSessionFactory, - chain_service: ChainService, + signature_verifier: SignatureVerifier, storage_service: StorageService, config: Config, ): self.session_factory = session_factory - self.chain_service = chain_service + self._signature_verifier = signature_verifier self.storage_service = storage_service vm_handler = VmMessageHandler() @@ -90,7 +91,7 @@ def get_content_handler(self, message_type: MessageType) -> ContentHandler: async def verify_signature(self, pending_message: PendingMessageDb): if pending_message.check_message: # TODO: remove type: ignore by deciding the pending message type - await self.chain_service.verify_signature(pending_message) # type: ignore + await self._signature_verifier.verify_signature(pending_message) # type: ignore async def fetch_pending_message( self, pending_message: PendingMessageDb diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py index 1680d6d09..972ee0075 100644 --- a/src/aleph/jobs/fetch_pending_messages.py +++ b/src/aleph/jobs/fetch_pending_messages.py @@ -16,7 +16,7 @@ from configmanager import Config from setproctitle import setproctitle -from aleph.chains.chain_service import ChainService +from ..chains.signature_verifier import SignatureVerifier from aleph.db.accessors.pending_messages import ( make_pending_message_fetched_statement, get_next_pending_messages, @@ -172,12 +172,10 @@ async def fetch_messages_task(config: Config): ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=config, ) diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cf0292636..138728db8 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -14,7 +14,7 @@ from setproctitle import setproctitle import aleph.toolkit.json as aleph_json -from aleph.chains.chain_service import ChainService +from ..chains.signature_verifier import SignatureVerifier from aleph.db.accessors.pending_messages import get_next_pending_message from aleph.db.connection import make_engine, make_session_factory from aleph.handlers.message_handler import MessageHandler @@ -152,12 +152,10 @@ async def fetch_and_process_messages_task(config: Config): ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=config, ) diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 8da65b1c6..76d2912cf 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -10,7 +10,7 @@ from setproctitle import setproctitle from sqlalchemy import delete -from aleph.chains.chain_service import ChainService +from ..chains.signature_verifier import SignatureVerifier from aleph.chains.chaindata import ChainDataService from aleph.db.accessors.pending_txs import get_pending_txs from aleph.db.connection import make_engine, make_session_factory @@ -134,13 +134,10 @@ async def handle_txs_task(config: Config): ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) - + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=config, ) diff --git a/src/aleph/network.py b/src/aleph/network.py index d30dc2bc1..18e28a870 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -5,7 +5,7 @@ from aleph_p2p_client import AlephP2PServiceClient import aleph.toolkit.json as aleph_json -from aleph.chains.chain_service import ChainService +from aleph.chains.signature_verifier import SignatureVerifier from aleph.handlers.message_handler import MessageHandler from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService @@ -52,12 +52,10 @@ def listener_tasks( ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=config, ) diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py index 0b4b3d65b..27b75a624 100644 --- a/src/aleph/web/controllers/app_state_getters.py +++ b/src/aleph/web/controllers/app_state_getters.py @@ -11,7 +11,8 @@ from aleph_p2p_client import AlephP2PServiceClient from configmanager import Config -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector +from aleph.chains.signature_verifier import SignatureVerifier from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService from aleph.storage import StorageService @@ -28,7 +29,7 @@ APP_STATE_P2P_CLIENT = "p2p_client" APP_STATE_SESSION_FACTORY = "session_factory" APP_STATE_STORAGE_SERVICE = "storage_service" -APP_STATE_CHAIN_SERVICE = "chain_service" +APP_STATE_SIGNATURE_VERIFIER = "signature_verifier" T = TypeVar("T") @@ -106,5 +107,5 @@ def get_storage_service_from_request(request: web.Request) -> StorageService: return cast(StorageService, request.app[APP_STATE_STORAGE_SERVICE]) -def get_chain_service_from_request(request: web.Request) -> ChainService: - return cast(ChainService, request.app[APP_STATE_CHAIN_SERVICE]) +def get_signature_verifier_from_request(request: web.Request) -> SignatureVerifier: + return cast(SignatureVerifier, request.app[APP_STATE_SIGNATURE_VERIFIER]) diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index cc16c42a7..82426383e 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -11,7 +11,7 @@ from mypy.dmypy_server import MiB from pydantic import ValidationError -from aleph.chains.chain_service import ChainService +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.balances import get_total_balance from aleph.db.accessors.cost import get_total_cost_for_address from aleph.db.accessors.files import count_file_pins, get_file @@ -32,7 +32,7 @@ get_storage_service_from_request, get_config_from_request, get_mq_channel_from_request, - get_chain_service_from_request, + get_signature_verifier_from_request, ) from aleph.web.controllers.utils import ( mq_make_aleph_message_topic_queue, @@ -84,10 +84,10 @@ async def add_storage_json_controller(request: web.Request): async def _verify_message_signature( - pending_message: BasePendingMessage, chain_service: ChainService + pending_message: BasePendingMessage, signature_verifier: SignatureVerifier ) -> None: try: - await chain_service.verify_signature(pending_message) + await signature_verifier.verify_signature(pending_message) except InvalidSignature: raise web.HTTPForbidden() @@ -151,7 +151,7 @@ def size(self) -> int: async def _check_and_add_file( session: DbSession, - chain_service: ChainService, + signature_verifier: SignatureVerifier, storage_service: StorageService, message: Optional[PendingStoreMessage], file: UploadedFile, @@ -159,7 +159,7 @@ async def _check_and_add_file( # Perform authentication and balance checks if message: await _verify_message_signature( - pending_message=message, chain_service=chain_service + pending_message=message, signature_verifier=signature_verifier ) try: message_content = StoreContent.parse_raw(message.item_content) @@ -217,7 +217,7 @@ async def _make_mq_queue( async def storage_add_file(request: web.Request): storage_service = get_storage_service_from_request(request) session_factory = get_session_factory_from_request(request) - chain_service: ChainService = get_chain_service_from_request(request) + signature_verifier = get_signature_verifier_from_request(request) post = await request.post() try: @@ -263,7 +263,7 @@ async def storage_add_file(request: web.Request): with session_factory() as session: file_hash = await _check_and_add_file( session=session, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, message=message, file=uploaded_file, diff --git a/tests/api/conftest.py b/tests/api/conftest.py index cc7e2a0bb..f2d8e207e 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -1,3 +1,4 @@ +import datetime as dt import json from pathlib import Path from typing import Any, Dict, Sequence, cast, Tuple @@ -8,7 +9,7 @@ from configmanager import Config from sqlalchemy import insert -from aleph.chains.chain_service import ChainService +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.aggregates import refresh_aggregate from aleph.db.models import ( MessageDb, @@ -22,8 +23,6 @@ from aleph.storage import StorageService from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.db_session import DbSessionFactory -import datetime as dt - from in_memory_storage_engine import InMemoryStorageEngine @@ -205,12 +204,10 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=mock_config, ) diff --git a/tests/api/test_storage.py b/tests/api/test_storage.py index 9be988f52..1d37cfb74 100644 --- a/tests/api/test_storage.py +++ b/tests/api/test_storage.py @@ -9,13 +9,15 @@ import requests from aleph_message.models import ItemHash, Chain -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.files import get_file from aleph.db.models import AlephBalanceDb from aleph.storage import StorageService from aleph.types.db_session import DbSessionFactory from aleph.types.files import FileType from aleph.types.message_status import MessageStatus +from aleph.web.controllers.app_state_getters import APP_STATE_SIGNATURE_VERIFIER, APP_STATE_STORAGE_SERVICE from aleph.web.controllers.utils import BroadcastStatus, PublicationStatus from in_memory_storage_engine import InMemoryStorageEngine @@ -80,16 +82,13 @@ def api_client(ccn_api_client, mocker): } ) - ccn_api_client.app["storage_service"] = StorageService( + ccn_api_client.app[APP_STATE_STORAGE_SERVICE] = StorageService( storage_engine=InMemoryStorageEngine(files={}), ipfs_service=ipfs_service, node_cache=mocker.AsyncMock(), ) - ccn_api_client.app["chain_service"] = ChainService( - session_factory=ccn_api_client.app["session_factory"], - storage_service=ccn_api_client.app["storage_service"], - ) + ccn_api_client.app[APP_STATE_SIGNATURE_VERIFIER] = SignatureVerifier() return ccn_api_client diff --git a/tests/chains/test_common.py b/tests/chains/test_common.py index 22f13592a..85f7b5298 100644 --- a/tests/chains/test_common.py +++ b/tests/chains/test_common.py @@ -3,7 +3,7 @@ import pytest from aleph_message.models import MessageType, ItemType -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector from aleph.chains.common import get_verification_buffer from aleph.db.models import PendingMessageDb from aleph.schemas.pending_messages import BasePendingMessage, parse_message @@ -41,7 +41,7 @@ async def async_magic(): MagicMock.__await__ = lambda x: async_magic().__await__() - message_processor =MessageHandler(chain_service=ChainService()) + message_processor =MessageHandler(chain_service=ChainConnector()) mocker.patch("aleph.model.db") diff --git a/tests/chains/test_confirmation.py b/tests/chains/test_confirmation.py index 75ab15373..7b21ded25 100644 --- a/tests/chains/test_confirmation.py +++ b/tests/chains/test_confirmation.py @@ -7,7 +7,7 @@ from aleph_message.models import Chain from configmanager import Config -from aleph.chains.chain_service import ChainService +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import ChainTxDb, PendingMessageDb from aleph.handlers.message_handler import MessageHandler @@ -71,11 +71,10 @@ async def test_confirm_message( item_hash = MESSAGE_DICT["item_hash"] content = json.loads(MESSAGE_DICT["item_content"]) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=ChainService( - session_factory=session_factory, storage_service=test_storage_service - ), + signature_verifier=signature_verifier, storage_service=test_storage_service, config=mock_config, ) @@ -135,11 +134,10 @@ async def test_process_confirmed_message( item_hash = MESSAGE_DICT["item_hash"] + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=ChainService( - session_factory=session_factory, storage_service=test_storage_service - ), + signature_verifier=signature_verifier, storage_service=test_storage_service, config=mock_config, ) diff --git a/tests/chains/test_nuls2.py b/tests/chains/test_nuls2.py index 4ad5635c7..26e12120a 100644 --- a/tests/chains/test_nuls2.py +++ b/tests/chains/test_nuls2.py @@ -1,4 +1,4 @@ -from aleph.chains.nuls2 import Nuls2Connector +from aleph.chains.nuls2 import Nuls2Connector, Nuls2Verifier from aleph.schemas.pending_messages import parse_message import pytest @@ -24,9 +24,6 @@ async def test_verify_signature_nuls2(mocker): "item_content": '{"type":"amend","address":"NULSd6HgeZVDvQ2pKQLakAsStYvGAT6WVFu9K","content":{"body":"test","title":"Mutsi Test","private":false},"time":1574266270.022,"ref":"43eef54be4a92c65ca24d3f2419414224129b7944ecaefed088897787aed70b4"}', } - connector = Nuls2Connector( - chain_data_service=mocker.AsyncMock(), session_factory=mocker.Mock() - ) - + verifier = Nuls2Verifier() message = parse_message(message_dict) - assert await connector.verify_signature(message) + assert await verifier.verify_signature(message) diff --git a/tests/chains/test_tezos.py b/tests/chains/test_tezos.py index 11508fa8f..a87865339 100644 --- a/tests/chains/test_tezos.py +++ b/tests/chains/test_tezos.py @@ -12,6 +12,7 @@ TezosConnector, datetime_to_iso_8601, indexer_event_to_chain_tx, + TezosVerifier, ) from aleph.db.models import PendingMessageDb from aleph.schemas.chains.tezos_indexer_response import ( @@ -40,12 +41,9 @@ async def test_tezos_verify_signature_raw(mocker): "type": "test", }, } - connector = TezosConnector( - session_factory=mocker.MagicMock(), chain_data_service=mocker.AsyncMock() - ) - + verifier = TezosVerifier() message = parse_message(message_dict) - assert await connector.verify_signature(message) + assert await verifier.verify_signature(message) @pytest.mark.asyncio @@ -62,12 +60,10 @@ async def test_tezos_verify_signature_raw_ed25519(mocker): "item_hash": "41de1a7766c7e5fad54772470eefde63b6bef8683c4159d9179d74955009deb4", } - connector = TezosConnector( - session_factory=mocker.MagicMock(), chain_data_service=mocker.AsyncMock() - ) + verifier = TezosVerifier() message = parse_message(message_dict) - assert await connector.verify_signature(message) + assert await verifier.verify_signature(message) @pytest.mark.asyncio @@ -82,16 +78,14 @@ async def test_tezos_verify_signature_micheline(mocker): "item_type": "storage", "item_hash": "72b2722b95582419cfa71f631ff6c6afc56344dc6a4609e772877621813040b7", } - connector = TezosConnector( - session_factory=mocker.MagicMock(), chain_data_service=mocker.AsyncMock() - ) + verifier = TezosVerifier() message = PendingMessageDb.from_message_dict( message_dict, reception_time=dt.datetime(2022, 1, 1), fetched=True, ) - assert await connector.verify_signature(message) + assert await verifier.verify_signature(message) def test_datetime_to_iso_8601(): diff --git a/tests/message_processing/conftest.py b/tests/message_processing/conftest.py index c3755e49e..c3b3d513c 100644 --- a/tests/message_processing/conftest.py +++ b/tests/message_processing/conftest.py @@ -7,7 +7,8 @@ import pytest_asyncio from configmanager import Config -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.models import ChainTxDb, PendingMessageDb from aleph.handlers.message_handler import MessageHandler from aleph.jobs.process_pending_messages import PendingMessageProcessor @@ -80,12 +81,10 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) - chain_service = ChainService( - session_factory=session_factory, storage_service=storage_service - ) + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=mock_config, ) @@ -97,4 +96,3 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac mq_conn=mocker.AsyncMock(), ) return message_processor - diff --git a/tests/message_processing/test_process_aggregates.py b/tests/message_processing/test_process_aggregates.py index 286aa078e..e225fe885 100644 --- a/tests/message_processing/test_process_aggregates.py +++ b/tests/message_processing/test_process_aggregates.py @@ -10,6 +10,7 @@ from sqlalchemy import select from sqlalchemy.orm import selectinload +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.aggregates import get_aggregate_by_key, get_aggregate_elements from aleph.db.models import PendingMessageDb, MessageDb, AggregateElementDb, AggregateDb from aleph.handlers.content.aggregate import AggregateMessageHandler @@ -35,10 +36,10 @@ async def test_process_aggregate_first_element( ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) - chain_service = mocker.AsyncMock() + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=mock_config, ) diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py index cea1e9944..de0a258c8 100644 --- a/tests/message_processing/test_process_pending_txs.py +++ b/tests/message_processing/test_process_pending_txs.py @@ -7,6 +7,7 @@ from configmanager import Config from sqlalchemy import select +from aleph.chains.signature_verifier import SignatureVerifier from aleph.chains.chaindata import ChainDataService from aleph.db.models import PendingMessageDb, MessageStatusDb from aleph.db.models.chains import ChainTxDb @@ -39,12 +40,13 @@ async def test_process_pending_tx_on_chain_protocol( ): chain_data_service = mocker.AsyncMock() chain_data_service.get_tx_messages = get_fixture_chaindata_messages + signature_verifier = SignatureVerifier() pending_tx_processor = PendingTxProcessor( session_factory=session_factory, storage_service=test_storage_service, message_handler=MessageHandler( session_factory=session_factory, - chain_service=mocker.AsyncMock(), + signature_verifier=signature_verifier, storage_service=test_storage_service, config=mock_config, ), @@ -113,12 +115,13 @@ async def _process_smart_contract_tx( chain_data_service = ChainDataService( session_factory=session_factory, storage_service=mocker.AsyncMock() ) + signature_verifier = SignatureVerifier() pending_tx_processor = PendingTxProcessor( session_factory=session_factory, storage_service=test_storage_service, message_handler=MessageHandler( session_factory=session_factory, - chain_service=mocker.AsyncMock(), + signature_verifier=signature_verifier, storage_service=test_storage_service, config=mock_config, ), diff --git a/tests/message_processing/test_process_stores.py b/tests/message_processing/test_process_stores.py index c946f8123..621f81607 100644 --- a/tests/message_processing/test_process_stores.py +++ b/tests/message_processing/test_process_stores.py @@ -6,6 +6,7 @@ from aleph_message.models import Chain, MessageType, ItemType from configmanager import Config +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.files import get_message_file_pin from aleph.db.accessors.messages import get_message_by_item_hash from aleph.db.models import PendingMessageDb, MessageStatusDb @@ -74,10 +75,11 @@ async def test_process_store( ipfs_service=mocker.AsyncMock(), node_cache=mocker.AsyncMock(), ) - chain_service = mocker.AsyncMock() + # Disable signature verification + signature_verifier = mocker.AsyncMock() message_handler = MessageHandler( session_factory=session_factory, - chain_service=chain_service, + signature_verifier=signature_verifier, storage_service=storage_service, config=mock_config, ) diff --git a/tests/test_network.py b/tests/test_network.py index 6212948a3..88f183540 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -3,7 +3,8 @@ import pytest from configmanager import Config -from aleph.chains.chain_service import ChainService +from aleph.chains.connector import ChainConnector +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.models import PendingMessageDb from aleph.handlers.message_handler import MessageHandler from aleph.schemas.pending_messages import parse_message @@ -26,11 +27,9 @@ async def test_valid_message(mocker): "signature": "2103041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554520c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - chain_service = ChainService( - session_factory=mocker.AsyncMock(), storage_service=mocker.MagicMock() - ) + signature_verifier = SignatureVerifier() sample_message = parse_message(sample_message_dict) - await chain_service.verify_signature(sample_message) + await signature_verifier.verify_signature(sample_message) @pytest.mark.asyncio @@ -63,13 +62,11 @@ async def test_invalid_signature_message(mocker): "signature": "BAR", } - chain_service = ChainService( - session_factory=mocker.AsyncMock(), storage_service=mocker.MagicMock() - ) + signature_verifier = SignatureVerifier() sample_message = parse_message(sample_message_dict) with pytest.raises(InvalidMessageException): - _ = await chain_service.verify_signature(sample_message) + _ = await signature_verifier.verify_signature(sample_message) @pytest.mark.asyncio @@ -84,13 +81,12 @@ async def test_invalid_signature_message_2(mocker): "time": 1563279102.3155158, "signature": "2153041b0b357446927d2c8c62fdddd27910d82f665f16a4907a2be927b5901f5e6c004730450221009a54ecaff6869664e94ad68554525c79c21d4f63822864bd910f9916c32c1b5602201576053180d225ec173fb0b6e4af5efb2dc474ce6aa77a3bdd67fd14e1d806b4", } - chain_service = ChainService( - session_factory=mocker.AsyncMock(), storage_service=mocker.MagicMock() - ) + + signature_verifier = SignatureVerifier() sample_message = parse_message(sample_message_dict) with pytest.raises(InvalidMessageException): - _ = await chain_service.verify_signature(sample_message) + _ = await signature_verifier.verify_signature(sample_message) @pytest.mark.asyncio @@ -111,11 +107,10 @@ async def test_incoming_inline_content( "signature": "21027c108022f992f090bbe5c78ca8822f5b7adceb705ae2cd5318543d7bcdd2a74700473045022100b59f7df5333d57080a93be53b9af74e66a284170ec493455e675eb2539ac21db022077ffc66fe8dde7707038344496a85266bf42af1240017d4e1fa0d7068c588ca7", } + signature_verifier = SignatureVerifier() message_handler = MessageHandler( session_factory=session_factory, - chain_service=ChainService( - session_factory=session_factory, storage_service=test_storage_service - ), + signature_verifier=signature_verifier, storage_service=test_storage_service, config=mock_config, )