Fix: warning on failure to close node cache properly (#514)
Problem: a warning occurs in the tests because of an improper cleanup of
the Redis client object.

Solution: make the NodeCache class an asynchronous context manager and
make it clean up after itself.
odesenfans authored Nov 3, 2023
1 parent c4543fa commit 0e4a3ea
Showing 7 changed files with 210 additions and 172 deletions.
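
The NodeCache changes themselves live in one of the files not shown below, but the diffs that follow show how it is consumed: `await node_cache.open()` in the API entrypoint, and `async with NodeCache(...) as node_cache:` in the job entry points. A minimal sketch of the change the commit message describes, assuming a redis.asyncio client and an assumed close() method (the actual implementation may differ):

import redis.asyncio as aioredis


class NodeCache:
    # Hypothetical sketch of the async context manager described in the
    # commit message; the real NodeCache diff is not shown in this excerpt.
    def __init__(self, redis_host: str, redis_port: int):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_client = None

    async def open(self):
        # Create the Redis client when the cache is opened.
        self.redis_client = aioredis.Redis(host=self.redis_host, port=self.redis_port)

    async def close(self):
        # Disconnect the client; skipping this cleanup is what caused
        # the warning in the tests.
        if self.redis_client is not None:
            await self.redis_client.aclose()  # close() on redis-py < 5.0
            self.redis_client = None

    async def __aenter__(self) -> "NodeCache":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

With __aenter__/__aexit__ in place, `async with` guarantees close() runs however the block exits.
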
3 changes: 3 additions & 0 deletions src/aleph/api_entrypoint.py

@@ -45,6 +45,9 @@ async def configure_aiohttp_app(
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
+    # TODO: find a way to close the node cache when exiting the API process, not closing it causes
+    # a warning.
+    await node_cache.open()
 
     ipfs_client = make_ipfs_client(config)
     ipfs_service = IpfsService(ipfs_client=ipfs_client)
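
The TODO above is left open by this commit. One plausible way to close the cache when the API process shuts down, purely a sketch and not part of this commit, is aiohttp's cleanup_ctx hook, assuming it is registered inside configure_aiohttp_app where `app` and `config` are in scope (node_cache_ctx and close() are hypothetical names):

async def node_cache_ctx(app):
    # Runs up to the yield at application startup, after the yield at shutdown.
    node_cache = NodeCache(
        redis_host=config.redis.host.value, redis_port=config.redis.port.value
    )
    await node_cache.open()
    app["node_cache"] = node_cache
    yield
    await node_cache.close()


app.cleanup_ctx.append(node_cache_ctx)
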
103 changes: 52 additions & 51 deletions src/aleph/commands.py

@@ -62,9 +62,6 @@ async def init_node_cache(config: Config) -> NodeCache:
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
-
-    # Reset the cache
-    await node_cache.reset()
     return node_cache
 
 
@@ -135,64 +132,68 @@ async def main(args: List[str]) -> None:
     mq_channel = await mq_conn.channel()
 
     node_cache = await init_node_cache(config)
-    ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    chain_data_service = ChainDataService(
-        session_factory=session_factory,
-        storage_service=storage_service,
-    )
-    pending_tx_publisher = await PendingTxPublisher.new(config=config)
-    chain_connector = ChainConnector(
-        session_factory=session_factory,
-        pending_tx_publisher=pending_tx_publisher,
-        chain_data_service=chain_data_service,
-    )
-
-    set_start_method("spawn")
-
-    tasks: List[Coroutine] = []
-
-    if not args.no_jobs:
-        LOGGER.debug("Creating jobs")
-        tasks += start_jobs(
-            config=config,
-            session_factory=session_factory,
-            ipfs_service=ipfs_service,
-            use_processes=True,
-        )
-
-    LOGGER.debug("Initializing p2p")
-    p2p_client, p2p_tasks = await p2p.init_p2p(
-        config=config,
-        session_factory=session_factory,
-        service_name="network-monitor",
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    tasks += p2p_tasks
-    LOGGER.debug("Initialized p2p")
-
-    LOGGER.debug("Initializing listeners")
-    tasks += await listener_tasks(
-        config=config,
-        session_factory=session_factory,
-        node_cache=node_cache,
-        p2p_client=p2p_client,
-        mq_channel=mq_channel,
-    )
-    tasks.append(chain_connector.chain_event_loop(config))
-    LOGGER.debug("Initialized listeners")
-
-    LOGGER.debug("Initializing cache tasks")
-    tasks.append(refresh_cache_materialized_views(session_factory))
-    LOGGER.debug("Initialized cache tasks")
-
-    LOGGER.debug("Running event loop")
-    await asyncio.gather(*tasks)
+    async with node_cache:
+        # Reset the cache
+        await node_cache.reset()
+
+        ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        chain_data_service = ChainDataService(
+            session_factory=session_factory,
+            storage_service=storage_service,
+        )
+        pending_tx_publisher = await PendingTxPublisher.new(config=config)
+        chain_connector = ChainConnector(
+            session_factory=session_factory,
+            pending_tx_publisher=pending_tx_publisher,
+            chain_data_service=chain_data_service,
+        )
+
+        set_start_method("spawn")
+
+        tasks: List[Coroutine] = []
+
+        if not args.no_jobs:
+            LOGGER.debug("Creating jobs")
+            tasks += start_jobs(
+                config=config,
+                session_factory=session_factory,
+                ipfs_service=ipfs_service,
+                use_processes=True,
+            )
+
+        LOGGER.debug("Initializing p2p")
+        p2p_client, p2p_tasks = await p2p.init_p2p(
+            config=config,
+            session_factory=session_factory,
+            service_name="network-monitor",
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        tasks += p2p_tasks
+        LOGGER.debug("Initialized p2p")
+
+        LOGGER.debug("Initializing listeners")
+        tasks += await listener_tasks(
+            config=config,
+            session_factory=session_factory,
+            node_cache=node_cache,
+            p2p_client=p2p_client,
+            mq_channel=mq_channel,
+        )
+        tasks.append(chain_connector.chain_event_loop(config))
+        LOGGER.debug("Initialized listeners")
+
+        LOGGER.debug("Initializing cache tasks")
+        tasks.append(refresh_cache_materialized_views(session_factory))
+        LOGGER.debug("Initialized cache tasks")
+
+        LOGGER.debug("Running event loop")
+        await asyncio.gather(*tasks)
 
 
 def run():
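
Wrapping the body of main() in `async with node_cache:` guarantees the cache is closed even if one of the gathered tasks raises. The block is roughly equivalent to the following try/finally (simplified; a real expansion passes the exception details to __aexit__):

node_cache = await init_node_cache(config)
await node_cache.__aenter__()
try:
    await node_cache.reset()
    ...  # build services, collect tasks, await asyncio.gather(*tasks)
finally:
    await node_cache.__aexit__(None, None, None)

Note that the reset() call also moved out of init_node_cache and into the managed block, so it now runs on an open cache.
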
72 changes: 36 additions & 36 deletions src/aleph/jobs/fetch_pending_messages.py

@@ -175,46 +175,46 @@ async def fetch_messages_task(config: Config):
         config=config, routing_key="fetch.*", channel=mq_channel
     )
 
-    node_cache = NodeCache(
+    async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    )
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    signature_verifier = SignatureVerifier()
-    message_handler = MessageHandler(
-        signature_verifier=signature_verifier,
-        storage_service=storage_service,
-        config=config,
-    )
-    fetcher = PendingMessageFetcher(
-        session_factory=session_factory,
-        message_handler=message_handler,
-        max_retries=config.aleph.jobs.pending_messages.max_retries.value,
-        pending_message_queue=pending_message_queue,
-    )
-
-    async with fetcher:
-        while True:
-            try:
-                fetch_pipeline = fetcher.make_pipeline(
-                    config=config, node_cache=node_cache
-                )
-                async for fetched_messages in fetch_pipeline:
-                    for fetched_message in fetched_messages:
-                        LOGGER.info(
-                            "Successfully fetched %s", fetched_message.item_hash
-                        )
-
-            except Exception:
-                LOGGER.exception("Unexpected error in pending messages fetch job")
-
-            LOGGER.debug("Waiting 1 second(s) for new pending messages...")
-            await asyncio.sleep(1)
+    ) as node_cache:
+        ipfs_client = make_ipfs_client(config)
+        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        signature_verifier = SignatureVerifier()
+        message_handler = MessageHandler(
+            signature_verifier=signature_verifier,
+            storage_service=storage_service,
+            config=config,
+        )
+        fetcher = PendingMessageFetcher(
+            session_factory=session_factory,
+            message_handler=message_handler,
+            max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+            pending_message_queue=pending_message_queue,
+        )
+
+        async with fetcher:
+            while True:
+                try:
+                    fetch_pipeline = fetcher.make_pipeline(
+                        config=config, node_cache=node_cache
+                    )
+                    async for fetched_messages in fetch_pipeline:
+                        for fetched_message in fetched_messages:
+                            LOGGER.info(
+                                "Successfully fetched %s", fetched_message.item_hash
+                            )
+
+                except Exception:
+                    LOGGER.exception("Unexpected error in pending messages fetch job")
+
+                LOGGER.debug("Waiting 1 second(s) for new pending messages...")
+                await asyncio.sleep(1)
 
 
 def fetch_pending_messages_subprocess(config_values: Dict):
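
The fetch task now nests two async context managers, the NodeCache and the fetcher, which pushes the main loop two indentation levels deep. An alternative shape, shown only as a sketch of a different design choice (not what the commit does), is contextlib.AsyncExitStack, which keeps the code flat; `build_fetcher` is a hypothetical stand-in for the PendingMessageFetcher construction above:

from contextlib import AsyncExitStack


async def fetch_messages_task(config):
    async with AsyncExitStack() as stack:
        node_cache = await stack.enter_async_context(
            NodeCache(
                redis_host=config.redis.host.value,
                redis_port=config.redis.port.value,
            )
        )
        fetcher = await stack.enter_async_context(build_fetcher(node_cache))
        # ... run the fetch loop as in the diff; both contexts are closed
        # in reverse order when the block exits, even on error.
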
94 changes: 49 additions & 45 deletions src/aleph/jobs/process_pending_messages.py

@@ -154,54 +154,58 @@ async def fetch_and_process_messages_task(config: Config):
     engine = make_engine(config=config, application_name="aleph-process")
     session_factory = make_session_factory(engine)
 
-    node_cache = NodeCache(
+    async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    )
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
-        ipfs_service=ipfs_service,
-        node_cache=node_cache,
-    )
-    signature_verifier = SignatureVerifier()
-    message_handler = MessageHandler(
-        signature_verifier=signature_verifier,
-        storage_service=storage_service,
-        config=config,
-    )
-    pending_message_processor = await PendingMessageProcessor.new(
-        session_factory=session_factory,
-        message_handler=message_handler,
-        max_retries=config.aleph.jobs.pending_messages.max_retries.value,
-        mq_host=config.p2p.mq_host.value,
-        mq_port=config.rabbitmq.port.value,
-        mq_username=config.rabbitmq.username.value,
-        mq_password=config.rabbitmq.password.value,
-        message_exchange_name=config.rabbitmq.message_exchange.value,
-        pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value,
-    )
-
-    async with pending_message_processor:
-        while True:
-            with session_factory() as session:
-                try:
-                    message_processing_pipeline = pending_message_processor.make_pipeline()
-                    async for processing_results in message_processing_pipeline:
-                        for result in processing_results:
-                            LOGGER.info("Successfully processed %s", result.item_hash)
-
-                except Exception:
-                    LOGGER.exception("Error in pending messages job")
-                    session.rollback()
-
-                LOGGER.info("Waiting for new pending messages...")
-                # We still loop periodically for retried messages as we do not bother sending a message
-                # on the MQ for these.
-                try:
-                    await asyncio.wait_for(pending_message_processor.ready(), 1)
-                except TimeoutError:
-                    pass
+    ) as node_cache:
+        ipfs_client = make_ipfs_client(config)
+        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+        storage_service = StorageService(
+            storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
+            ipfs_service=ipfs_service,
+            node_cache=node_cache,
+        )
+        signature_verifier = SignatureVerifier()
+        message_handler = MessageHandler(
+            signature_verifier=signature_verifier,
+            storage_service=storage_service,
+            config=config,
+        )
+        pending_message_processor = await PendingMessageProcessor.new(
+            session_factory=session_factory,
+            message_handler=message_handler,
+            max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+            mq_host=config.p2p.mq_host.value,
+            mq_port=config.rabbitmq.port.value,
+            mq_username=config.rabbitmq.username.value,
+            mq_password=config.rabbitmq.password.value,
+            message_exchange_name=config.rabbitmq.message_exchange.value,
+            pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value,
+        )
+
+        async with pending_message_processor:
+            while True:
+                with session_factory() as session:
+                    try:
+                        message_processing_pipeline = (
+                            pending_message_processor.make_pipeline()
+                        )
+                        async for processing_results in message_processing_pipeline:
+                            for result in processing_results:
+                                LOGGER.info(
+                                    "Successfully processed %s", result.item_hash
+                                )
+
+                    except Exception:
+                        LOGGER.exception("Error in pending messages job")
+                        session.rollback()
+
+                    LOGGER.info("Waiting for new pending messages...")
+                    # We still loop periodically for retried messages as we do not bother sending a message
+                    # on the MQ for these.
+                    try:
+                        await asyncio.wait_for(pending_message_processor.ready(), 1)
+                    except TimeoutError:
+                        pass
 
 
 def pending_messages_subprocess(config_values: Dict):
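
One detail worth flagging in the retry wait above: asyncio.wait_for() raises asyncio.TimeoutError, and only since Python 3.11 is that an alias of the builtin TimeoutError caught here; on 3.10 and earlier the builtin name would not match. Catching asyncio.TimeoutError is the version-independent spelling:

try:
    await asyncio.wait_for(pending_message_processor.ready(), 1)
except asyncio.TimeoutError:  # alias of builtin TimeoutError on 3.11+
    pass
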
(Diffs for the remaining changed files are not shown here.)
