diff --git a/src/aleph/commands.py b/src/aleph/commands.py index c5a8a750e..b861f483c 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -45,6 +45,8 @@ __copyright__ = "Moshe Malawach" __license__ = "mit" +from aleph.toolkit.rabbitmq import make_mq_conn + LOGGER = logging.getLogger(__name__) @@ -129,6 +131,9 @@ async def main(args: List[str]) -> None: setup_logging(args.loglevel) + mq_conn = await make_mq_conn(config) + mq_channel = await mq_conn.channel() + node_cache = await init_node_cache(config) ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config)) storage_service = StorageService( @@ -172,11 +177,12 @@ async def main(args: List[str]) -> None: LOGGER.debug("Initialized p2p") LOGGER.debug("Initializing listeners") - tasks += listener_tasks( + tasks += await listener_tasks( config=config, session_factory=session_factory, node_cache=node_cache, p2p_client=p2p_client, + mq_channel=mq_channel, ) tasks.append(chain_connector.chain_event_loop(config)) LOGGER.debug("Initialized listeners") diff --git a/src/aleph/db/accessors/pending_messages.py b/src/aleph/db/accessors/pending_messages.py index 29c814772..dacd0601e 100644 --- a/src/aleph/db/accessors/pending_messages.py +++ b/src/aleph/db/accessors/pending_messages.py @@ -76,6 +76,13 @@ def get_pending_messages( return session.execute(select_stmt).scalars() +def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]: + select_stmt = select(PendingMessageDb).where( + PendingMessageDb.id == pending_message_id + ) + return session.execute(select_stmt).scalar_one_or_none() + + def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int: """ Counts pending messages. diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 034edc6f2..1b8ebb3c5 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -2,6 +2,7 @@ import logging from typing import Optional, Dict, Any, Mapping +import aio_pika.abc import psycopg2 import sqlalchemy.exc from aleph_message.models import MessageType, ItemType @@ -175,12 +176,21 @@ def __init__( session_factory: DbSessionFactory, storage_service: StorageService, config: Config, + pending_message_exchange: aio_pika.abc.AbstractExchange, ): super().__init__( storage_service=storage_service, config=config, ) self.session_factory = session_factory + self.pending_message_exchange = pending_message_exchange + + async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None: + mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8")) + process_or_fetch = "process" if pending_message.fetched else "fetch" + await self.pending_message_exchange.publish( + mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}" + ) async def add_pending_message( self, @@ -241,7 +251,6 @@ async def add_pending_message( session.execute(upsert_message_status_stmt) session.execute(insert_pending_message_stmt) session.commit() - return pending_message except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e: LOGGER.warning( @@ -259,6 +268,9 @@ async def add_pending_message( session.commit() return None + await self._publish_pending_message(pending_message) + return pending_message + class MessageHandler(BaseMessageHandler): """ @@ -299,7 +311,6 @@ async def confirm_existing_message( ) ) - async def insert_message( self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb ): diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py index 0d4b6ee2a..d8e4ecd45 100644 --- a/src/aleph/jobs/fetch_pending_messages.py +++ b/src/aleph/jobs/fetch_pending_messages.py @@ -13,17 +13,19 @@ NewType, ) +import aio_pika.abc from configmanager import Config from setproctitle import setproctitle -from ..chains.signature_verifier import SignatureVerifier +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.pending_messages import ( make_pending_message_fetched_statement, get_next_pending_messages, ) from aleph.db.connection import make_engine, make_session_factory -from aleph.db.models import PendingMessageDb, MessageDb +from aleph.db.models import MessageDb, PendingMessageDb from aleph.handlers.message_handler import MessageHandler +from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService from aleph.services.ipfs.common import make_ipfs_client from aleph.services.storage.fileystem_engine import FileSystemStorageEngine @@ -32,8 +34,8 @@ from aleph.toolkit.monitoring import setup_sentry from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSessionFactory -from .job_utils import prepare_loop, MessageJob -from ..services.cache.node_cache import NodeCache +from .job_utils import prepare_loop, MessageJob, make_pending_message_queue +from ..toolkit.rabbitmq import make_mq_conn LOGGER = getLogger(__name__) @@ -47,12 +49,15 @@ def __init__( session_factory: DbSessionFactory, message_handler: MessageHandler, max_retries: int, + pending_message_queue: aio_pika.abc.AbstractQueue, ): super().__init__( session_factory=session_factory, message_handler=message_handler, max_retries=max_retries, + pending_message_queue=pending_message_queue, ) + self.pending_message_queue = pending_message_queue async def fetch_pending_message(self, pending_message: PendingMessageDb): with self.session_factory() as session: @@ -76,6 +81,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb): exception=e, ) session.commit() + return None async def fetch_pending_messages( self, config: Config, node_cache: NodeCache, loop: bool = True @@ -140,8 +146,11 @@ async def fetch_pending_messages( break # If we are done, wait a few seconds until retrying if not fetch_tasks: - LOGGER.info("waiting 1 second(s) for new pending messages...") - await asyncio.sleep(1) + LOGGER.info("waiting for new pending messages...") + try: + await asyncio.wait_for(self.ready(), 1) + except TimeoutError: + pass def make_pipeline( self, @@ -156,12 +165,16 @@ def make_pipeline( async def fetch_messages_task(config: Config): - # TODO: this sleep can probably be removed - await asyncio.sleep(4) - engine = make_engine(config=config, application_name="aleph-fetch") session_factory = make_session_factory(engine) + mq_conn = await make_mq_conn(config=config) + mq_channel = await mq_conn.channel() + + pending_message_queue = await make_pending_message_queue( + config=config, routing_key="fetch.*", channel=mq_channel + ) + node_cache = NodeCache( redis_host=config.redis.host.value, redis_port=config.redis.port.value ) @@ -182,10 +195,11 @@ async def fetch_messages_task(config: Config): session_factory=session_factory, message_handler=message_handler, max_retries=config.aleph.jobs.pending_messages.max_retries.value, + pending_message_queue=pending_message_queue, ) - while True: - with session_factory() as session: + async with fetcher: + while True: try: fetch_pipeline = fetcher.make_pipeline( config=config, node_cache=node_cache @@ -197,11 +211,10 @@ async def fetch_messages_task(config: Config): ) except Exception: - LOGGER.exception("Error in pending messages job") - session.rollback() + LOGGER.exception("Unexpected error in pending messages fetch job") - LOGGER.debug("Waiting 1 second(s) for new pending messages...") - await asyncio.sleep(1) + LOGGER.debug("Waiting 1 second(s) for new pending messages...") + await asyncio.sleep(1) def fetch_pending_messages_subprocess(config_values: Dict): diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index 5fe7ec953..90e26be52 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -1,9 +1,10 @@ import asyncio import datetime as dt import logging -from typing import Dict, Union +from typing import Dict, Union, Optional from typing import Tuple +import aio_pika from configmanager import Config from sqlalchemy import update @@ -28,6 +29,58 @@ MAX_RETRY_INTERVAL: int = 300 +async def _make_pending_queue( + config: Config, + exchange_name: str, + queue_name: str, + routing_key: str, + channel: Optional[aio_pika.abc.AbstractChannel] = None, +) -> aio_pika.abc.AbstractQueue: + if not channel: + mq_conn = await aio_pika.connect_robust( + host=config.p2p.mq_host.value, + port=config.rabbitmq.port.value, + login=config.rabbitmq.username.value, + password=config.rabbitmq.password.value, + ) + channel = await mq_conn.channel() + + exchange = await channel.declare_exchange( + name=exchange_name, + type=aio_pika.ExchangeType.TOPIC, + auto_delete=False, + ) + queue = await channel.declare_queue(name=queue_name, auto_delete=False) + await queue.bind(exchange, routing_key=routing_key) + return queue + + +async def make_pending_tx_queue( + config: Config, channel: aio_pika.abc.AbstractChannel +) -> aio_pika.abc.AbstractQueue: + return await _make_pending_queue( + config=config, + exchange_name=config.rabbitmq.pending_tx_exchange.value, + queue_name="pending-tx-queue", + routing_key="#", + channel=channel, + ) + + +async def make_pending_message_queue( + config: Config, + routing_key: str, + channel: Optional[aio_pika.abc.AbstractChannel] = None, +) -> aio_pika.abc.AbstractQueue: + return await _make_pending_queue( + config=config, + exchange_name=config.rabbitmq.pending_message_exchange.value, + queue_name="pending_message_queue", + routing_key=routing_key, + channel=channel, + ) + + def compute_next_retry_interval(attempts: int) -> dt.timedelta: """ Computes the time interval for the next attempt/retry of a message. @@ -39,7 +92,7 @@ def compute_next_retry_interval(attempts: int) -> dt.timedelta: :return: The time interval between the previous processing attempt and the next one. """ - seconds = 2 ** attempts + seconds = 2**attempts return dt.timedelta(seconds=min(seconds, MAX_RETRY_INTERVAL)) @@ -66,6 +119,8 @@ def schedule_next_attempt( set_next_retry( session=session, pending_message=pending_message, next_attempt=next_attempt ) + pending_message.next_attempt = next_attempt + pending_message.retries += 1 def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]: @@ -87,16 +142,24 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config class MessageJob: + retry_task: Optional[asyncio.Task] + def __init__( self, session_factory: DbSessionFactory, message_handler: MessageHandler, max_retries: int, + pending_message_queue: aio_pika.abc.AbstractQueue, ): self.session_factory = session_factory self.message_handler = message_handler self.max_retries = max_retries + self.retry_task = None + self.check_for_pending_messages_task = None + self.pending_message_queue = pending_message_queue + self.pending_message_event = asyncio.Event() + def _handle_rejection( self, session: DbSession, @@ -119,7 +182,27 @@ def _handle_rejection( return RejectedMessage(pending_message=pending_message, error_code=error_code) - def _handle_retry( + async def _check_for_pending_message(self): + async with self.pending_message_queue.iterator(no_ack=True) as queue_iter: + async for _ in queue_iter: + self.pending_message_event.set() + + async def ready(self): + await self.pending_message_event.wait() + self.pending_message_event.clear() + + async def __aenter__(self): + self.check_for_pending_messages_task = asyncio.create_task( + self._check_for_pending_message() + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.check_for_pending_messages_task is not None: + self.check_for_pending_messages_task.cancel() + await self.check_for_pending_messages_task + + async def _handle_retry( self, session: DbSession, pending_message: PendingMessageDb, @@ -183,6 +266,6 @@ async def handle_processing_error( session=session, pending_message=pending_message, exception=exception ) else: - return self._handle_retry( + return await self._handle_retry( session=session, pending_message=pending_message, exception=exception ) diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 5fb6f1f2e..f43d23fc3 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -14,10 +14,11 @@ from setproctitle import setproctitle import aleph.toolkit.json as aleph_json -from ..chains.signature_verifier import SignatureVerifier +from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.pending_messages import get_next_pending_message from aleph.db.connection import make_engine, make_session_factory from aleph.handlers.message_handler import MessageHandler +from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService from aleph.services.ipfs.common import make_ipfs_client from aleph.services.storage.fileystem_engine import FileSystemStorageEngine @@ -26,12 +27,11 @@ from aleph.toolkit.monitoring import setup_sentry from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSessionFactory +from aleph.types.message_processing_result import MessageProcessingResult from .job_utils import ( prepare_loop, MessageJob, ) -from ..services.cache.node_cache import NodeCache -from ..types.message_processing_result import MessageProcessingResult LOGGER = getLogger(__name__) @@ -44,11 +44,13 @@ def __init__( max_retries: int, mq_conn: aio_pika.abc.AbstractConnection, mq_message_exchange: aio_pika.abc.AbstractExchange, + pending_message_queue: aio_pika.abc.AbstractQueue, ): super().__init__( session_factory=session_factory, message_handler=message_handler, max_retries=max_retries, + pending_message_queue=pending_message_queue, ) self.mq_conn = mq_conn @@ -65,6 +67,7 @@ async def new( mq_username: str, mq_password: str, message_exchange_name: str, + pending_message_exchange_name: str, ): mq_conn = await aio_pika.connect_robust( host=mq_host, port=mq_port, login=mq_username, password=mq_password @@ -75,12 +78,25 @@ async def new( type=aio_pika.ExchangeType.TOPIC, auto_delete=False, ) + pending_message_exchange = await channel.declare_exchange( + name=pending_message_exchange_name, + type=aio_pika.ExchangeType.TOPIC, + auto_delete=False, + ) + pending_message_queue = await channel.declare_queue( + name="pending_message_queue" + ) + await pending_message_queue.bind( + pending_message_exchange, routing_key="process.*" + ) + return cls( session_factory=session_factory, message_handler=message_handler, max_retries=max_retries, mq_conn=mq_conn, mq_message_exchange=mq_message_exchange, + pending_message_queue=pending_message_queue, ) async def close(self): @@ -119,7 +135,6 @@ async def process_messages( async def publish_to_mq( self, message_iterator: AsyncIterator[Sequence[MessageProcessingResult]] ) -> AsyncIterator[Sequence[MessageProcessingResult]]: - async for processing_results in message_iterator: for result in processing_results: mq_message = aio_pika.Message(body=aleph_json.dumps(result.to_dict())) @@ -136,9 +151,6 @@ def make_pipeline(self) -> AsyncIterator[Sequence[MessageProcessingResult]]: async def fetch_and_process_messages_task(config: Config): - # TODO: this sleep can probably be removed - await asyncio.sleep(4) - engine = make_engine(config=config, application_name="aleph-process") session_factory = make_session_factory(engine) @@ -167,22 +179,29 @@ async def fetch_and_process_messages_task(config: Config): mq_username=config.rabbitmq.username.value, mq_password=config.rabbitmq.password.value, message_exchange_name=config.rabbitmq.message_exchange.value, + pending_message_exchange_name=config.rabbitmq.pending_message_exchange.value, ) - while True: - with session_factory() as session: - try: - message_processing_pipeline = pending_message_processor.make_pipeline() - async for processing_results in message_processing_pipeline: - for result in processing_results: - LOGGER.info("Successfully processed %s", result.item_hash) + async with pending_message_processor: + while True: + with session_factory() as session: + try: + message_processing_pipeline = pending_message_processor.make_pipeline() + async for processing_results in message_processing_pipeline: + for result in processing_results: + LOGGER.info("Successfully processed %s", result.item_hash) - except Exception: - LOGGER.exception("Error in pending messages job") - session.rollback() + except Exception: + LOGGER.exception("Error in pending messages job") + session.rollback() - LOGGER.info("Waiting 1 second(s) for new pending messages...") - await asyncio.sleep(1) + LOGGER.info("Waiting for new pending messages...") + # We still loop periodically for retried messages as we do not bother sending a message + # on the MQ for these. + try: + await asyncio.wait_for(pending_message_processor.ready(), 1) + except TimeoutError: + pass def pending_messages_subprocess(config_values: Dict): diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py index 97df33dc9..7da40427d 100644 --- a/src/aleph/jobs/process_pending_txs.py +++ b/src/aleph/jobs/process_pending_txs.py @@ -27,7 +27,8 @@ from aleph.toolkit.timestamp import utc_now from aleph.types.chain_sync import ChainSyncProtocol from aleph.types.db_session import DbSessionFactory -from .job_utils import prepare_loop +from .job_utils import prepare_loop, make_pending_tx_queue +from ..toolkit.rabbitmq import make_mq_conn LOGGER = logging.getLogger(__name__) @@ -97,30 +98,20 @@ async def process_pending_txs(self) -> None: ) -async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: - mq_conn = await aio_pika.connect_robust( - host=config.p2p.mq_host.value, - port=config.rabbitmq.port.value, - login=config.rabbitmq.username.value, - password=config.rabbitmq.password.value, - ) - channel = await mq_conn.channel() - pending_tx_exchange = await channel.declare_exchange( - name=config.rabbitmq.pending_tx_exchange.value, - type=aio_pika.ExchangeType.TOPIC, - auto_delete=False, - ) - pending_tx_queue = await channel.declare_queue( - name="pending-tx-queue", durable=True, auto_delete=False - ) - await pending_tx_queue.bind(pending_tx_exchange, routing_key="#") - return pending_tx_queue - - async def handle_txs_task(config: Config): engine = make_engine(config=config, application_name="aleph-txs") session_factory = make_session_factory(engine) + mq_conn = await make_mq_conn(config=config) + mq_channel = await mq_conn.channel() + + pending_message_exchange = await mq_channel.declare_exchange( + name=config.rabbitmq.pending_message_exchange.value, + type=aio_pika.ExchangeType.TOPIC, + auto_delete=False, + ) + pending_tx_queue = await make_pending_tx_queue(config=config, channel=mq_channel) + node_cache = NodeCache( redis_host=config.redis.host.value, redis_port=config.redis.port.value ) @@ -135,11 +126,11 @@ async def handle_txs_task(config: Config): session_factory=session_factory, storage_service=storage_service, config=config, + pending_message_exchange=pending_message_exchange, ) chain_data_service = ChainDataService( session_factory=session_factory, storage_service=storage_service ) - pending_tx_queue = await make_pending_tx_queue(config=config) pending_tx_processor = PendingTxProcessor( session_factory=session_factory, message_publisher=message_publisher, diff --git a/src/aleph/network.py b/src/aleph/network.py index a4086e055..69c1bd9cb 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -2,11 +2,11 @@ from typing import Coroutine, List, Any, Dict from urllib.parse import unquote +import aio_pika.abc from aleph_p2p_client import AlephP2PServiceClient import aleph.toolkit.json as aleph_json -from aleph.chains.signature_verifier import SignatureVerifier -from aleph.handlers.message_handler import MessageHandler, MessagePublisher +from aleph.handlers.message_handler import MessagePublisher from aleph.services.cache.node_cache import NodeCache from aleph.services.ipfs import IpfsService from aleph.services.ipfs.common import make_ipfs_client @@ -36,11 +36,12 @@ async def decode_pubsub_message(message_data: bytes) -> Dict[str, Any]: return message_dict -def listener_tasks( +async def listener_tasks( config, session_factory: DbSessionFactory, node_cache: NodeCache, p2p_client: AlephP2PServiceClient, + mq_channel: aio_pika.abc.AbstractChannel, ) -> List[Coroutine]: from aleph.services.p2p.protocol import incoming_channel as incoming_p2p_channel @@ -52,10 +53,16 @@ def listener_tasks( ipfs_service=ipfs_service, node_cache=node_cache, ) + pending_message_exchange = await mq_channel.declare_exchange( + name=config.rabbitmq.pending_message_exchange.value, + type = aio_pika.ExchangeType.TOPIC, + auto_delete=False, + ) message_publisher = MessagePublisher( session_factory=session_factory, storage_service=storage_service, config=config, + pending_message_exchange=pending_message_exchange, ) # for now (1st milestone), we only listen on a single global topic... diff --git a/src/aleph/toolkit/rabbitmq.py b/src/aleph/toolkit/rabbitmq.py new file mode 100644 index 000000000..0de3bd58c --- /dev/null +++ b/src/aleph/toolkit/rabbitmq.py @@ -0,0 +1,11 @@ +import aio_pika + + +async def make_mq_conn(config) -> aio_pika.abc.AbstractConnection: + mq_conn = await aio_pika.connect_robust( + host=config.p2p.mq_host.value, + port=config.rabbitmq.port.value, + login=config.rabbitmq.username.value, + password=config.rabbitmq.password.value, + ) + return mq_conn diff --git a/tests/api/conftest.py b/tests/api/conftest.py index 094b5e04e..783304483 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -216,5 +216,6 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac max_retries=0, mq_message_exchange=mocker.AsyncMock(), mq_conn=mocker.AsyncMock(), + pending_message_queue=mocker.AsyncMock(), ) return message_processor diff --git a/tests/message_processing/conftest.py b/tests/message_processing/conftest.py index 6fea094a7..da37e723d 100644 --- a/tests/message_processing/conftest.py +++ b/tests/message_processing/conftest.py @@ -93,5 +93,6 @@ def message_processor(mocker, mock_config: Config, session_factory: DbSessionFac max_retries=0, mq_message_exchange=mocker.AsyncMock(), mq_conn=mocker.AsyncMock(), + pending_message_queue=mocker.AsyncMock(), ) return message_processor diff --git a/tests/message_processing/test_process_pending_txs.py b/tests/message_processing/test_process_pending_txs.py index a21a913a7..dbfec22ce 100644 --- a/tests/message_processing/test_process_pending_txs.py +++ b/tests/message_processing/test_process_pending_txs.py @@ -45,6 +45,7 @@ async def test_process_pending_tx_on_chain_protocol( session_factory=session_factory, storage_service=test_storage_service, config=mock_config, + pending_message_exchange=mocker.AsyncMock(), ), chain_data_service=chain_data_service, pending_tx_queue=mocker.AsyncMock(), @@ -119,6 +120,7 @@ async def _process_smart_contract_tx( session_factory=session_factory, storage_service=test_storage_service, config=mock_config, + pending_message_exchange=mocker.AsyncMock(), ), chain_data_service=chain_data_service, pending_tx_queue=mocker.AsyncMock(),