Skip to content

Commit

Permalink
limit session scope of index attempt (use id's where appropriate as w… (
Browse files Browse the repository at this point in the history
#3049)

* limit session scope of index attempt (use id's where appropriate as well)

* fix session scope

---------

Co-authored-by: Richard Kuo <[email protected]>
  • Loading branch information
rkuo-danswer and LostVector authored Nov 5, 2024
1 parent b2c55eb commit 6bf06ac
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 32 deletions.
53 changes: 27 additions & 26 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ def connector_indexing_task(
f"search_settings={search_settings_id}"
)

attempt = None
attempt_found = False
n_final_progress: int | None = None

redis_connector = RedisConnector(tenant_id, cc_pair_id)
Expand Down Expand Up @@ -557,6 +557,7 @@ def connector_indexing_task(
raise ValueError(
f"Index attempt not found: index_attempt={index_attempt_id}"
)
attempt_found = True

cc_pair = get_connector_credential_pair_from_id(
cc_pair_id=cc_pair_id,
Expand All @@ -576,42 +577,42 @@ def connector_indexing_task(
f"Credential not found: cc_pair={cc_pair_id} credential={cc_pair.credential_id}"
)

# define a callback class
callback = RunIndexingCallback(
redis_connector.stop.fence_key,
redis_connector_index.generator_progress_key,
lock,
r,
)
# define a callback class
callback = RunIndexingCallback(
redis_connector.stop.fence_key,
redis_connector_index.generator_progress_key,
lock,
r,
)

logger.info(
f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
logger.info(
f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)

run_indexing_entrypoint(
index_attempt_id,
tenant_id,
cc_pair_id,
is_ee,
callback=callback,
)
run_indexing_entrypoint(
index_attempt_id,
tenant_id,
cc_pair_id,
is_ee,
callback=callback,
)

# get back the total number of indexed docs and return it
n_final_progress = redis_connector_index.get_progress()
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
# get back the total number of indexed docs and return it
n_final_progress = redis_connector_index.get_progress()
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
except Exception as e:
logger.exception(
f"Indexing spawned task failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
if attempt:
if attempt_found:
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(attempt, db_session, failure_reason=str(e))
mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))

redis_connector_index.reset()
raise e
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def monitor_ccpair_indexing_taskset(
index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
if index_attempt:
mark_attempt_failed(
index_attempt=index_attempt,
index_attempt_id=payload.index_attempt_id,
db_session=db_session,
failure_reason="Connector indexing aborted or exceptioned.",
)
Expand Down Expand Up @@ -696,7 +696,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
a.connector_credential_pair_id, a.search_settings_id
)
):
mark_attempt_failed(a, db_session, failure_reason=failure_reason)
mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)

lock_beat.reacquire()
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/background/indexing/run_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def _run_indexing(
or index_attempt.status != IndexingStatus.IN_PROGRESS
):
mark_attempt_failed(
index_attempt,
index_attempt.id,
db_session,
failure_reason=str(e),
full_exception_trace=traceback.format_exc(),
Expand Down Expand Up @@ -372,7 +372,7 @@ def _run_indexing(
and index_attempt_md.num_exceptions >= batch_num
):
mark_attempt_failed(
index_attempt,
index_attempt.id,
db_session,
failure_reason="All batches exceptioned.",
)
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ def mark_attempt_partially_succeeded(


def mark_attempt_failed(
index_attempt: IndexAttempt,
index_attempt_id: int,
db_session: Session,
failure_reason: str = "Unknown",
full_exception_trace: str | None = None,
) -> None:
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.where(IndexAttempt.id == index_attempt_id)
.with_for_update()
).scalar_one()

Expand Down

0 comments on commit 6bf06ac

Please sign in to comment.