From 48b3f9082996d0d3f909ec600b68bdb9faa2e29d Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 23 Oct 2023 14:01:16 +0200 Subject: [PATCH] Internal: pass chain data service to chain connector constructor (#490) Problem: the chain connector class initializes its own chain data service, which is not ideal for testing. Solution: pass a chain data service instance to the constructor of the chain connector class. --- src/aleph/chains/connector.py | 19 +++++++++++++------ src/aleph/chains/ethereum.py | 4 +--- src/aleph/commands.py | 6 +++++- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/aleph/chains/connector.py b/src/aleph/chains/connector.py index 4bb091875..825c0e00c 100644 --- a/src/aleph/chains/connector.py +++ b/src/aleph/chains/connector.py @@ -18,21 +18,24 @@ class ChainConnector: + """ + Service in charge of managing read/write links to blockchains. + This consists mostly of starting fetcher and publisher tasks for each chain and let the chain-specific + code do the heavy lifting. + """ + readers: Dict[Chain, ChainReader] writers: Dict[Chain, ChainWriter] def __init__( - self, session_factory: DbSessionFactory, storage_service: StorageService + self, session_factory: DbSessionFactory, chain_data_service: ChainDataService ): self._session_factory = session_factory + self._chain_data_service = chain_data_service self.readers = {} self.writers = {} - self._chain_data_service = ChainDataService( - session_factory=session_factory, storage_service=storage_service - ) - self._register_chains() async def chain_reader_task(self, chain: Chain, config: Config): @@ -40,7 +43,7 @@ async def chain_reader_task(self, chain: Chain, config: Config): while True: try: - LOGGER.info("Fetching on-chain data...") + LOGGER.info("Fetching on-chain data for %s...", chain) await connector.fetcher(config) except Exception: LOGGER.exception( @@ -62,6 +65,10 @@ async def chain_writer_task(self, chain: Chain, config: Config): await asyncio.sleep(10) async def chain_event_loop(self, config: Config): + """ + Starts the listener and publisher tasks for all supported chains. + """ + listener_tasks = [] publisher_tasks = [] diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 27d29ad87..e63ceeafe 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -264,9 +264,7 @@ async def fetch_ethereum_sync_events(self, config: Config): async def fetch_sync_events_task(self, config: Config): while True: try: - with self.session_factory() as session: - await self.fetch_ethereum_sync_events(config) - session.commit() + await self.fetch_ethereum_sync_events(config) except Exception: LOGGER.exception( "An unexpected exception occurred, " diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 73b365629..5e8db0426 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -23,6 +23,7 @@ from configmanager import Config import aleph.config +from aleph.chains.chain_data_service import ChainDataService from aleph.chains.connector import ChainConnector from aleph.cli.args import parse_args from aleph.db.connection import make_engine, make_session_factory, make_db_url @@ -141,9 +142,12 @@ async def main(args: List[str]) -> None: ipfs_service=ipfs_service, node_cache=node_cache, ) - chain_connector = ChainConnector( + chain_data_service = ChainDataService( session_factory=session_factory, storage_service=storage_service ) + chain_connector = ChainConnector( + session_factory=session_factory, chain_data_service=chain_data_service + ) set_start_method("spawn")