Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into multi-slackbot-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hagen-danswer committed Nov 19, 2024
2 parents 8e2ff38 + b712877 commit 5323b1c
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 62 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<a href="https://docs.danswer.dev/" target="_blank">
<img src="https://img.shields.io/badge/docs-view-blue" alt="Documentation">
</a>
<a href="https://join.slack.com/t/danswer/shared_invite/zt-2lcmqw703-071hBuZBfNEOGUsLa5PXvQ" target="_blank">
<a href="https://join.slack.com/t/danswer/shared_invite/zt-2twesxdr6-5iQitKZQpgq~hYIZ~dv3KA" target="_blank">
<img src="https://img.shields.io/badge/slack-join-blue.svg?logo=slack" alt="Slack">
</a>
<a href="https://discord.gg/TDJ59cGV2X" target="_blank">
Expand Down
7 changes: 4 additions & 3 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)
if attempt_id:
task_logger.info(
f"Indexing queued: index_attempt={attempt_id} "
f"Connector indexing queued: "
f"index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings_instance.id} "
)
Expand Down Expand Up @@ -383,7 +384,6 @@ def try_creating_indexing_task(
payload.index_attempt_id = index_attempt_id
payload.celery_task_id = result.id
redis_connector_index.set_fence(payload)

except Exception:
redis_connector_index.set_fence(None)
task_logger.exception(
Expand Down Expand Up @@ -516,7 +516,8 @@ def connector_indexing_task(
logger.debug("Sentry DSN not provided, skipping Sentry initialization")

logger.info(
f"Indexing spawned task starting: attempt={index_attempt_id} "
f"Indexing spawned task starting: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
Expand Down
27 changes: 21 additions & 6 deletions backend/danswer/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@


def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
"""Returns boolean indicating if pruning is due."""
"""Returns boolean indicating if pruning is due.
Next pruning time is calculated as a delta from the last successful prune, or the
last successful indexing if pruning has never succeeded.
TODO(rkuo): consider whether we should allow pruning to be immediately rescheduled
if pruning fails (which is what it does now). A backoff could be reasonable.
"""

# skip pruning if no prune frequency is set
# pruning can still be forced via the API which will run a pruning task directly
Expand Down Expand Up @@ -225,6 +232,8 @@ def connector_pruning_generator_task(
pruning_ctx_dict["request_id"] = self.request.id
pruning_ctx.set(pruning_ctx_dict)

task_logger.info(f"Pruning generator starting: cc_pair={cc_pair_id}")

redis_connector = RedisConnector(tenant_id, cc_pair_id)

r = get_redis_client(tenant_id=tenant_id)
Expand Down Expand Up @@ -255,6 +264,11 @@ def connector_pruning_generator_task(
)
return

task_logger.info(
f"Pruning generator running connector: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source}"
)
runnable_connector = instantiate_connector(
db_session,
cc_pair.connector.source,
Expand All @@ -269,6 +283,7 @@ def connector_pruning_generator_task(
lock,
r,
)

# the set of doc ids currently present in the source
all_connector_doc_ids: set[str] = extract_ids_from_runnable_connector(
runnable_connector, callback
Expand All @@ -290,8 +305,8 @@ def connector_pruning_generator_task(
task_logger.info(
f"Pruning set collected: "
f"cc_pair={cc_pair_id} "
f"docs_to_remove={len(doc_ids_to_remove)} "
f"doc_source={cc_pair.connector.source}"
f"connector_source={cc_pair.connector.source} "
f"docs_to_remove={len(doc_ids_to_remove)}"
)

task_logger.info(
Expand All @@ -314,10 +329,10 @@ def connector_pruning_generator_task(
f"Failed to run pruning: cc_pair={cc_pair_id} connector={connector_id}"
)

redis_connector.prune.generator_clear()
redis_connector.prune.taskset_clear()
redis_connector.prune.set_fence(False)
redis_connector.prune.reset()
raise e
finally:
if lock.owned():
lock.release()

task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}")
12 changes: 11 additions & 1 deletion backend/danswer/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,17 @@ def document_by_cc_pair_cleanup_task(
f"Max celery task retries reached. Marking doc as dirty for reconciliation: "
f"tenant={tenant_id} doc={document_id}"
)
with get_session_with_tenant(tenant_id):
with get_session_with_tenant(tenant_id) as db_session:
# delete the cc pair relationship now and let reconciliation clean it up
# in vespa
delete_document_by_connector_credential_pair__no_commit(
db_session=db_session,
document_id=document_id,
connector_credential_pair_identifier=ConnectorCredentialPairIdentifier(
connector_id=connector_id,
credential_id=credential_id,
),
)
mark_document_as_modified(document_id, db_session)
return False

Expand Down
64 changes: 41 additions & 23 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,22 @@ def monitor_connector_deletion_taskset(
db_session, cc_pair.connector_id, cc_pair.credential_id
)
if len(doc_ids) > 0:
# if this happens, documents somehow got added while deletion was in progress. Likely a bug
# gating off pruning and indexing work before deletion starts
# NOTE(rkuo): if this happens, documents somehow got added while
# deletion was in progress. Likely a bug in gating off pruning and
# indexing work before deletion starts.
task_logger.warning(
f"Connector deletion - documents still found after taskset completion: "
f"cc_pair={cc_pair_id} num={len(doc_ids)}"
"Connector deletion - documents still found after taskset completion. "
"Clearing the current deletion attempt and allowing deletion to restart: "
f"cc_pair={cc_pair_id} "
f"docs_deleted={fence_data.num_tasks} "
f"docs_remaining={len(doc_ids)}"
)

# We don't want to wave off why we got into this state, but resetting
# our attempt and letting the deletion restart is a good way to recover
redis_connector.delete.reset()
raise RuntimeError(
"Connector deletion - documents still found after taskset completion"
)

# clean up the rest of the related Postgres entities
Expand Down Expand Up @@ -512,8 +523,7 @@ def monitor_connector_deletion_taskset(
f"docs_deleted={fence_data.num_tasks}"
)

redis_connector.delete.taskset_clear()
redis_connector.delete.set_fence(None)
redis_connector.delete.reset()


def monitor_ccpair_pruning_taskset(
Expand Down Expand Up @@ -645,26 +655,34 @@ def monitor_ccpair_indexing_taskset(
result_state = result.state

status_int = redis_connector_index.get_completion()
if status_int is None:
if status_int is None: # completion signal not set ... check for errors
# If we get here, and then the task both sets the completion signal and finishes,
# we will incorrectly abort the task. We must check result state, then check
# get_completion again to avoid the race condition.
if result_state in READY_STATES:
# IF the task state is READY, THEN generator_complete should be set
# if it isn't, then the worker crashed
task_logger.info(
f"Connector indexing aborted: "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)

index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
if index_attempt:
mark_attempt_failed(
index_attempt_id=payload.index_attempt_id,
db_session=db_session,
failure_reason="Connector indexing aborted or exceptioned.",
if redis_connector_index.get_completion() is None:
# IF the task state is READY, THEN generator_complete should be set
# if it isn't, then the worker crashed
msg = (
f"Connector indexing aborted or exceptioned: "
f"attempt={payload.index_attempt_id} "
f"celery_task={payload.celery_task_id} "
f"result_state={result_state} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
task_logger.warning(msg)

index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
if index_attempt:
mark_attempt_failed(
index_attempt_id=payload.index_attempt_id,
db_session=db_session,
failure_reason=msg,
)

redis_connector_index.reset()
redis_connector_index.reset()
return

status_enum = HTTPStatus(status_int)
Expand Down
9 changes: 9 additions & 0 deletions backend/danswer/document_index/vespa/indexing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from datetime import datetime
from datetime import timezone
from http import HTTPStatus

import httpx
from retry import retry
Expand Down Expand Up @@ -194,6 +195,14 @@ def _index_vespa_chunk(
logger.exception(
f"Failed to index document: '{document.id}'. Got response: '{res.text}'"
)
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.INSUFFICIENT_STORAGE:
logger.error(
"NOTE: HTTP Status 507 Insufficient Storage usually means "
"you need to allocate more memory or disk space to the "
"Vespa/index container."
)

raise e


Expand Down
10 changes: 10 additions & 0 deletions backend/danswer/indexing/indexing_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import traceback
from functools import partial
from http import HTTPStatus
from typing import Protocol

import httpx
from pydantic import BaseModel
from pydantic import ConfigDict
from sqlalchemy.orm import Session
Expand Down Expand Up @@ -154,6 +156,14 @@ def index_doc_batch_with_handler(
tenant_id=tenant_id,
)
except Exception as e:
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.INSUFFICIENT_STORAGE:
logger.error(
"NOTE: HTTP Status 507 Insufficient Storage indicates "
"you need to allocate more memory or disk space to the "
"Vespa/index container."
)

if INDEXING_EXCEPTION_LIMIT == 0:
raise

Expand Down
8 changes: 7 additions & 1 deletion backend/danswer/redis/redis_connector_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from danswer.configs.constants import DanswerCeleryQueues
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import construct_document_select_for_connector_credential_pair
from danswer.db.models import Document as DbDocument


class RedisConnectorDeletionFenceData(BaseModel):
Expand Down Expand Up @@ -98,7 +99,8 @@ def generate_tasks(
stmt = construct_document_select_for_connector_credential_pair(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
for doc_temp in db_session.scalars(stmt).yield_per(1):
doc: DbDocument = doc_temp
current_time = time.monotonic()
if current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
Expand Down Expand Up @@ -130,6 +132,10 @@ def generate_tasks(

return len(async_results)

def reset(self) -> None:
    """Delete the Redis keys tracked for a connector deletion attempt.

    Removes the taskset key first and the fence key second, issuing one
    ``delete`` call per key.
    """
    for key in (self.taskset_key, self.fence_key):
        self.redis.delete(key)

@staticmethod
def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None:
taskset_key = f"{RedisConnectorDelete.TASKSET_PREFIX}_{id}"
Expand Down
6 changes: 6 additions & 0 deletions backend/danswer/redis/redis_connector_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ def generate_tasks(

return len(async_results)

def reset(self) -> None:
    """Delete the Redis keys tracked for a pruning attempt.

    Removes, in order, the generator progress key, the generator complete
    key, the taskset key and the fence key, issuing one ``delete`` call
    per key.
    """
    for key in (
        self.generator_progress_key,
        self.generator_complete_key,
        self.taskset_key,
        self.fence_key,
    ):
        self.redis.delete(key)

@staticmethod
def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None:
taskset_key = f"{RedisConnectorPrune.TASKSET_PREFIX}_{id}"
Expand Down
18 changes: 14 additions & 4 deletions backend/danswer/server/documents/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,21 +865,31 @@ def connector_run_once(
)
if attempt_id:
logger.info(
f"try_creating_indexing_task succeeded: cc_pair={cc_pair.id} attempt_id={attempt_id}"
f"connector_run_once - try_creating_indexing_task succeeded: "
f"connector={run_info.connector_id} "
f"cc_pair={cc_pair.id} "
f"attempt={attempt_id} "
)
index_attempt_ids.append(attempt_id)
else:
logger.info(f"try_creating_indexing_task failed: cc_pair={cc_pair.id}")
logger.info(
f"connector_run_once - try_creating_indexing_task failed: "
f"connector={run_info.connector_id} "
f"cc_pair={cc_pair.id}"
)

if not index_attempt_ids:
msg = "No new indexing attempts created, indexing jobs are queued or running."
logger.info(msg)
raise HTTPException(
status_code=400,
detail="No new indexing attempts created, indexing jobs are queued or running.",
detail=msg,
)

msg = f"Successfully created {len(index_attempt_ids)} index attempts. {index_attempt_ids}"
return StatusResponse(
success=True,
message=f"Successfully created {len(index_attempt_ids)} index attempts",
message=msg,
data=index_attempt_ids,
)

Expand Down
Loading

0 comments on commit 5323b1c

Please sign in to comment.