Skip to content

Commit

Permalink
Internal: pass chain data service to chain connector constructor (#490)
Browse files Browse the repository at this point in the history
Problem: the chain connector class initializes its own chain data
service, which is not ideal for testing.

Solution: pass a chain data service instance to the constructor of the
chain connector class.
  • Loading branch information
odesenfans committed Oct 27, 2023
1 parent 8dfc22e commit 11bf24a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
19 changes: 13 additions & 6 deletions src/aleph/chains/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,32 @@


class ChainConnector:
"""
Service in charge of managing read/write links to blockchains.
This consists mostly of starting fetcher and publisher tasks for each chain and let the chain-specific
code do the heavy lifting.
"""

readers: Dict[Chain, ChainReader]
writers: Dict[Chain, ChainWriter]

def __init__(
self, session_factory: DbSessionFactory, storage_service: StorageService
self, session_factory: DbSessionFactory, chain_data_service: ChainDataService
):
self._session_factory = session_factory
self._chain_data_service = chain_data_service

self.readers = {}
self.writers = {}

self._chain_data_service = ChainDataService(
session_factory=session_factory, storage_service=storage_service
)

self._register_chains()

async def chain_reader_task(self, chain: Chain, config: Config):
connector = self.readers[chain]

while True:
try:
LOGGER.info("Fetching on-chain data...")
LOGGER.info("Fetching on-chain data for %s...", chain)
await connector.fetcher(config)
except Exception:
LOGGER.exception(
Expand All @@ -62,6 +65,10 @@ async def chain_writer_task(self, chain: Chain, config: Config):
await asyncio.sleep(10)

async def chain_event_loop(self, config: Config):
"""
Starts the listener and publisher tasks for all supported chains.
"""

listener_tasks = []
publisher_tasks = []

Expand Down
4 changes: 1 addition & 3 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ async def fetch_ethereum_sync_events(self, config: Config):
async def fetch_sync_events_task(self, config: Config):
while True:
try:
with self.session_factory() as session:
await self.fetch_ethereum_sync_events(config)
session.commit()
await self.fetch_ethereum_sync_events(config)
except Exception:
LOGGER.exception(
"An unexpected exception occurred, "
Expand Down
6 changes: 5 additions & 1 deletion src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from configmanager import Config

import aleph.config
from aleph.chains.chain_data_service import ChainDataService
from aleph.chains.connector import ChainConnector
from aleph.cli.args import parse_args
from aleph.db.connection import make_engine, make_session_factory, make_db_url
Expand Down Expand Up @@ -136,9 +137,12 @@ async def main(args: List[str]) -> None:
ipfs_service=ipfs_service,
node_cache=node_cache,
)
chain_connector = ChainConnector(
chain_data_service = ChainDataService(
session_factory=session_factory, storage_service=storage_service
)
chain_connector = ChainConnector(
session_factory=session_factory, chain_data_service=chain_data_service
)

set_start_method("spawn")

Expand Down

0 comments on commit 11bf24a

Please sign in to comment.