From 231ab3fb5d23ad03ea8a0db48cd2771335612a33 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 31 Oct 2024 09:43:46 -0700 Subject: [PATCH] Feature/indexing logs (#3002) * improve logging around indexing tasks * task_logger doesn't work inside the spawned task --- .../background/celery/tasks/indexing/tasks.py | 92 +++++++++++++++---- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index d275ffd0633..175aef54055 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -391,7 +391,12 @@ def connector_indexing_proxy_task( tenant_id: str | None, ) -> None: """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" - + task_logger.info( + f"Indexing proxy - starting: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) client = SimpleJobClient() job = client.submit( @@ -405,29 +410,56 @@ def connector_indexing_proxy_task( ) if not job: + task_logger.info( + f"Indexing proxy - spawn failed: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return + task_logger.info( + f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + while True: sleep(10) - with get_session_with_tenant(tenant_id) as db_session: - index_attempt = get_index_attempt( - db_session=db_session, index_attempt_id=index_attempt_id - ) - # do nothing for ongoing jobs that haven't been stopped - if not job.done(): + # do nothing for ongoing jobs that haven't been stopped + if not job.done(): + with get_session_with_tenant(tenant_id) as db_session: + index_attempt = get_index_attempt( + db_session=db_session, index_attempt_id=index_attempt_id + ) + if not index_attempt: continue if not index_attempt.is_finished(): continue - if job.status == "error": - logger.error(job.exception()) + if job.status == "error": + task_logger.error( + f"Indexing proxy - spawned task exceptioned: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"error={job.exception()}" + ) - job.release() - break + job.release() + break + task_logger.info( + f"Indexing proxy - finished: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return @@ -449,7 +481,17 @@ def connector_indexing_task( Returns None if the task did not run (possibly due to a conflict). Otherwise, returns an int >= 0 representing the number of indexed docs. + + NOTE: if an exception is raised out of this task, the primary worker will detect + that the task transitioned to a "READY" state but the generator_complete_key doesn't exist. + This will cause the primary worker to abort the indexing attempt and clean up. """ + logger.info( + f"Indexing spawned task starting: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) attempt = None n_final_progress = 0 @@ -488,19 +530,19 @@ def connector_indexing_task( cast(str, fence_json) ) except ValueError: - task_logger.exception( + logger.exception( f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}" ) raise if fence_data.index_attempt_id is None or fence_data.celery_task_id is None: - task_logger.info( + logger.info( f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}" ) sleep(1) continue - task_logger.info( + logger.info( f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}" ) break @@ -512,7 +554,7 @@ def connector_indexing_task( acquired = lock.acquire(blocking=False) if not acquired: - task_logger.warning( + logger.warning( f"Indexing task already running, exiting...: " f"cc_pair={cc_pair_id} search_settings={search_settings_id}" ) @@ -555,6 +597,13 @@ def connector_indexing_task( rcs.fence_key, rci.generator_progress_key, lock, r ) + logger.info( + f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + run_indexing_entrypoint( index_attempt_id, tenant_id, @@ -573,7 +622,12 @@ def connector_indexing_task( r.set(rci.generator_complete_key, HTTPStatus.OK.value) except Exception as e: - task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id}") + logger.exception( + f"Indexing spawned task failed: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) if attempt: with get_session_with_tenant(tenant_id) as db_session: mark_attempt_failed(attempt, db_session, failure_reason=str(e)) @@ -587,4 +641,10 @@ def connector_indexing_task( if lock.owned(): lock.release() + logger.info( + f"Indexing spawned task finished: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return n_final_progress