diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index e2446592adc..cd6126223c4 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: ) if attempt_id: task_logger.info( - f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}" + f"Indexing queued: index_attempt={attempt_id} " + f"cc_pair={cc_pair.id} " + f"search_settings={search_settings_instance.id} " ) tasks_created += 1 except SoftTimeLimitExceeded: @@ -529,6 +531,13 @@ def connector_indexing_task( sleep(1) continue + if payload.index_attempt_id != index_attempt_id: + raise ValueError( + f"connector_indexing_task - id mismatch. Task may be left over from previous run.: " + f"task_index_attempt={index_attempt_id} " + f"payload_index_attempt={payload.index_attempt_id}" + ) + logger.info( f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}" ) @@ -614,7 +623,6 @@ def connector_indexing_task( with get_session_with_tenant(tenant_id) as db_session: mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e)) - redis_connector_index.reset() raise e finally: if lock.owned(): diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 1be928f5994..b01a0eac815 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -690,12 +690,17 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: for a in attempts: # if attempts exist in the db but we don't detect them in redis, mark them as failed - failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart." - if not r.exists( - RedisConnectorIndex.fence_key_with_ids( - a.connector_credential_pair_id, a.search_settings_id + fence_key = RedisConnectorIndex.fence_key_with_ids( + a.connector_credential_pair_id, a.search_settings_id + ) + if not r.exists(fence_key): + failure_reason = ( + f"Unknown index attempt. Might be left over from a process restart: " + f"index_attempt={a.id} " + f"cc_pair={a.connector_credential_pair_id} " + f"search_settings={a.search_settings_id}" ) - ): + task_logger.warning(failure_reason) mark_attempt_failed(a.id, db_session, failure_reason=failure_reason) lock_beat.reacquire()