From c934cbc825cc621b9607789aaf4f5bf5a58e774b Mon Sep 17 00:00:00 2001 From: fubuloubu <3859395+fubuloubu@users.noreply.github.com> Date: Mon, 8 Apr 2024 17:19:26 -0400 Subject: [PATCH] fix: catch more shutdown issues to resolve shutdown without dropping --- silverback/runner.py | 5 ++++- silverback/subscriptions.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/silverback/runner.py b/silverback/runner.py index 49580586..042841ad 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -126,7 +126,10 @@ async def run(self): if len(tasks) == 0: raise Halt("No tasks to execute") - await asyncio.gather(*tasks) + try: + await asyncio.gather(*tasks) + except Exception as e: + logger.error(f"Fatal error detected, shutting down: '{e}'") # Execute Silverback shutdown task before shutting down the broker if shutdown_handler := self.app.get_shutdown_handler(): diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index cee56dce..057cfa38 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -4,6 +4,7 @@ from typing import AsyncGenerator, Dict, List, Optional from ape.logging import logger +from websockets import ConnectionClosedError from websockets import client as ws_client @@ -145,6 +146,12 @@ async def __aexit__(self, exc_type, exc, tb): # Try to gracefully unsubscribe to all events await asyncio.gather(*(self.unsubscribe(sub_id) for sub_id in self._subscriptions)) + except ConnectionClosedError: + pass # Websocket already closed (ctrl+C and patiently waiting) + finally: # Disconnect and release websocket - await self.connection.close() + try: + await self.connection.close() + except RuntimeError: + pass # No running event loop to disconnect from (multiple ctrl+C presses)