From 63655cfbedafdf912f6b15b97752f17ee9890bc4 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Fri, 4 Oct 2024 08:43:04 -0700 Subject: [PATCH] update_single should be optimized for a single call now (#2671) Co-authored-by: Richard Kuo --- .../danswer/background/celery/celery_app.py | 2 - .../danswer/background/connector_deletion.py | 6 +- .../background/indexing/run_indexing.py | 18 ++- backend/danswer/background/update.py | 10 +- backend/danswer/document_index/interfaces.py | 29 +++-- backend/danswer/document_index/vespa/index.py | 123 +++++++++--------- 6 files changed, 106 insertions(+), 82 deletions(-) diff --git a/backend/danswer/background/celery/celery_app.py b/backend/danswer/background/celery/celery_app.py index 5244d9b94da..d20c49831a9 100644 --- a/backend/danswer/background/celery/celery_app.py +++ b/backend/danswer/background/celery/celery_app.py @@ -364,8 +364,6 @@ def run_periodic_task(self, worker: Any) -> None: lock: redis.lock.Lock = worker.primary_worker_lock - task_logger.info("Reacquiring primary worker lock.") - if lock.owned(): task_logger.debug("Reacquiring primary worker lock.") lock.reacquire() diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py index 84b696dd8e4..962183f71ae 100644 --- a/backend/danswer/background/connector_deletion.py +++ b/backend/danswer/background/connector_deletion.py @@ -33,6 +33,7 @@ from danswer.document_index.factory import get_default_document_index from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import UpdateRequest +from danswer.document_index.interfaces import VespaDocumentFields from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.utils.logger import setup_logger @@ -168,8 +169,7 @@ def document_by_cc_pair_cleanup_task( doc_sets = fetch_document_sets_for_document(document_id, db_session) update_doc_sets: set[str] = set(doc_sets) - update_request = UpdateRequest( - document_ids=[document_id], + fields = VespaDocumentFields( document_sets=update_doc_sets, access=doc_access, boost=doc.boost, @@ -177,7 +177,7 @@ def document_by_cc_pair_cleanup_task( ) # update Vespa. OK if doc doesn't exist. Raises exception otherwise. 
- document_index.update_single(update_request=update_request) + document_index.update_single(document_id, fields=fields) # there are still other cc_pair references to the doc, so just resync to Vespa delete_document_by_connector_credential_pair__no_commit( diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 499899ac225..b3d011a422b 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -14,6 +14,7 @@ from danswer.connectors.connector_runner import ConnectorRunner from danswer.connectors.factory import instantiate_connector from danswer.connectors.models import IndexAttemptMetadata +from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.connector_credential_pair import get_last_successful_attempt_time from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.engine import get_sqlalchemy_engine @@ -49,7 +50,7 @@ def _get_connector_runner( """ NOTE: `start_time` and `end_time` are only used for poll connectors - Returns an interator of document batches and whether the returned documents + Returns an iterator of document batches and whether the returned documents are the complete list of existing documents of the connector. If the task of type LOAD_STATE, the list will be considered complete and otherwise incomplete. """ @@ -67,12 +68,17 @@ def _get_connector_runner( logger.exception(f"Unable to instantiate connector due to {e}") # since we failed to even instantiate the connector, we pause the CCPair since # it will never succeed - update_connector_credential_pair( - db_session=db_session, - connector_id=attempt.connector_credential_pair.connector.id, - credential_id=attempt.connector_credential_pair.credential.id, - status=ConnectorCredentialPairStatus.PAUSED, + + cc_pair = get_connector_credential_pair_from_id( + attempt.connector_credential_pair.id, db_session ) + if cc_pair and cc_pair.status == ConnectorCredentialPairStatus.ACTIVE: + update_connector_credential_pair( + db_session=db_session, + connector_id=attempt.connector_credential_pair.connector.id, + credential_id=attempt.connector_credential_pair.credential.id, + status=ConnectorCredentialPairStatus.PAUSED, + ) raise e return ConnectorRunner( diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 94e703635ee..c88b349e4fe 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -96,14 +96,20 @@ def _should_create_new_indexing( if last_index.status == IndexingStatus.IN_PROGRESS: return False else: - if connector.id == 0: # Ingestion API + if ( + connector.id == 0 or connector.source == DocumentSource.INGESTION_API + ): # Ingestion API return False return True # If the connector is paused or is the ingestion API, don't index # NOTE: during an embedding model switch over, the following logic # is bypassed by the above check for a future model - if not cc_pair.status.is_active() or connector.id == 0: + if ( + not cc_pair.status.is_active() + or connector.id == 0 + or connector.source == DocumentSource.INGESTION_API + ): return False if not last_index: diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py index b499d696743..e4ff90a40cf 100644 --- a/backend/danswer/document_index/interfaces.py +++ b/backend/danswer/document_index/interfaces.py @@ -55,6 +55,21 @@ class 
DocumentMetadata:
     from_ingestion_api: bool = False
 
 
+@dataclass
+class VespaDocumentFields:
+    """
+    Specifies fields in Vespa for a document. Fields set to None will be ignored.
+    Perhaps we should name this in an implementation-agnostic fashion, but it's more
+    understandable like this for now.
+    """
+
+    # all other fields except these 4 will always be left alone by the update request
+    access: DocumentAccess | None = None
+    document_sets: set[str] | None = None
+    boost: float | None = None
+    hidden: bool | None = None
+
+
 @dataclass
 class UpdateRequest:
     """
@@ -188,11 +203,9 @@ class Updatable(abc.ABC):
     """
 
     @abc.abstractmethod
-    def update_single(self, update_request: UpdateRequest) -> None:
+    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
         """
-        Updates some set of chunks for a document. The document and fields to update
-        are specified in the update request. Each update request in the list applies
-        its changes to a list of document ids.
+        Updates all chunks for a document with the specified fields.
         None values mean that the field does not need an update.
 
         The rationale for a single update function is that it allows retries and parallelism
         to be handled by the caller, and it also allows
         us to individually handle error conditions per document.
 
         Parameters:
-        - update_request: for a list of document ids in the update request, apply the same updates
-        to all of the documents with those ids.
+        - fields: the fields to update in the document. Any field set to None will not be changed.
 
         Return:
-        - an HTTPStatus code. The code can be used to decide whether to fail immediately,
-        retry, etc. Although this method likely hits an HTTP API behind the
-        scenes, the usage of HTTPStatus is a convenience and the interface is not
-        actually HTTP specific.
+        None
         """
         raise NotImplementedError
 
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index 467260ed619..512eb932156 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -1,5 +1,6 @@
 import concurrent.futures
 import io
+import logging
 import os
 import re
 import time
@@ -23,6 +24,7 @@
 from danswer.document_index.interfaces import DocumentInsertionRecord
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.document_index.interfaces import VespaChunkRequest
+from danswer.document_index.interfaces import VespaDocumentFields
 from danswer.document_index.vespa.chunk_retrieval import batch_search_api_retrieval
 from danswer.document_index.vespa.chunk_retrieval import (
     get_all_vespa_ids_for_document_id,
@@ -69,6 +71,10 @@
 
 logger = setup_logger()
 
+# Set the logging level to WARNING to ignore INFO and DEBUG logs
+httpx_logger = logging.getLogger("httpx")
+httpx_logger.setLevel(logging.WARNING)
+
 
 @dataclass
 class _VespaUpdateRequest:
@@ -378,89 +384,86 @@ def update(self, update_requests: list[UpdateRequest]) -> None:
             time.monotonic() - update_start,
         )
 
-    def update_single(self, update_request: UpdateRequest) -> None:
+    def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
         """Note: if the document id does not exist, the update will be a no-op and the
         function will complete with no errors or exceptions.
         Handle other exceptions if you wish to implement retry behavior.
         """
-        if len(update_request.document_ids) != 1:
-            raise ValueError("update_request must contain a single document id")
 
         # Handle Vespa character limitations
-        # Mutating update_request but it's not used later anyway
-        update_request.document_ids = [
-            replace_invalid_doc_id_characters(doc_id)
-            for doc_id in update_request.document_ids
-        ]
-
-        # update_start = time.monotonic()
-
-        # Fetch all chunks for each document ahead of time
-        index_names = [self.index_name]
-        if self.secondary_index_name:
-            index_names.append(self.secondary_index_name)
-
-        chunk_id_start_time = time.monotonic()
-        all_doc_chunk_ids: list[str] = []
-        for index_name in index_names:
-            for document_id in update_request.document_ids:
-                # this calls vespa and can raise http exceptions
-                doc_chunk_ids = get_all_vespa_ids_for_document_id(
-                    document_id=document_id,
-                    index_name=index_name,
-                    filters=None,
-                    get_large_chunks=True,
-                )
-                all_doc_chunk_ids.extend(doc_chunk_ids)
-        logger.debug(
-            f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
-        )
+        normalized_doc_id = replace_invalid_doc_id_characters(doc_id)
 
-        # Build the _VespaUpdateRequest objects
+        # Build the Vespa partial-update payload
         update_dict: dict[str, dict] = {"fields": {}}
-        if update_request.boost is not None:
-            update_dict["fields"][BOOST] = {"assign": update_request.boost}
-        if update_request.document_sets is not None:
+        if fields.boost is not None:
+            update_dict["fields"][BOOST] = {"assign": fields.boost}
+        if fields.document_sets is not None:
             update_dict["fields"][DOCUMENT_SETS] = {
-                "assign": {
-                    document_set: 1 for document_set in update_request.document_sets
-                }
+                "assign": {document_set: 1 for document_set in fields.document_sets}
             }
-        if update_request.access is not None:
+        if fields.access is not None:
             update_dict["fields"][ACCESS_CONTROL_LIST] = {
-                "assign": {acl_entry: 1 for acl_entry in update_request.access.to_acl()}
+                "assign": {acl_entry: 1 for acl_entry in fields.access.to_acl()}
             }
-        if update_request.hidden is not None:
-            update_dict["fields"][HIDDEN] = {"assign": update_request.hidden}
+        if fields.hidden is not None:
+            update_dict["fields"][HIDDEN] = {"assign": fields.hidden}
 
         if not update_dict["fields"]:
             logger.error("Update request received but nothing to update")
             return
 
-        processed_update_requests: list[_VespaUpdateRequest] = []
-        for document_id in update_request.document_ids:
-            for doc_chunk_id in all_doc_chunk_ids:
-                processed_update_requests.append(
-                    _VespaUpdateRequest(
-                        document_id=document_id,
-                        url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}",
-                        update_request=update_dict,
-                    )
-                )
+        index_names = [self.index_name]
+        if self.secondary_index_name:
+            index_names.append(self.secondary_index_name)
 
         with httpx.Client(http2=True) as http_client:
-            for update in processed_update_requests:
-                http_client.put(
-                    update.url,
-                    headers={"Content-Type": "application/json"},
-                    json=update.update_request,
+            for index_name in index_names:
+                params = httpx.QueryParams(
+                    {
+                        "selection": f"{index_name}.document_id=='{normalized_doc_id}'",
+                        "cluster": DOCUMENT_INDEX_NAME,
+                    }
                 )
 
-        # logger.debug(
-        #     "Finished updating Vespa documents in %.2f seconds",
-        #     time.monotonic() - update_start,
-        # )
+                total_chunks_updated = 0
+                while True:
+                    try:
+                        resp = http_client.put(
+                            DOCUMENT_ID_ENDPOINT.format(index_name=index_name),
+                            params=params,
+                            headers={"Content-Type": "application/json"},
+                            json=update_dict,
+                        )
+
+                        resp.raise_for_status()
+                    except httpx.HTTPStatusError as e:
+                        logger.error(
+                            f"Failed to update chunks, details: {e.response.text}"
+                        )
+                        raise
+
+                    resp_data = resp.json()
+
+                    if "documentCount" in resp_data:
+                        chunks_updated = resp_data["documentCount"]
+                        total_chunks_updated += chunks_updated
+
+                    # Check for a continuation token to handle pagination
+                    if "continuation" not in resp_data:
+                        break  # Exit loop if no continuation token
+
+                    if not resp_data["continuation"]:
+                        break  # Exit loop if continuation token is empty
+
+                    params = params.set("continuation", resp_data["continuation"])
+
+                logger.debug(
+                    f"VespaIndex.update_single: "
+                    f"index={index_name} "
+                    f"doc={normalized_doc_id} "
+                    f"chunks_updated={total_chunks_updated}"
+                )
 
         return
 
     def delete(self, doc_ids: list[str]) -> None:
@@ -479,6 +482,7 @@ def delete(self, doc_ids: list[str]) -> None:
             delete_vespa_docs(
                 document_ids=doc_ids, index_name=index_name, http_client=http_client
             )
+        return
 
     def delete_single(self, doc_id: str) -> None:
         """Possibly faster overall than the delete method due to using a single
@@ -539,6 +543,7 @@ def delete_single(self, doc_id: str) -> None:
             f"doc={doc_id} "
             f"chunks_deleted={total_chunks_deleted}"
         )
+        return
 
     def id_based_retrieval(
         self,
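
Reviewer note (not part of the patch): the new update_single body relies on Vespa's
"update where" semantics on /document/v1: a PUT with a "selection" query parameter
applies one partial update to every matching document, returning a "documentCount"
and, for large result sets, a "continuation" token to resume from. Below is a
minimal standalone sketch of that pattern, assuming a locally reachable Vespa
instance; the base URL, namespace, document type, and cluster names are
hypothetical placeholders, whereas the patched code derives them from
DOCUMENT_ID_ENDPOINT and DOCUMENT_INDEX_NAME.

import httpx

VESPA_BASE = "http://localhost:8080"  # hypothetical Vespa endpoint
DOC_TYPE = "danswer_chunk"  # hypothetical schema/document type
CLUSTER = "danswer_index"  # hypothetical content cluster


def update_all_chunks_for_document(doc_id: str, update_dict: dict) -> int:
    """Apply one partial update to every chunk whose document_id matches.

    Returns the number of documents Vespa reports as updated.
    """
    url = f"{VESPA_BASE}/document/v1/default/{DOC_TYPE}/docid"
    params = httpx.QueryParams(
        {
            "selection": f"{DOC_TYPE}.document_id=='{doc_id}'",
            "cluster": CLUSTER,
        }
    )

    total_updated = 0
    with httpx.Client(http2=True) as client:
        while True:
            resp = client.put(
                url,
                params=params,
                headers={"Content-Type": "application/json"},
                json=update_dict,
            )
            resp.raise_for_status()
            data = resp.json()

            # "documentCount" is how many documents this pass visited
            total_updated += data.get("documentCount", 0)

            # a missing or empty continuation token means Vespa is done
            token = data.get("continuation")
            if not token:
                break
            params = params.set("continuation", token)

    return total_updated


if __name__ == "__main__":
    # e.g. hide every chunk of one document in a single logical operation
    n = update_all_chunks_for_document(
        "doc-123", {"fields": {"hidden": {"assign": True}}}
    )
    print(f"updated {n} chunks")

The design win over the previous implementation: the per-chunk fan-out now
happens inside Vespa itself, which is what lets the patch delete the
get_all_vespa_ids_for_document_id pre-fetch and the per-chunk PUT loop, turning
N+1 HTTP calls per document into one call per continuation page.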