Skip to content

Commit

Permalink
Internal: message processing is now event-based
Browse files Browse the repository at this point in the history
Problem: the pending message fetcher and processor use a polling loop to
look for messages to fetch/process. This leads to some latency when the
pending_messages table is empty as the task sleeps while waiting for new
pending messages.

Solution: add an exchange + queue in RabbitMQ to signal the arrival of
new messages. To avoid modifying the message processor too much and
avoid depending on coherency between the DB and RabbitMQ, the fetcher
and processor simply spawn a new task that looks for messages and sets
an asyncio Event object. The main fetching/processing loop waits on this
event (with a timeout).

Note that this system is not used for retries as this would require
another task that posts messages to the MQ on their next attempt.
Retried messages simply wait for the next iteration of the loop
(every second).

This solution has the following advantages and drawbacks:
+ No more arbitrary latency when processing new messages
+ No major modification of the pipeline, even if the MQ system fails
  for some reason the pending message processor will still process
  messages every second
+ No dependency on the state of the message queue, if the RabbitMQ queue
  is deleted for any reason the processor will keep on working
- RabbitMQ overhead (one more exchange + queue).
  • Loading branch information
odesenfans committed Nov 2, 2023
1 parent befa366 commit c2e1d02
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 66 deletions.
8 changes: 7 additions & 1 deletion src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
__copyright__ = "Moshe Malawach"
__license__ = "mit"

from aleph.toolkit.rabbitmq import make_mq_conn

LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -129,6 +131,9 @@ async def main(args: List[str]) -> None:

setup_logging(args.loglevel)

mq_conn = await make_mq_conn(config)
mq_channel = await mq_conn.channel()

node_cache = await init_node_cache(config)
ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
storage_service = StorageService(
Expand Down Expand Up @@ -172,11 +177,12 @@ async def main(args: List[str]) -> None:
LOGGER.debug("Initialized p2p")

LOGGER.debug("Initializing listeners")
tasks += listener_tasks(
tasks += await listener_tasks(
config=config,
session_factory=session_factory,
node_cache=node_cache,
p2p_client=p2p_client,
mq_channel=mq_channel,
)
tasks.append(chain_connector.chain_event_loop(config))
LOGGER.debug("Initialized listeners")
Expand Down
7 changes: 7 additions & 0 deletions src/aleph/db/accessors/pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def get_pending_messages(
return session.execute(select_stmt).scalars()


def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]:
select_stmt = select(PendingMessageDb).where(
PendingMessageDb.id == pending_message_id
)
return session.execute(select_stmt).scalar_one_or_none()


def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int:
"""
Counts pending messages.
Expand Down
15 changes: 13 additions & 2 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from typing import Optional, Dict, Any, Mapping

import aio_pika.abc
import psycopg2
import sqlalchemy.exc
from aleph_message.models import MessageType, ItemType
Expand Down Expand Up @@ -175,12 +176,21 @@ def __init__(
session_factory: DbSessionFactory,
storage_service: StorageService,
config: Config,
pending_message_exchange: aio_pika.abc.AbstractExchange,
):
super().__init__(
storage_service=storage_service,
config=config,
)
self.session_factory = session_factory
self.pending_message_exchange = pending_message_exchange

async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8"))
process_or_fetch = "process" if pending_message.fetched else "fetch"
await self.pending_message_exchange.publish(
mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}"
)

async def add_pending_message(
self,
Expand Down Expand Up @@ -241,7 +251,6 @@ async def add_pending_message(
session.execute(upsert_message_status_stmt)
session.execute(insert_pending_message_stmt)
session.commit()
return pending_message

except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
LOGGER.warning(
Expand All @@ -259,6 +268,9 @@ async def add_pending_message(
session.commit()
return None

await self._publish_pending_message(pending_message)
return pending_message


class MessageHandler(BaseMessageHandler):
"""
Expand Down Expand Up @@ -299,7 +311,6 @@ async def confirm_existing_message(
)
)


async def insert_message(
self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb
):
Expand Down
43 changes: 28 additions & 15 deletions src/aleph/jobs/fetch_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@
NewType,
)

import aio_pika.abc
from configmanager import Config
from setproctitle import setproctitle

from ..chains.signature_verifier import SignatureVerifier
from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.accessors.pending_messages import (
make_pending_message_fetched_statement,
get_next_pending_messages,
)
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.models import PendingMessageDb, MessageDb
from aleph.db.models import MessageDb, PendingMessageDb
from aleph.handlers.message_handler import MessageHandler
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
from aleph.services.ipfs.common import make_ipfs_client
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
Expand All @@ -32,8 +34,8 @@
from aleph.toolkit.monitoring import setup_sentry
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from .job_utils import prepare_loop, MessageJob
from ..services.cache.node_cache import NodeCache
from .job_utils import prepare_loop, MessageJob, make_pending_message_queue
from ..toolkit.rabbitmq import make_mq_conn

LOGGER = getLogger(__name__)

Expand All @@ -47,12 +49,15 @@ def __init__(
session_factory: DbSessionFactory,
message_handler: MessageHandler,
max_retries: int,
pending_message_queue: aio_pika.abc.AbstractQueue,
):
super().__init__(
session_factory=session_factory,
message_handler=message_handler,
max_retries=max_retries,
pending_message_queue=pending_message_queue,
)
self.pending_message_queue = pending_message_queue

async def fetch_pending_message(self, pending_message: PendingMessageDb):
with self.session_factory() as session:
Expand All @@ -76,6 +81,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb):
exception=e,
)
session.commit()
return None

async def fetch_pending_messages(
self, config: Config, node_cache: NodeCache, loop: bool = True
Expand Down Expand Up @@ -140,8 +146,11 @@ async def fetch_pending_messages(
break
# If we are done, wait a few seconds until retrying
if not fetch_tasks:
LOGGER.info("waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
LOGGER.info("waiting for new pending messages...")
try:
await asyncio.wait_for(self.ready(), 1)
except TimeoutError:
pass

def make_pipeline(
self,
Expand All @@ -156,12 +165,16 @@ def make_pipeline(


async def fetch_messages_task(config: Config):
# TODO: this sleep can probably be removed
await asyncio.sleep(4)

engine = make_engine(config=config, application_name="aleph-fetch")
session_factory = make_session_factory(engine)

mq_conn = await make_mq_conn(config=config)
mq_channel = await mq_conn.channel()

pending_message_queue = await make_pending_message_queue(
config=config, routing_key="fetch.*", channel=mq_channel
)

node_cache = NodeCache(
redis_host=config.redis.host.value, redis_port=config.redis.port.value
)
Expand All @@ -182,10 +195,11 @@ async def fetch_messages_task(config: Config):
session_factory=session_factory,
message_handler=message_handler,
max_retries=config.aleph.jobs.pending_messages.max_retries.value,
pending_message_queue=pending_message_queue,
)

while True:
with session_factory() as session:
async with fetcher:
while True:
try:
fetch_pipeline = fetcher.make_pipeline(
config=config, node_cache=node_cache
Expand All @@ -197,11 +211,10 @@ async def fetch_messages_task(config: Config):
)

except Exception:
LOGGER.exception("Error in pending messages job")
session.rollback()
LOGGER.exception("Unexpected error in pending messages fetch job")

LOGGER.debug("Waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
LOGGER.debug("Waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)


def fetch_pending_messages_subprocess(config_values: Dict):
Expand Down
91 changes: 87 additions & 4 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import datetime as dt
import logging
from typing import Dict, Union
from typing import Dict, Union, Optional
from typing import Tuple

import aio_pika
from configmanager import Config
from sqlalchemy import update

Expand All @@ -28,6 +29,58 @@
MAX_RETRY_INTERVAL: int = 300


async def _make_pending_queue(
config: Config,
exchange_name: str,
queue_name: str,
routing_key: str,
channel: Optional[aio_pika.abc.AbstractChannel] = None,
) -> aio_pika.abc.AbstractQueue:
if not channel:
mq_conn = await aio_pika.connect_robust(
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
)
channel = await mq_conn.channel()

exchange = await channel.declare_exchange(
name=exchange_name,
type=aio_pika.ExchangeType.TOPIC,
auto_delete=False,
)
queue = await channel.declare_queue(name=queue_name, auto_delete=False)
await queue.bind(exchange, routing_key=routing_key)
return queue


async def make_pending_tx_queue(
config: Config, channel: aio_pika.abc.AbstractChannel
) -> aio_pika.abc.AbstractQueue:
return await _make_pending_queue(
config=config,
exchange_name=config.rabbitmq.pending_tx_exchange.value,
queue_name="pending-tx-queue",
routing_key="#",
channel=channel,
)


async def make_pending_message_queue(
config: Config,
routing_key: str,
channel: Optional[aio_pika.abc.AbstractChannel] = None,
) -> aio_pika.abc.AbstractQueue:
return await _make_pending_queue(
config=config,
exchange_name=config.rabbitmq.pending_message_exchange.value,
queue_name="pending_message_queue",
routing_key=routing_key,
channel=channel,
)


def compute_next_retry_interval(attempts: int) -> dt.timedelta:
"""
Computes the time interval for the next attempt/retry of a message.
Expand All @@ -39,7 +92,7 @@ def compute_next_retry_interval(attempts: int) -> dt.timedelta:
:return: The time interval between the previous processing attempt and the next one.
"""

seconds = 2 ** attempts
seconds = 2**attempts
return dt.timedelta(seconds=min(seconds, MAX_RETRY_INTERVAL))


Expand All @@ -66,6 +119,8 @@ def schedule_next_attempt(
set_next_retry(
session=session, pending_message=pending_message, next_attempt=next_attempt
)
pending_message.next_attempt = next_attempt
pending_message.retries += 1


def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
Expand All @@ -87,16 +142,24 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config


class MessageJob:
retry_task: Optional[asyncio.Task]

def __init__(
self,
session_factory: DbSessionFactory,
message_handler: MessageHandler,
max_retries: int,
pending_message_queue: aio_pika.abc.AbstractQueue,
):
self.session_factory = session_factory
self.message_handler = message_handler
self.max_retries = max_retries

self.retry_task = None
self.check_for_pending_messages_task = None
self.pending_message_queue = pending_message_queue
self.pending_message_event = asyncio.Event()

def _handle_rejection(
self,
session: DbSession,
Expand All @@ -119,7 +182,27 @@ def _handle_rejection(

return RejectedMessage(pending_message=pending_message, error_code=error_code)

def _handle_retry(
async def _check_for_pending_message(self):
async with self.pending_message_queue.iterator(no_ack=True) as queue_iter:
async for _ in queue_iter:
self.pending_message_event.set()

async def ready(self):
await self.pending_message_event.wait()
self.pending_message_event.clear()

async def __aenter__(self):
self.check_for_pending_messages_task = asyncio.create_task(
self._check_for_pending_message()
)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.check_for_pending_messages_task is not None:
self.check_for_pending_messages_task.cancel()
await self.check_for_pending_messages_task

async def _handle_retry(
self,
session: DbSession,
pending_message: PendingMessageDb,
Expand Down Expand Up @@ -183,6 +266,6 @@ async def handle_processing_error(
session=session, pending_message=pending_message, exception=exception
)
else:
return self._handle_retry(
return await self._handle_retry(
session=session, pending_message=pending_message, exception=exception
)
Loading

0 comments on commit c2e1d02

Please sign in to comment.