Skip to content

Commit

Permalink
add vespa probe too
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuo-danswer committed Nov 7, 2024
1 parent 607bfa1 commit aab63c1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 0 deletions.
46 changes: 46 additions & 0 deletions backend/danswer/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from typing import Any

import requests
import sentry_sdk
from celery import Task
from celery.app import trace
Expand All @@ -19,6 +20,7 @@
from danswer.background.celery.celery_utils import celery_is_worker_primary
from danswer.configs.constants import DanswerRedisLocks
from danswer.db.engine import get_sqlalchemy_engine
from danswer.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
Expand Down Expand Up @@ -225,6 +227,50 @@ def wait_for_db(sender: Any, **kwargs: Any) -> None:
return


def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
"""Waits for Vespa to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Vespa: Readiness probe starting.")
while True:
try:
response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
response.raise_for_status()

response_dict = response.json()
if response_dict["status"]["code"] == "up":
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)

time.sleep(WAIT_INTERVAL)

if not ready:
msg = (
f"Vespa: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Vespa: Readiness probe succeeded. Continuing...")
return


def on_secondary_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("Running as a secondary celery worker.")

Expand Down
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
1 change: 1 addition & 0 deletions backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

logger.info("Running as the primary celery worker.")

Expand Down

0 comments on commit aab63c1

Please sign in to comment.