Skip to content

Commit

Permalink
Internal: message processing is now event-based (#500)
Browse files Browse the repository at this point in the history
Problem: the pending message fetcher and processor use a polling loop to
look for messages to fetch/process. This leads to some latency when the
pending_messages table is empty as the task sleeps while waiting for new
pending messages.

Solution: add an exchange + queue in RabbitMQ to signal the arrival of
new messages. To avoid modifying the message processor too much and
avoid depending on coherency between the DB and RabbitMQ, the fetcher
and processor simply spawn a new task that looks for messages and sets
an asyncio Event object. The main fetching/processing loop waits on this
event (with a timeout).

Note that this system is not used for retries as this would require
another task that posts messages to the MQ on their next attempt.
Retried messages simply wait for the next iteration of the loop
(every second).

This solution has the following advantages and drawbacks:
+ No more arbitrary latency when processing new messages
+ No major modification of the pipeline, even if the MQ system fails
  for some reason the pending message processor will still process
  messages every second
+ No dependency on the state of the message queue, if the RabbitMQ queue
  is deleted for any reason the processor will keep on working
- RabbitMQ overhead (one more exchange + queue).
  • Loading branch information
odesenfans authored Nov 3, 2023
1 parent 9732aa5 commit c4543fa
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 68 deletions.
8 changes: 7 additions & 1 deletion src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
__copyright__ = "Moshe Malawach"
__license__ = "mit"

from aleph.toolkit.rabbitmq import make_mq_conn

LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -129,6 +131,9 @@ async def main(args: List[str]) -> None:

setup_logging(args.loglevel)

mq_conn = await make_mq_conn(config)
mq_channel = await mq_conn.channel()

node_cache = await init_node_cache(config)
ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
storage_service = StorageService(
Expand Down Expand Up @@ -172,11 +177,12 @@ async def main(args: List[str]) -> None:
LOGGER.debug("Initialized p2p")

LOGGER.debug("Initializing listeners")
tasks += listener_tasks(
tasks += await listener_tasks(
config=config,
session_factory=session_factory,
node_cache=node_cache,
p2p_client=p2p_client,
mq_channel=mq_channel,
)
tasks.append(chain_connector.chain_event_loop(config))
LOGGER.debug("Initialized listeners")
Expand Down
7 changes: 7 additions & 0 deletions src/aleph/db/accessors/pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def get_pending_messages(
return session.execute(select_stmt).scalars()


def get_pending_message(session: DbSession, pending_message_id: int) -> Optional[PendingMessageDb]:
select_stmt = select(PendingMessageDb).where(
PendingMessageDb.id == pending_message_id
)
return session.execute(select_stmt).scalar_one_or_none()


def count_pending_messages(session: DbSession, chain: Optional[Chain] = None) -> int:
"""
Counts pending messages.
Expand Down
15 changes: 13 additions & 2 deletions src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from typing import Optional, Dict, Any, Mapping

import aio_pika.abc
import psycopg2
import sqlalchemy.exc
from aleph_message.models import MessageType, ItemType
Expand Down Expand Up @@ -175,12 +176,21 @@ def __init__(
session_factory: DbSessionFactory,
storage_service: StorageService,
config: Config,
pending_message_exchange: aio_pika.abc.AbstractExchange,
):
super().__init__(
storage_service=storage_service,
config=config,
)
self.session_factory = session_factory
self.pending_message_exchange = pending_message_exchange

async def _publish_pending_message(self, pending_message: PendingMessageDb) -> None:
mq_message = aio_pika.Message(body=f"{pending_message.id}".encode("utf-8"))
process_or_fetch = "process" if pending_message.fetched else "fetch"
await self.pending_message_exchange.publish(
mq_message, routing_key=f"{process_or_fetch}.{pending_message.item_hash}"
)

async def add_pending_message(
self,
Expand Down Expand Up @@ -241,7 +251,6 @@ async def add_pending_message(
session.execute(upsert_message_status_stmt)
session.execute(insert_pending_message_stmt)
session.commit()
return pending_message

except (psycopg2.Error, sqlalchemy.exc.SQLAlchemyError) as e:
LOGGER.warning(
Expand All @@ -259,6 +268,9 @@ async def add_pending_message(
session.commit()
return None

await self._publish_pending_message(pending_message)
return pending_message


class MessageHandler(BaseMessageHandler):
"""
Expand Down Expand Up @@ -299,7 +311,6 @@ async def confirm_existing_message(
)
)


async def insert_message(
self, session: DbSession, pending_message: PendingMessageDb, message: MessageDb
):
Expand Down
43 changes: 28 additions & 15 deletions src/aleph/jobs/fetch_pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@
NewType,
)

import aio_pika.abc
from configmanager import Config
from setproctitle import setproctitle

from ..chains.signature_verifier import SignatureVerifier
from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.accessors.pending_messages import (
make_pending_message_fetched_statement,
get_next_pending_messages,
)
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.models import PendingMessageDb, MessageDb
from aleph.db.models import MessageDb, PendingMessageDb
from aleph.handlers.message_handler import MessageHandler
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
from aleph.services.ipfs.common import make_ipfs_client
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
Expand All @@ -32,8 +34,8 @@
from aleph.toolkit.monitoring import setup_sentry
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSessionFactory
from .job_utils import prepare_loop, MessageJob
from ..services.cache.node_cache import NodeCache
from .job_utils import prepare_loop, MessageJob, make_pending_message_queue
from ..toolkit.rabbitmq import make_mq_conn

LOGGER = getLogger(__name__)

Expand All @@ -47,12 +49,15 @@ def __init__(
session_factory: DbSessionFactory,
message_handler: MessageHandler,
max_retries: int,
pending_message_queue: aio_pika.abc.AbstractQueue,
):
super().__init__(
session_factory=session_factory,
message_handler=message_handler,
max_retries=max_retries,
pending_message_queue=pending_message_queue,
)
self.pending_message_queue = pending_message_queue

async def fetch_pending_message(self, pending_message: PendingMessageDb):
with self.session_factory() as session:
Expand All @@ -76,6 +81,7 @@ async def fetch_pending_message(self, pending_message: PendingMessageDb):
exception=e,
)
session.commit()
return None

async def fetch_pending_messages(
self, config: Config, node_cache: NodeCache, loop: bool = True
Expand Down Expand Up @@ -140,8 +146,11 @@ async def fetch_pending_messages(
break
# If we are done, wait a few seconds until retrying
if not fetch_tasks:
LOGGER.info("waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
LOGGER.info("waiting for new pending messages...")
try:
await asyncio.wait_for(self.ready(), 1)
except TimeoutError:
pass

def make_pipeline(
self,
Expand All @@ -156,12 +165,16 @@ def make_pipeline(


async def fetch_messages_task(config: Config):
# TODO: this sleep can probably be removed
await asyncio.sleep(4)

engine = make_engine(config=config, application_name="aleph-fetch")
session_factory = make_session_factory(engine)

mq_conn = await make_mq_conn(config=config)
mq_channel = await mq_conn.channel()

pending_message_queue = await make_pending_message_queue(
config=config, routing_key="fetch.*", channel=mq_channel
)

node_cache = NodeCache(
redis_host=config.redis.host.value, redis_port=config.redis.port.value
)
Expand All @@ -182,10 +195,11 @@ async def fetch_messages_task(config: Config):
session_factory=session_factory,
message_handler=message_handler,
max_retries=config.aleph.jobs.pending_messages.max_retries.value,
pending_message_queue=pending_message_queue,
)

while True:
with session_factory() as session:
async with fetcher:
while True:
try:
fetch_pipeline = fetcher.make_pipeline(
config=config, node_cache=node_cache
Expand All @@ -197,11 +211,10 @@ async def fetch_messages_task(config: Config):
)

except Exception:
LOGGER.exception("Error in pending messages job")
session.rollback()
LOGGER.exception("Unexpected error in pending messages fetch job")

LOGGER.debug("Waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)
LOGGER.debug("Waiting 1 second(s) for new pending messages...")
await asyncio.sleep(1)


def fetch_pending_messages_subprocess(config_values: Dict):
Expand Down
69 changes: 63 additions & 6 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import datetime as dt
import logging
from typing import Dict, Union
from typing import Dict, Union, Optional
from typing import Tuple

import aio_pika.abc
import aio_pika
from configmanager import Config
from sqlalchemy import update

Expand All @@ -29,6 +29,58 @@
MAX_RETRY_INTERVAL: int = 300


async def _make_pending_queue(
config: Config,
exchange_name: str,
queue_name: str,
routing_key: str,
channel: Optional[aio_pika.abc.AbstractChannel] = None,
) -> aio_pika.abc.AbstractQueue:
if not channel:
mq_conn = await aio_pika.connect_robust(
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
)
channel = await mq_conn.channel()

exchange = await channel.declare_exchange(
name=exchange_name,
type=aio_pika.ExchangeType.TOPIC,
auto_delete=False,
)
queue = await channel.declare_queue(name=queue_name, auto_delete=False)
await queue.bind(exchange, routing_key=routing_key)
return queue


async def make_pending_tx_queue(
config: Config, channel: aio_pika.abc.AbstractChannel
) -> aio_pika.abc.AbstractQueue:
return await _make_pending_queue(
config=config,
exchange_name=config.rabbitmq.pending_tx_exchange.value,
queue_name="pending-tx-queue",
routing_key="#",
channel=channel,
)


async def make_pending_message_queue(
config: Config,
routing_key: str,
channel: Optional[aio_pika.abc.AbstractChannel] = None,
) -> aio_pika.abc.AbstractQueue:
return await _make_pending_queue(
config=config,
exchange_name=config.rabbitmq.pending_message_exchange.value,
queue_name="pending_message_queue",
routing_key=routing_key,
channel=channel,
)


def compute_next_retry_interval(attempts: int) -> dt.timedelta:
"""
Computes the time interval for the next attempt/retry of a message.
Expand Down Expand Up @@ -67,6 +119,8 @@ def schedule_next_attempt(
set_next_retry(
session=session, pending_message=pending_message, next_attempt=next_attempt
)
pending_message.next_attempt = next_attempt
pending_message.retries += 1


def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
Expand Down Expand Up @@ -125,19 +179,22 @@ async def ready(self):
self._event.clear()


class MessageJob:
class MessageJob(MqWatcher):
def __init__(
self,
session_factory: DbSessionFactory,
message_handler: MessageHandler,
max_retries: int,
pending_message_queue: aio_pika.abc.AbstractQueue,
):
super().__init__(mq_queue=pending_message_queue)

self.session_factory = session_factory
self.message_handler = message_handler
self.max_retries = max_retries

@staticmethod
def _handle_rejection(
self,
session: DbSession,
pending_message: PendingMessageDb,
exception: BaseException,
Expand All @@ -158,7 +215,7 @@ def _handle_rejection(

return RejectedMessage(pending_message=pending_message, error_code=error_code)

def _handle_retry(
async def _handle_retry(
self,
session: DbSession,
pending_message: PendingMessageDb,
Expand Down Expand Up @@ -222,6 +279,6 @@ async def handle_processing_error(
session=session, pending_message=pending_message, exception=exception
)
else:
return self._handle_retry(
return await self._handle_retry(
session=session, pending_message=pending_message, exception=exception
)
Loading

0 comments on commit c4543fa

Please sign in to comment.