From c4543faf740041c5d712755c98093fc055e3a8ac Mon Sep 17 00:00:00 2001
From: Olivier Desenfans
Date: Fri, 3 Nov 2023 11:47:10 +0100
Subject: [PATCH] Internal: message processing is now event-based (#500)

Problem: the pending message fetcher and processor use a polling loop to
look for messages to fetch/process. This adds latency when the
pending_messages table is empty, as the task sleeps while waiting for
new pending messages.

Solution: add an exchange and a queue in RabbitMQ to signal the arrival
of new messages. To avoid modifying the message processor too much, and
to avoid depending on consistency between the DB and RabbitMQ, the
fetcher and the processor simply spawn a task that watches this queue
and sets an asyncio Event object. The main fetching/processing loop
waits on this event (with a timeout).

Note that this system is not used for retries, as that would require
another task to post messages to the MQ when their next attempt is due.
Retried messages simply wait for the next iteration of the loop (every
second).

This solution has the following advantages and drawbacks:
+ No more arbitrary latency when processing new messages.
+ No major modification of the pipeline; even if the MQ system fails
  for some reason, the pending message processor still processes
  messages every second.
+ No dependency on the state of the message queue; if the RabbitMQ
  queue is deleted for any reason, the processor keeps working.
- RabbitMQ overhead (one more exchange + queue).

A condensed sketch of this mechanism is appended after the diff.
---
 src/aleph/commands.py                      |  8 ++-
 src/aleph/db/accessors/pending_messages.py |  7 ++
 src/aleph/handlers/message_handler.py      | 15 +++-
 src/aleph/jobs/fetch_pending_messages.py   | 43 ++++++++----
 src/aleph/jobs/job_utils.py                | 69 +++++++++++++++++--
 src/aleph/jobs/process_pending_messages.py | 57 ++++++++++-----
 src/aleph/jobs/process_pending_txs.py      | 34 ++++-----
 src/aleph/network.py                       | 13 +++-
 src/aleph/toolkit/rabbitmq.py              | 11 +++
 tests/api/conftest.py                      |  1 +
 tests/message_processing/conftest.py       |  1 +
 .../test_process_pending_txs.py            |  2 +
 12 files changed, 193 insertions(+), 68 deletions(-)
 create mode 100644 src/aleph/toolkit/rabbitmq.py

diff --git a/src/aleph/commands.py b/src/aleph/commands.py
index c5a8a750e..b861f483c 100644
--- a/src/aleph/commands.py
+++ b/src/aleph/commands.py
@@ -45,6 +45,8 @@
 __copyright__ = "Moshe Malawach"
 __license__ = "mit"
 
+from aleph.toolkit.rabbitmq import make_mq_conn
+
 LOGGER = logging.getLogger(__name__)
 
 
@@ -129,6 +131,9 @@ async def main(args: List[str]) -> None:
 
     setup_logging(args.loglevel)
 
+    mq_conn = await make_mq_conn(config)
+    mq_channel = await mq_conn.channel()
+
     node_cache = await init_node_cache(config)
     ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
     storage_service = StorageService(
@@ -172,11 +177,12 @@ async def main(args: List[str]) -> None:
         LOGGER.debug("Initialized p2p")
 
     LOGGER.debug("Initializing listeners")
-    tasks += listener_tasks(
+    tasks += await listener_tasks(
         config=config,
         session_factory=session_factory,
         node_cache=node_cache,
         p2p_client=p2p_client,
+        mq_channel=mq_channel,
     )
     tasks.append(chain_connector.chain_event_loop(config))
     LOGGER.debug("Initialized listeners")
diff --git a/src/aleph/db/accessors/pending_messages.py b/src/aleph/db/accessors/pending_messages.py
index 29c814772..dacd0601e 100644
--- a/src/aleph/db/accessors/pending_messages.py
+++ b/src/aleph/db/accessors/pending_messages.py
@@ -76,6 +76,13 @@ def get_pending_messages(
     return session.execute(select_stmt).scalars()
 
 
+def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]:
+    select_stmt = select(PendingMessageDb).where(
+        PendingMessageDb.id == pending_message_id
+    )
+    return session.execute(select_stmt).scalar_one_or_none()
+
+
 def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int:
     """
     Counts pending messages.
diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py
index 034edc6f2..1b8ebb3c5 100644
--- a/src/aleph/handlers/message_handler.py
+++ b/src/aleph/handlers/message_handler.py
@@ -2,6 +2,7 @@
 import logging
 from typing import Optional, Dict, Any, Mapping
 
+import aio_pika.abc
 import psycopg2
 import sqlalchemy.exc
 from aleph_message.models import MessageType, ItemType
@@ -175,12 +176,21 @@ def __init__(
         session_factory: DbSessionFactory,
         storage_service: StorageService,
         config: Config,
+        pending_message_exchange: aio_pika.abc.AbstractExchange,
     ):
         super().__init__(
             storage_service=storage_service,
             config=config,
         )
         self.session_factory = session_factory
+        self.pending_message_exchange = pending_message_exchange
+
+    async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
+        mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8"))
+        process_or_fetch = "process" if pending_message.fetched else "fetch"
+        await self.pending_message_exchange.publish(
+            mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}"
+        )
 
     async def add_pending_message(
         self,
@@ -241,7 +251,6 @@ async def add_pending_message(
             session.execute(upsert_message_status_stmt)
             session.execute(insert_pending_message_stmt)
             session.commit()
-            return pending_message
 
         except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
             LOGGER.warning(
@@ -259,6 +268,9 @@ async def add_pending_message(
             session.commit()
             return None
 
+        await self._publish_pending_message(pending_message)
+        return pending_message
+
 
 class MessageHandler(BaseMessageHandler):
     """
@@ -299,7 +311,6 @@ async def confirm_existing_message(
                 )
             )
 
-
     async def insert_message(
         self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb
     ):
diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py
index 0d4b6ee2a..d8e4ecd45 100644
--- a/src/aleph/jobs/fetch_pending_messages.py
+++ b/src/aleph/jobs/fetch_pending_messages.py
@@ -13,17 +13,19 @@
     NewType,
 )
 
+import aio_pika.abc
 from configmanager import Config
 from setproctitle import setproctitle
 
-from ..chains.signature_verifier import SignatureVerifier
+from aleph.chains.signature_verifier import SignatureVerifier
 from aleph.db.accessors.pending_messages import (
     make_pending_message_fetched_statement,
     get_next_pending_messages,
 )
 from aleph.db.connection import make_engine, make_session_factory
-from aleph.db.models import PendingMessageDb, MessageDb
+from aleph.db.models import MessageDb, PendingMessageDb
 from aleph.handlers.message_handler import MessageHandler
+from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.services.ipfs.common import make_ipfs_client
 from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
@@ -32,8 +34,8 @@
 from aleph.toolkit.monitoring import setup_sentry
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.db_session import DbSessionFactory
-from .job_utils import prepare_loop, MessageJob
-from ..services.cache.node_cache import NodeCache
+from .job_utils import prepare_loop, MessageJob, make_pending_message_queue
+from ..toolkit.rabbitmq import make_mq_conn
 
 LOGGER = getLogger(__name__)
 
 
@@ -47,12 +49,15 @@ def __init__(
         session_factory: DbSessionFactory,
         message_handler: MessageHandler,
         max_retries: int,
+        pending_message_queue: aio_pika.abc.AbstractQueue,
     ):
         super().__init__(
             session_factory=session_factory,
             message_handler=message_handler,
             max_retries=max_retries,
+            pending_message_queue=pending_message_queue,
         )
+        self.pending_message_queue = pending_message_queue
 
     async def fetch_pending_message(self, pending_message: PendingMessageDb):
         with self.session_factory() as session:
@@ -76,6 +81,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb):
                     exception=e,
                 )
                 session.commit()
+                return None
 
     async def fetch_pending_messages(
         self, config: Config, node_cache: NodeCache, loop: bool = True
@@ -140,8 +146,11 @@ async def fetch_pending_messages(
                     break
                 # If we are done, wait a few seconds until retrying
                 if not fetch_tasks:
-                    LOGGER.info("waiting 1 second(s) for new pending messages...")
-                    await asyncio.sleep(1)
+                    LOGGER.info("waiting for new pending messages...")
+                    try:
+                        await asyncio.wait_for(self.ready(), 1)
+                    except asyncio.TimeoutError:
+                        pass
 
     def make_pipeline(
         self,
@@ -156,12 +165,16 @@ def make_pipeline(
 
 
 async def fetch_messages_task(config: Config):
-    # TODO: this sleep can probably be removed
-    await asyncio.sleep(4)
-
     engine = make_engine(config=config, application_name="aleph-fetch")
     session_factory = make_session_factory(engine)
 
+    mq_conn = await make_mq_conn(config=config)
+    mq_channel = await mq_conn.channel()
+
+    pending_message_queue = await make_pending_message_queue(
+        config=config, routing_key="fetch.*", channel=mq_channel
+    )
+
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
     )
@@ -182,10 +195,11 @@ async def fetch_messages_task(config: Config):
         session_factory=session_factory,
         message_handler=message_handler,
         max_retries=config.aleph.jobs.pending_messages.max_retries.value,
+        pending_message_queue=pending_message_queue,
     )
 
-    while True:
-        with session_factory() as session:
+    async with fetcher:
+        while True:
             try:
                 fetch_pipeline = fetcher.make_pipeline(
                     config=config, node_cache=node_cache
                 )
 
             except Exception:
-                LOGGER.exception("Error in pending messages job")
-                session.rollback()
+                LOGGER.exception("Unexpected error in pending messages fetch job")
 
-        LOGGER.debug("Waiting 1 second(s) for new pending messages...")
-        await asyncio.sleep(1)
+            LOGGER.debug("Waiting 1 second(s) for new pending messages...")
+            await asyncio.sleep(1)
 
 
 def fetch_pending_messages_subprocess(config_values: Dict):
diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py
index d144ff3e8..20c5819c9 100644
--- a/src/aleph/jobs/job_utils.py
+++ b/src/aleph/jobs/job_utils.py
@@ -1,10 +1,10 @@
 import asyncio
 import datetime as dt
 import logging
-from typing import Dict, Union
+from typing import Dict, Union, Optional
 from typing import Tuple
 
-import aio_pika.abc
+import aio_pika
 from configmanager import Config
 from sqlalchemy import update
 
@@ -29,6 +29,58 @@
 MAX_RETRY_INTERVAL: int = 300
 
 
+async def _make_pending_queue(
+    config: Config,
+    exchange_name: str,
+    queue_name: str,
+    routing_key: str,
+    channel: Optional[aio_pika.abc.AbstractChannel] = None,
+) -> aio_pika.abc.AbstractQueue:
+    if not channel:
+        mq_conn = await aio_pika.connect_robust(
+            host=config.p2p.mq_host.value,
+            port=config.rabbitmq.port.value,
+            login=config.rabbitmq.username.value,
+            password=config.rabbitmq.password.value,
+        )
+        channel = await mq_conn.channel()
+
+    exchange = await channel.declare_exchange(
+        name=exchange_name,
+        type=aio_pika.ExchangeType.TOPIC,
+        auto_delete=False,
+    )
+    queue = await channel.declare_queue(name=queue_name, auto_delete=False)
+    await queue.bind(exchange, routing_key=routing_key)
+    return queue
+
+
+async def make_pending_tx_queue(
+    config: Config, channel: aio_pika.abc.AbstractChannel
+) -> aio_pika.abc.AbstractQueue:
+    return await _make_pending_queue(
+        config=config,
+        exchange_name=config.rabbitmq.pending_tx_exchange.value,
+        queue_name="pending-tx-queue",
+        routing_key="#",
+        channel=channel,
+    )
+
+
+async def make_pending_message_queue(
+    config: Config,
+    routing_key: str,
+    channel: Optional[aio_pika.abc.AbstractChannel] = None,
+) -> aio_pika.abc.AbstractQueue:
+    return await _make_pending_queue(
+        config=config,
+        exchange_name=config.rabbitmq.pending_message_exchange.value,
+        queue_name="pending_message_queue",
+        routing_key=routing_key,
+        channel=channel,
+    )
+
+
 def compute_next_retry_interval(attempts: int) -> dt.timedelta:
     """
     Computes the time interval for the next attempt/retry of a message.
@@ -67,6 +119,8 @@ def schedule_next_attempt(
     set_next_retry(
         session=session, pending_message=pending_message, next_attempt=next_attempt
     )
+    pending_message.next_attempt = next_attempt
+    pending_message.retries += 1
 
 
 def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
@@ -125,19 +179,22 @@ async def ready(self):
         self._event.clear()
 
 
-class MessageJob:
+class MessageJob(MqWatcher):
     def __init__(
         self,
         session_factory: DbSessionFactory,
         message_handler: MessageHandler,
         max_retries: int,
+        pending_message_queue: aio_pika.abc.AbstractQueue,
     ):
+        super().__init__(mq_queue=pending_message_queue)
+
         self.session_factory = session_factory
         self.message_handler = message_handler
         self.max_retries = max_retries
 
+    @staticmethod
     def _handle_rejection(
-        self,
         session: DbSession,
         pending_message: PendingMessageDb,
         exception: BaseException,
@@ -158,7 +215,7 @@ def _handle_rejection(
 
         return RejectedMessage(pending_message=pending_message, error_code=error_code)
 
-    def _handle_retry(
+    async def _handle_retry(
         self,
         session: DbSession,
         pending_message: PendingMessageDb,
@@ -222,6 +279,6 @@ async def handle_processing_error(
                 session=session, pending_message=pending_message, exception=exception
             )
         else:
-            return self._handle_retry(
+            return await self._handle_retry(
                 session=session, pending_message=pending_message, exception=exception
             )
diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py
index 5fb6f1f2e..f43d23fc3 100644
--- a/src/aleph/jobs/process_pending_messages.py
+++ b/src/aleph/jobs/process_pending_messages.py
@@ -14,10 +14,11 @@
 from setproctitle import setproctitle
 
 import aleph.toolkit.json as aleph_json
-from ..chains.signature_verifier import SignatureVerifier
+from aleph.chains.signature_verifier import SignatureVerifier
 from aleph.db.accessors.pending_messages import get_next_pending_message
 from aleph.db.connection import make_engine, make_session_factory
 from aleph.handlers.message_handler import MessageHandler
+from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.services.ipfs.common import make_ipfs_client
 from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
@@ -26,12 +27,11 @@
 from aleph.toolkit.monitoring import setup_sentry
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.db_session import DbSessionFactory
+from aleph.types.message_processing_result import MessageProcessingResult
 from .job_utils import (
     prepare_loop,
     MessageJob,
 )
-from ..services.cache.node_cache import NodeCache
-from ..types.message_processing_result import MessageProcessingResult
 
 LOGGER = getLogger(__name__)
@@ -44,11 +44,13 @@ def __init__(
         max_retries: int,
         mq_conn: aio_pika.abc.AbstractConnection,
         mq_message_exchange: aio_pika.abc.AbstractExchange,
+        pending_message_queue: aio_pika.abc.AbstractQueue,
     ):
         super().__init__(
             session_factory=session_factory,
             message_handler=message_handler,
             max_retries=max_retries,
+            pending_message_queue=pending_message_queue,
         )
 
         self.mq_conn = mq_conn
@@ -65,6 +67,7 @@ async def new(
         mq_username: str,
         mq_password: str,
         message_exchange_name: str,
+        pending_message_exchange_name: str,
     ):
         mq_conn = await aio_pika.connect_robust(
             host=mq_host, port=mq_port, login=mq_username, password=mq_password
@@ -75,12 +78,25 @@ async def new(
             type=aio_pika.ExchangeType.TOPIC,
             auto_delete=False,
         )
+        pending_message_exchange = await channel.declare_exchange(
+            name=pending_message_exchange_name,
+            type=aio_pika.ExchangeType.TOPIC,
+            auto_delete=False,
+        )
+        pending_message_queue = await channel.declare_queue(
+            name="pending_message_queue"
+        )
+        await pending_message_queue.bind(
+            pending_message_exchange, routing_key="process.*"
+        )
+
         return cls(
             session_factory=session_factory,
             message_handler=message_handler,
             max_retries=max_retries,
             mq_conn=mq_conn,
             mq_message_exchange=mq_message_exchange,
+            pending_message_queue=pending_message_queue,
         )
 
     async def close(self):
@@ -119,7 +135,6 @@ async def process_messages(
     async def publish_to_mq(
         self, message_iterator: AsyncIterator[Sequence[MessageProcessingResult]]
     ) -> AsyncIterator[Sequence[MessageProcessingResult]]:
-
         async for processing_results in message_iterator:
             for result in processing_results:
                 mq_message = aio_pika.Message(body=aleph_json.dumps(result.to_dict()))
@@ -136,9 +151,6 @@ def make_pipeline(self) -> AsyncIterator[Sequence[MessageProcessingResult]]:
 
 
 async def fetch_and_process_messages_task(config: Config):
-    # TODO: this sleep can probably be removed
-    await asyncio.sleep(4)
-
     engine = make_engine(config=config, application_name="aleph-process")
     session_factory = make_session_factory(engine)
 
@@ -167,22 +179,29 @@ async def fetch_and_process_messages_task(config: Config):
         mq_username=config.rabbitmq.username.value,
         mq_password=config.rabbitmq.password.value,
         message_exchange_name=config.rabbitmq.message_exchange.value,
+        pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value,
     )
 
-    while True:
-        with session_factory() as session:
-            try:
-                message_processing_pipeline = pending_message_processor.make_pipeline()
-                async for processing_results in message_processing_pipeline:
-                    for result in processing_results:
-                        LOGGER.info("Successfully processed %s", result.item_hash)
+    async with pending_message_processor:
+        while True:
+            with session_factory() as session:
+                try:
+                    message_processing_pipeline = pending_message_processor.make_pipeline()
+                    async for processing_results in message_processing_pipeline:
+                        for result in processing_results:
+                            LOGGER.info("Successfully processed %s", result.item_hash)
 
-            except Exception:
-                LOGGER.exception("Error in pending messages job")
-                session.rollback()
+                except Exception:
+                    LOGGER.exception("Error in pending messages job")
+                    session.rollback()
 
-        LOGGER.info("Waiting 1 second(s) for new pending messages...")
-        await asyncio.sleep(1)
+            LOGGER.info("Waiting for new pending messages...")
+            # We still loop periodically for retried messages, as we do not
+            # bother sending a message on the MQ for those.
+            try:
+                await asyncio.wait_for(pending_message_processor.ready(), 1)
+            except asyncio.TimeoutError:
+                pass
 
 
 def pending_messages_subprocess(config_values: Dict):
diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py
index a8ff8ae03..2903f6baa 100644
--- a/src/aleph/jobs/process_pending_txs.py
+++ b/src/aleph/jobs/process_pending_txs.py
@@ -22,34 +22,15 @@
 from aleph.storage import StorageService
 from aleph.toolkit.logging import setup_logging
 from aleph.toolkit.monitoring import setup_sentry
+from aleph.toolkit.rabbitmq import make_mq_conn
 from aleph.toolkit.timestamp import utc_now
 from aleph.types.chain_sync import ChainSyncProtocol
 from aleph.types.db_session import DbSessionFactory
-from .job_utils import prepare_loop, MqWatcher
+from .job_utils import MqWatcher, prepare_loop, make_pending_tx_queue
 
 LOGGER = logging.getLogger(__name__)
 
 
-async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue:
-    mq_conn = await aio_pika.connect_robust(
-        host=config.p2p.mq_host.value,
-        port=config.rabbitmq.port.value,
-        login=config.rabbitmq.username.value,
-        password=config.rabbitmq.password.value,
-    )
-    channel = await mq_conn.channel()
-    pending_tx_exchange = await channel.declare_exchange(
-        name=config.rabbitmq.pending_tx_exchange.value,
-        type=aio_pika.ExchangeType.TOPIC,
-        auto_delete=False,
-    )
-    pending_tx_queue = await channel.declare_queue(
-        name="pending-tx-queue", durable=True, auto_delete=False
-    )
-    await pending_tx_queue.bind(pending_tx_exchange, routing_key="#")
-    return pending_tx_queue
-
-
 class PendingTxProcessor(MqWatcher):
     def __init__(
         self,
@@ -138,7 +119,15 @@ async def handle_txs_task(config: Config):
     engine = make_engine(config=config, application_name="aleph-txs")
     session_factory = make_session_factory(engine)
 
-    pending_tx_queue = await make_pending_tx_queue(config=config)
+    mq_conn = await make_mq_conn(config=config)
+    mq_channel = await mq_conn.channel()
+
+    pending_message_exchange = await mq_channel.declare_exchange(
+        name=config.rabbitmq.pending_message_exchange.value,
+        type=aio_pika.ExchangeType.TOPIC,
+        auto_delete=False,
+    )
+    pending_tx_queue = await make_pending_tx_queue(config=config, channel=mq_channel)
 
     node_cache = NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
@@ -154,6 +143,7 @@ async def handle_txs_task(config: Config):
         session_factory=session_factory,
         storage_service=storage_service,
         config=config,
+        pending_message_exchange=pending_message_exchange,
     )
     chain_data_service = ChainDataService(
         session_factory=session_factory, storage_service=storage_service
     )
diff --git a/src/aleph/network.py b/src/aleph/network.py
index a4086e055..69c1bd9cb 100644
--- a/src/aleph/network.py
+++ b/src/aleph/network.py
@@ -2,11 +2,11 @@
 from typing import Coroutine, List, Any, Dict
 from urllib.parse import unquote
 
+import aio_pika.abc
 from aleph_p2p_client import AlephP2PServiceClient
 
 import aleph.toolkit.json as aleph_json
-from aleph.chains.signature_verifier import SignatureVerifier
-from aleph.handlers.message_handler import MessageHandler, MessagePublisher
+from aleph.handlers.message_handler import MessagePublisher
 from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.services.ipfs.common import make_ipfs_client
@@ -36,11 +36,12 @@ async def decode_pubsub_message(message_data: bytes) -> Dict[str, Any]:
     return message_dict
 
 
-def listener_tasks(
+async def listener_tasks(
     config,
     session_factory: DbSessionFactory,
     node_cache: NodeCache,
     p2p_client: AlephP2PServiceClient,
+    mq_channel: aio_pika.abc.AbstractChannel,
 ) -> List[Coroutine]:
     from aleph.services.p2p.protocol import incoming_channel as incoming_p2p_channel
 
@@ -52,10 +53,16 @@ def listener_tasks(
         ipfs_service=ipfs_service,
         node_cache=node_cache,
     )
+    pending_message_exchange = await mq_channel.declare_exchange(
+        name=config.rabbitmq.pending_message_exchange.value,
+        type=aio_pika.ExchangeType.TOPIC,
+        auto_delete=False,
+    )
     message_publisher = MessagePublisher(
         session_factory=session_factory,
         storage_service=storage_service,
         config=config,
+        pending_message_exchange=pending_message_exchange,
     )
 
     # for now (1st milestone), we only listen on a single global topic...
diff --git a/src/aleph/toolkit/rabbitmq.py b/src/aleph/toolkit/rabbitmq.py
new file mode 100644
index 000000000..0de3bd58c
--- /dev/null
+++ b/src/aleph/toolkit/rabbitmq.py
@@ -0,0 +1,11 @@
+import aio_pika
+
+
+async def make_mq_conn(config) -> aio_pika.abc.AbstractConnection:
+    mq_conn = await aio_pika.connect_robust(
+        host=config.p2p.mq_host.value,
+        port=config.rabbitmq.port.value,
+        login=config.rabbitmq.username.value,
+        password=config.rabbitmq.password.value,
+    )
+    return mq_conn
diff --git a/tests/api/conftest.py b/tests/api/conftest.py
index 094b5e04e..783304483 100644
--- a/tests/api/conftest.py
+++ b/tests/api/conftest.py
@@ -216,5 +216,6 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac
         max_retries=0,
         mq_message_exchange=mocker.AsyncMock(),
         mq_conn=mocker.AsyncMock(),
+        pending_message_queue=mocker.AsyncMock(),
     )
     return message_processor
diff --git a/tests/message_processing/conftest.py b/tests/message_processing/conftest.py
index 6fea094a7..da37e723d 100644
--- a/tests/message_processing/conftest.py
+++ b/tests/message_processing/conftest.py
@@ -93,5 +93,6 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac
         max_retries=0,
         mq_message_exchange=mocker.AsyncMock(),
         mq_conn=mocker.AsyncMock(),
+        pending_message_queue=mocker.AsyncMock(),
    )
     return message_processor
diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py
index a2696ecf2..a07580fd1 100644
--- a/tests/message_processing/test_process_pending_txs.py
+++ b/tests/message_processing/test_process_pending_txs.py
@@ -45,6 +45,7 @@ async def test_process_pending_tx_on_chain_protocol(
             session_factory=session_factory,
             storage_service=test_storage_service,
             config=mock_config,
+            pending_message_exchange=mocker.AsyncMock(),
         ),
         chain_data_service=chain_data_service,
         pending_tx_queue=mocker.AsyncMock(),
@@ -119,6 +120,7 @@ async def _process_smart_contract_tx(
             session_factory=session_factory,
             storage_service=test_storage_service,
             config=mock_config,
+            pending_message_exchange=mocker.AsyncMock(),
         ),
         chain_data_service=chain_data_service,
         pending_tx_queue=mocker.AsyncMock(),
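
Editor's note: condensed, illustrative sketch of the mechanism described in the
commit message, not part of the patch. A watcher task consumes notifications
from a RabbitMQ topic queue and sets an asyncio.Event; the main loop waits on
that event with a one-second timeout, so new messages wake it immediately while
retried messages are still picked up on the next iteration. The connection
parameters and the exchange/queue names below are placeholders, not the node's
actual configuration values:

import asyncio
from typing import Optional

import aio_pika
import aio_pika.abc


class MqWatcher:
    """Set an asyncio.Event whenever a message arrives on the MQ queue."""

    def __init__(self, mq_queue: aio_pika.abc.AbstractQueue):
        self.mq_queue = mq_queue
        self._event = asyncio.Event()
        self._watcher_task: Optional[asyncio.Task] = None

    async def _watch(self) -> None:
        # Consume notifications; their content does not matter, only the
        # fact that one arrived.
        async with self.mq_queue.iterator() as queue_iter:
            async for mq_message in queue_iter:
                async with mq_message.process():
                    self._event.set()

    async def __aenter__(self) -> "MqWatcher":
        self._watcher_task = asyncio.create_task(self._watch())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._watcher_task is not None:
            self._watcher_task.cancel()

    async def ready(self) -> None:
        await self._event.wait()
        self._event.clear()


async def main() -> None:
    # Placeholder connection parameters and names.
    conn = await aio_pika.connect_robust(host="127.0.0.1", port=5672)
    channel = await conn.channel()
    exchange = await channel.declare_exchange(
        name="pending-message-exchange",
        type=aio_pika.ExchangeType.TOPIC,
        auto_delete=False,
    )
    queue = await channel.declare_queue(name="pending_message_queue", auto_delete=False)
    await queue.bind(exchange, routing_key="process.*")

    async with MqWatcher(mq_queue=queue) as watcher:
        while True:
            # ... fetch/process pending messages from the database here ...
            try:
                # Wake up as soon as a notification arrives, or after at most
                # one second so retried messages are still picked up.
                await asyncio.wait_for(watcher.ready(), 1)
            except asyncio.TimeoutError:
                pass


if __name__ == "__main__":
    asyncio.run(main())

With this pattern, losing the queue or the broker merely degrades the loop back
to plain one-second polling, which is the fallback behaviour the commit message
describes.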