Skip to content

Commit

Permalink
improved commenting
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuo-danswer committed Nov 22, 2024
1 parent 2095148 commit 2c4e082
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
15 changes: 9 additions & 6 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
"""
unfenced_attempts: list[int] = []

# inner double check pattern to avoid race conditions without locking
# inner/outer/inner double check pattern to avoid race conditions when checking for
# bad state
# inner = index_attempt in non terminal state
# outer = r.fence_key
# outer = r.fence_key down

# check the db for any non terminal state index attempts
# check the db for index attempts in a non terminal state
attempts: list[IndexAttempt] = []
attempts.extend(
get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session)
Expand All @@ -131,9 +132,11 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
if r.exists(fence_key):
continue

# NOTE: Now double check!
# The attempt can complete and take down the fence before we start checking the fence.
# We need to double check that the index attempt didn't change if the fence is down
# Between the time the attempts are first looked up and the time we see the fence down,
# the attempt may have completed and taken down the fence normally.

# We need to double check that the index attempt is still in a non terminal state
# and matches the original state, which confirms we are really in a bad state.
attempt_2 = get_index_attempt(db_session, attempt.id)
if not attempt_2:
continue
Expand Down
8 changes: 5 additions & 3 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,11 @@ def monitor_ccpair_indexing_taskset(
# never use any blocking methods on the result from inside a task!
result: AsyncResult = AsyncResult(payload.celery_task_id)

# inner double check pattern to avoid race conditions without locking
# inner = get_completion / generator_complete signal
# outer = result.state (signaled if in READY state)
# inner/outer/inner double check pattern to avoid race conditions when checking for
# bad state

# inner = get_completion / generator_complete not signaled
# outer = result.state in READY state
status_int = redis_connector_index.get_completion()
if status_int is None: # inner signal not set ... possible error
result_state = result.state
Expand Down

0 comments on commit 2c4e082

Please sign in to comment.