def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
    """Block until Vespa reports healthy, or shut the worker down on timeout.

    Polls ``{VESPA_CONFIG_SERVER_URL}/state/v1/health`` and treats Vespa as
    ready once the JSON body reports ``status.code == "up"``. Failed attempts
    (connection errors, HTTP error statuses, malformed bodies) are logged and
    retried every ``WAIT_INTERVAL`` seconds until ``WAIT_LIMIT`` seconds have
    elapsed.

    Args:
        sender: Not used by this probe.
        **kwargs: Not used by this probe.

    Raises:
        WorkerShutdown: If Vespa did not report "up" within ``WAIT_LIMIT``
            seconds; this kills the celery worker.
    """

    WAIT_INTERVAL = 5
    WAIT_LIMIT = 60
    # Per-request cap in seconds. Without a timeout, requests.get() may block
    # indefinitely on an unresponsive endpoint, in which case WAIT_LIMIT would
    # never be evaluated and the worker would hang instead of exiting.
    REQUEST_TIMEOUT = 5

    ready = False
    time_start = time.monotonic()
    logger.info("Vespa: Readiness probe starting.")
    while True:
        try:
            response = requests.get(
                f"{VESPA_CONFIG_SERVER_URL}/state/v1/health",
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()

            response_dict = response.json()
            if response_dict["status"]["code"] == "up":
                ready = True
                break
        except Exception as e:
            # Deliberately best-effort: any failure just means "not ready
            # yet". Record the reason so a timeout can be diagnosed from the
            # logs instead of being swallowed silently.
            logger.info(f"Vespa: Readiness probe attempt failed: {e!r}")

        # Monotonic clock, so wall-clock adjustments cannot distort the limit.
        time_elapsed = time.monotonic() - time_start
        if time_elapsed > WAIT_LIMIT:
            break

        logger.info(
            f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
        )

        time.sleep(WAIT_INTERVAL)

    if not ready:
        msg = (
            f"Vespa: Readiness probe did not succeed within the timeout "
            f"({WAIT_LIMIT} seconds). Exiting..."
        )
        logger.error(msg)
        raise WorkerShutdown(msg)

    logger.info("Vespa: Readiness probe succeeded. Continuing...")
app_base.wait_for_db(sender, **kwargs) + app_base.wait_for_vespa(sender, **kwargs) app_base.on_secondary_worker_init(sender, **kwargs) diff --git a/backend/danswer/background/celery/apps/light.py b/backend/danswer/background/celery/apps/light.py index da1a1f4897f..fce19ed17c5 100644 --- a/backend/danswer/background/celery/apps/light.py +++ b/backend/danswer/background/celery/apps/light.py @@ -62,6 +62,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) + app_base.wait_for_vespa(sender, **kwargs) app_base.on_secondary_worker_init(sender, **kwargs) diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py index d17a92fa172..5a5ffbb62c2 100644 --- a/backend/danswer/background/celery/apps/primary.py +++ b/backend/danswer/background/celery/apps/primary.py @@ -77,6 +77,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) + app_base.wait_for_vespa(sender, **kwargs) logger.info("Running as the primary celery worker.")