Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
odesenfans committed Oct 30, 2023
1 parent 464cd8b commit 08ae8fa
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions src/aleph/jobs/job_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ async def make_pending_tx_queue(config: Config) -> aio_pika.abc.AbstractQueue:
)


async def make_pending_message_queue(config: Config, routing_key: str) -> aio_pika.abc.AbstractQueue:
async def make_pending_message_queue(
config: Config, routing_key: str
) -> aio_pika.abc.AbstractQueue:
return await _make_pending_queue(
config=config,
exchange_name=config.rabbitmq.pending_message_exchange.value,
Expand Down Expand Up @@ -107,6 +109,8 @@ def schedule_next_attempt(
set_next_retry(
session=session, pending_message=pending_message, next_attempt=next_attempt
)
pending_message.next_attempt = next_attempt
pending_message.retries += 1


def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config]:
Expand All @@ -128,6 +132,8 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config


class MessageJob:
retry_task: Optional[asyncio.Task]

def __init__(
self,
session_factory: DbSessionFactory,
Expand All @@ -139,8 +145,8 @@ def __init__(
self.message_handler = message_handler
self.max_retries = max_retries

self.task_lock = asyncio.Lock()
self.retry_task = None
self.retry_wait_queue = asyncio.Queue()

def _handle_rejection(
self,
Expand All @@ -164,12 +170,34 @@ def _handle_rejection(

return RejectedMessage(pending_message=pending_message, error_code=error_code)

def _reinsert_failed_messages(self):
with self.task_lock():



def _handle_retry(
async def _reinsert_failed_messages(self):
pending_messages = []
next_deadline = 3600 # Arbitrary default value

try:
while True:
queue_task = asyncio.create_task(self.retry_wait_queue.get())
sleep_task = asyncio.create_task(asyncio.sleep(next_deadline))
done, pending = await asyncio.wait(
[queue_task, sleep_task],
return_when=asyncio.FIRST_COMPLETED,
)
if queue_task in done:
pending_messages.append(queue_task.result())
self.retry_wait_queue.task_done()

except CancellationError:
...

async def __aenter__(self):
self.retry_task = asyncio.create_task(self._reinsert_failed_messages())
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self.retry_task.cancel()
await self.retry_task

async def _handle_retry(
self,
session: DbSession,
pending_message: PendingMessageDb,
Expand All @@ -187,6 +215,8 @@ def _handle_retry(
.where(PendingMessageDb.id == pending_message.id)
.values(fetched=False)
)
await self.pending_message_queue()
# TODO add to fetch queue
elif isinstance(exception, RetryMessageException):
LOGGER.warning(
"%s error (%d) - message %s marked for retry",
Expand All @@ -196,6 +226,7 @@ def _handle_retry(
)
error_code = exception.error_code
schedule_next_attempt(session=session, pending_message=pending_message)
await self.retry_wait_queue.put(pending_message)
else:
LOGGER.exception(
"Unexpected error while fetching message", exc_info=exception
Expand Down Expand Up @@ -233,6 +264,6 @@ async def handle_processing_error(
session=session, pending_message=pending_message, exception=exception
)
else:
return self._handle_retry(
return await self._handle_retry(
session=session, pending_message=pending_message, exception=exception
)

0 comments on commit 08ae8fa

Please sign in to comment.