diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 7d2e6e0b3dd..e2446592adc 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -489,7 +489,7 @@ def connector_indexing_task( f"search_settings={search_settings_id}" ) - attempt = None + attempt_found = False n_final_progress: int | None = None redis_connector = RedisConnector(tenant_id, cc_pair_id) @@ -557,6 +557,7 @@ def connector_indexing_task( raise ValueError( f"Index attempt not found: index_attempt={index_attempt_id}" ) + attempt_found = True cc_pair = get_connector_credential_pair_from_id( cc_pair_id=cc_pair_id, @@ -576,32 +577,32 @@ def connector_indexing_task( f"Credential not found: cc_pair={cc_pair_id} credential={cc_pair.credential_id}" ) - # define a callback class - callback = RunIndexingCallback( - redis_connector.stop.fence_key, - redis_connector_index.generator_progress_key, - lock, - r, - ) + # define a callback class + callback = RunIndexingCallback( + redis_connector.stop.fence_key, + redis_connector_index.generator_progress_key, + lock, + r, + ) - logger.info( - f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) + logger.info( + f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) - run_indexing_entrypoint( - index_attempt_id, - tenant_id, - cc_pair_id, - is_ee, - callback=callback, - ) + run_indexing_entrypoint( + index_attempt_id, + tenant_id, + cc_pair_id, + is_ee, + callback=callback, + ) - # get back the total number of indexed docs and return it - n_final_progress = redis_connector_index.get_progress() - redis_connector_index.set_generator_complete(HTTPStatus.OK.value) + # get back the total number of indexed docs and return it + n_final_progress = redis_connector_index.get_progress() + redis_connector_index.set_generator_complete(HTTPStatus.OK.value) except Exception as e: logger.exception( f"Indexing spawned task failed: attempt={index_attempt_id} " @@ -609,9 +610,9 @@ def connector_indexing_task( f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) - if attempt: + if attempt_found: with get_session_with_tenant(tenant_id) as db_session: - mark_attempt_failed(attempt, db_session, failure_reason=str(e)) + mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e)) redis_connector_index.reset() raise e diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 944750c7a78..1be928f5994 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -610,7 +610,7 @@ def monitor_ccpair_indexing_taskset( index_attempt = get_index_attempt(db_session, payload.index_attempt_id) if index_attempt: mark_attempt_failed( - index_attempt=index_attempt, + index_attempt_id=payload.index_attempt_id, db_session=db_session, failure_reason="Connector indexing aborted or exceptioned.", ) @@ -696,7 +696,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: a.connector_credential_pair_id, a.search_settings_id ) ): - mark_attempt_failed(a, db_session, failure_reason=failure_reason) + mark_attempt_failed(a.id, db_session, failure_reason=failure_reason) lock_beat.reacquire() if r.exists(RedisConnectorCredentialPair.get_fence_key()): diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 252309e3faa..35cb080b903 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -337,7 +337,7 @@ def _run_indexing( or index_attempt.status != IndexingStatus.IN_PROGRESS ): mark_attempt_failed( - index_attempt, + index_attempt.id, db_session, failure_reason=str(e), full_exception_trace=traceback.format_exc(), @@ -372,7 +372,7 @@ def _run_indexing( and index_attempt_md.num_exceptions >= batch_num ): mark_attempt_failed( - index_attempt, + index_attempt.id, db_session, failure_reason="All batches exceptioned.", ) diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 45a07387949..b9c3d9d4ca2 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -219,7 +219,7 @@ def mark_attempt_partially_succeeded( def mark_attempt_failed( - index_attempt: IndexAttempt, + index_attempt_id: int, db_session: Session, failure_reason: str = "Unknown", full_exception_trace: str | None = None, @@ -227,7 +227,7 @@ def mark_attempt_failed( try: attempt = db_session.execute( select(IndexAttempt) - .where(IndexAttempt.id == index_attempt.id) + .where(IndexAttempt.id == index_attempt_id) .with_for_update() ).scalar_one()