From f6bb53378777bfdc91caeee92dce9eedde349c23 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Thu, 19 Dec 2024 00:34:40 -0400 Subject: [PATCH 1/3] chore: extract @log_broken decorator --- a_sync/primitives/queue.py | 103 +++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 50 deletions(-) diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index a4e18563..ced6d0e4 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -251,6 +251,17 @@ def get_multi_nowait(self, i: int, can_return_less: bool = False) -> List[T]: return items +def log_broken(func: Callable[[Any], NoReturn]) -> Callable[[Any], NoReturn]: + @wraps(func) + async def __worker_exc_wrap(self): + try: + return func(self) + except Exception as e: + logger.error("%s for %s is broken!!!", type(self).__name__, func) + logger.exception(e) + raise + return __worker_exc_wrap + class ProcessingQueue(_Queue[Tuple[P, "asyncio.Future[V]"]], Generic[P, V]): """ A queue designed for processing tasks asynchronously with multiple workers. @@ -480,6 +491,7 @@ def _workers(self) -> "asyncio.Task[NoReturn]": task._workers = workers return task + @log_broken async def __worker_coro(self) -> NoReturn: """ The coroutine executed by worker tasks to process the queue. @@ -502,37 +514,32 @@ async def __worker_coro(self) -> NoReturn: else: fut: asyncio.Future[V] while True: + args, kwargs, fut = await get_next_job() try: - args, kwargs, fut = await get_next_job() + if fut is None: + # the weakref was already cleaned up, we don't need to process this item + task_done() + continue + result = await func(*args, **kwargs) + fut.set_result(result) + except InvalidStateError: + logger.error( + "cannot set result for %s %s: %s", + func.__name__, + fut, + result, + ) + except Exception as e: try: - if fut is None: - # the weakref was already cleaned up, we don't need to process this item - task_done() - continue - result = await func(*args, **kwargs) - fut.set_result(result) + fut.set_exception(e) except InvalidStateError: logger.error( - "cannot set result for %s %s: %s", + "cannot set exception for %s %s: %s", func.__name__, fut, - result, + e, ) - except Exception as e: - try: - fut.set_exception(e) - except InvalidStateError: - logger.error( - "cannot set exception for %s %s: %s", - func.__name__, - fut, - e, - ) - task_done() - except Exception as e: - logger.error("%s for %s is broken!!!", type(self).__name__, func) - logger.exception(e) - raise + task_done() def _validate_args(i: int, can_return_less: bool) -> None: @@ -889,6 +896,7 @@ def _get(self): fut, args, kwargs = super()._get() return args, kwargs, fut() + @log_broken async def __worker_coro(self) -> NoReturn: """ Worker coroutine responsible for processing tasks in the queue. @@ -911,35 +919,30 @@ async def __worker_coro(self) -> NoReturn: fut: SmartFuture[V] while True: try: + args, kwargs, fut = await get_next_job() + if fut is None: + # the weakref was already cleaned up, we don't need to process this item + task_done() + continue + log_debug("processing %s", fut) + result = await func(*args, **kwargs) + fut.set_result(result) + except InvalidStateError: + logger.error( + "cannot set result for %s %s: %s", + func.__name__, + fut, + result, + ) + except Exception as e: + log_debug("%s: %s", type(e).__name__, e) try: - args, kwargs, fut = await get_next_job() - if fut is None: - # the weakref was already cleaned up, we don't need to process this item - task_done() - continue - log_debug("processing %s", fut) - result = await func(*args, **kwargs) - fut.set_result(result) + fut.set_exception(e) except InvalidStateError: logger.error( - "cannot set result for %s %s: %s", + "cannot set exception for %s %s: %s", func.__name__, fut, - result, + e, ) - except Exception as e: - log_debug("%s: %s", type(e).__name__, e) - try: - fut.set_exception(e) - except InvalidStateError: - logger.error( - "cannot set exception for %s %s: %s", - func.__name__, - fut, - e, - ) - task_done() - except Exception as e: - logger.error("%s for %s is broken!!!", type(self).__name__, func) - logger.exception(e) - raise + task_done() From 75576be3bc03dbe71d5836668866db114afbacab Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 19 Dec 2024 04:35:00 +0000 Subject: [PATCH 2/3] chore: `black .` --- a_sync/primitives/queue.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index ced6d0e4..b2d73c16 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -260,8 +260,10 @@ async def __worker_exc_wrap(self): logger.error("%s for %s is broken!!!", type(self).__name__, func) logger.exception(e) raise + return __worker_exc_wrap - + + class ProcessingQueue(_Queue[Tuple[P, "asyncio.Future[V]"]], Generic[P, V]): """ A queue designed for processing tasks asynchronously with multiple workers. From 34f8cf23ea32a602a949eaa9fe0342c827812590 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Thu, 19 Dec 2024 00:40:10 -0400 Subject: [PATCH 3/3] Update queue.py --- a_sync/primitives/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index b2d73c16..0312f8c8 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -255,7 +255,7 @@ def log_broken(func: Callable[[Any], NoReturn]) -> Callable[[Any], NoReturn]: @wraps(func) async def __worker_exc_wrap(self): try: - return func(self) + return await func(self) except Exception as e: logger.error("%s for %s is broken!!!", type(self).__name__, func) logger.exception(e)