From 08ae8faf79b4d4bc2df5bd7f7140c8ff6f0d3ab7 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 30 Oct 2023 12:12:01 +0100 Subject: [PATCH] wip --- src/aleph/jobs/job_utils.py | 49 ++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index a438941d5..d988bba0a 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -60,7 +60,9 @@ async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue: ) -async def make_pending_message_queue(config: Config, routing_key: str) -> aio_pika.abc.AbstractQueue: +async def make_pending_message_queue( + config: Config, routing_key: str +) -> aio_pika.abc.AbstractQueue: return await _make_pending_queue( config=config, exchange_name=config.rabbitmq.pending_message_exchange.value, @@ -107,6 +109,8 @@ def schedule_next_attempt( set_next_retry( session=session, pending_message=pending_message, next_attempt=next_attempt ) + pending_message.next_attempt = next_attempt + pending_message.retries += 1 def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]: @@ -128,6 +132,8 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config class MessageJob: + retry_task: Optional[asyncio.Task] + def __init__( self, session_factory: DbSessionFactory, @@ -139,8 +145,8 @@ def __init__( self.message_handler = message_handler self.max_retries = max_retries - self.task_lock = asyncio.Lock() self.retry_task = None + self.retry_wait_queue = asyncio.Queue() def _handle_rejection( self, @@ -164,12 +170,34 @@ def _handle_rejection( return RejectedMessage(pending_message=pending_message, error_code=error_code) - def _reinsert_failed_messages(self): - with self.task_lock(): - - - - def _handle_retry( + async def _reinsert_failed_messages(self): + pending_messages = [] + next_deadline = 3600 # Arbitrary default value + + try: + while True: + queue_task = asyncio.create_task(self.retry_wait_queue.get()) + sleep_task = asyncio.create_task(asyncio.sleep(next_deadline)) + done, pending = await asyncio.wait( + [queue_task, sleep_task], + return_when=asyncio.FIRST_COMPLETED, + ) + if queue_task in done: + pending_messages.append(queue_task.result()) + self.retry_wait_queue.task_done() + + except CancellationError: + ... + + async def __aenter__(self): + self.retry_task = asyncio.create_task(self._reinsert_failed_messages()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self.retry_task.cancel() + await self.retry_task + + async def _handle_retry( self, session: DbSession, pending_message: PendingMessageDb, @@ -187,6 +215,8 @@ def _handle_retry( .where(PendingMessageDb.id == pending_message.id) .values(fetched=False) ) + await self.pending_message_queue() + # TODO add to fetch queue elif isinstance(exception, RetryMessageException): LOGGER.warning( "%s error (%d) - message %s marked for retry", @@ -196,6 +226,7 @@ def _handle_retry( ) error_code = exception.error_code schedule_next_attempt(session=session, pending_message=pending_message) + await self.retry_wait_queue.put(pending_message) else: LOGGER.exception( "Unexpected error while fetching message", exc_info=exception @@ -233,6 +264,6 @@ async def handle_processing_error( session=session, pending_message=pending_message, exception=exception ) else: - return self._handle_retry( + return await self._handle_retry( session=session, pending_message=pending_message, exception=exception )