Skip to content

Commit

Permalink
abort mismatched tasks
Browse files (browse the repository at this point in the history)
  • Loading branch information
rkuo-danswer committed Nov 6, 2024
1 parent 62c4127 commit 93ec2a6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
12 changes: 10 additions & 2 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)
if attempt_id:
task_logger.info(
f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
f"Indexing queued: index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings_instance.id} "
)
tasks_created += 1
except SoftTimeLimitExceeded:
Expand Down Expand Up @@ -529,6 +531,13 @@ def connector_indexing_task(
sleep(1)
continue

if payload.index_attempt_id != index_attempt_id:
raise ValueError(
f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
f"task_index_attempt={index_attempt_id} "
f"payload_index_attempt={payload.index_attempt_id}"
)

logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}"
)
Expand Down Expand Up @@ -614,7 +623,6 @@ def connector_indexing_task(
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))

redis_connector_index.reset()
raise e
finally:
if lock.owned():
Expand Down
15 changes: 10 additions & 5 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -690,12 +690,17 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:

for a in attempts:
# if attempts exist in the db but we don't detect them in redis, mark them as failed
failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart."
if not r.exists(
RedisConnectorIndex.fence_key_with_ids(
a.connector_credential_pair_id, a.search_settings_id
fence_key = RedisConnectorIndex.fence_key_with_ids(
a.connector_credential_pair_id, a.search_settings_id
)
if not r.exists(fence_key):
failure_reason = (
f"Unknown index attempt. Might be left over from a process restart: "
f"index_attempt={a.id} "
f"cc_pair={a.connector_credential_pair_id} "
f"search_settings={a.search_settings_id}"
)
):
task_logger.warning(failure_reason)
mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)

lock_beat.reacquire()
Expand Down

0 comments on commit 93ec2a6

Please sign in to comment.