update_single should be optimized for a single call now (#2671)
Co-authored-by: Richard Kuo <[email protected]>
rkuo-danswer and LostVector authored Oct 4, 2024
1 parent 7f788e4 commit 63655cf
Showing 6 changed files with 106 additions and 82 deletions.
2 changes: 0 additions & 2 deletions backend/danswer/background/celery/celery_app.py
@@ -364,8 +364,6 @@ def run_periodic_task(self, worker: Any) -> None:

lock: redis.lock.Lock = worker.primary_worker_lock

task_logger.info("Reacquiring primary worker lock.")

if lock.owned():
task_logger.debug("Reacquiring primary worker lock.")
lock.reacquire()
6 changes: 3 additions & 3 deletions backend/danswer/background/connector_deletion.py
@@ -33,6 +33,7 @@
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.interfaces import VespaDocumentFields
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.utils.logger import setup_logger

@@ -168,16 +169,15 @@ def document_by_cc_pair_cleanup_task(
doc_sets = fetch_document_sets_for_document(document_id, db_session)
update_doc_sets: set[str] = set(doc_sets)

update_request = UpdateRequest(
document_ids=[document_id],
fields = VespaDocumentFields(
document_sets=update_doc_sets,
access=doc_access,
boost=doc.boost,
hidden=doc.hidden,
)

# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
document_index.update_single(update_request=update_request)
document_index.update_single(document_id, fields=fields)

# there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit(
18 changes: 12 additions & 6 deletions backend/danswer/background/indexing/run_indexing.py
@@ -14,6 +14,7 @@
from danswer.connectors.connector_runner import ConnectorRunner
from danswer.connectors.factory import instantiate_connector
from danswer.connectors.models import IndexAttemptMetadata
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.connector_credential_pair import get_last_successful_attempt_time
from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
@@ -49,7 +50,7 @@ def _get_connector_runner(
"""
NOTE: `start_time` and `end_time` are only used for poll connectors
Returns an interator of document batches and whether the returned documents
Returns an iterator of document batches and whether the returned documents
are the complete list of existing documents of the connector. If the task
of type LOAD_STATE, the list will be considered complete and otherwise incomplete.
"""
@@ -67,12 +68,17 @@
logger.exception(f"Unable to instantiate connector due to {e}")
# since we failed to even instantiate the connector, we pause the CCPair since
# it will never succeed
update_connector_credential_pair(
db_session=db_session,
connector_id=attempt.connector_credential_pair.connector.id,
credential_id=attempt.connector_credential_pair.credential.id,
status=ConnectorCredentialPairStatus.PAUSED,

cc_pair = get_connector_credential_pair_from_id(
attempt.connector_credential_pair.id, db_session
)
if cc_pair and cc_pair.status == ConnectorCredentialPairStatus.ACTIVE:
update_connector_credential_pair(
db_session=db_session,
connector_id=attempt.connector_credential_pair.connector.id,
credential_id=attempt.connector_credential_pair.credential.id,
status=ConnectorCredentialPairStatus.PAUSED,
)
raise e

return ConnectorRunner(
10 changes: 8 additions & 2 deletions backend/danswer/background/update.py
@@ -96,14 +96,20 @@ def _should_create_new_indexing(
if last_index.status == IndexingStatus.IN_PROGRESS:
return False
else:
if connector.id == 0: # Ingestion API
if (
connector.id == 0 or connector.source == DocumentSource.INGESTION_API
): # Ingestion API
return False
return True

# If the connector is paused or is the ingestion API, don't index
# NOTE: during an embedding model switch over, the following logic
# is bypassed by the above check for a future model
if not cc_pair.status.is_active() or connector.id == 0:
if (
not cc_pair.status.is_active()
or connector.id == 0
or connector.source == DocumentSource.INGESTION_API
):
return False

if not last_index:
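In condensed form, the guard in both branches above reduces to a single predicate. A sketch for readers; the helper name is hypothetical, not part of the commit:

def _is_ingestion_api_connector(connector) -> bool:
    # connector.id == 0 is the legacy marker for the ingestion API;
    # checking DocumentSource.INGESTION_API states the intent explicitly
    return connector.id == 0 or connector.source == DocumentSource.INGESTION_API

Both the future-embedding-model branch and the paused-connector check return False when this predicate holds, so ingestion-API documents are never scheduled for connector indexing runs.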
29 changes: 19 additions & 10 deletions backend/danswer/document_index/interfaces.py
@@ -55,6 +55,21 @@ class DocumentMetadata:
from_ingestion_api: bool = False


@dataclass
class VespaDocumentFields:
"""
Specifies fields in Vespa for a document. Fields set to None will be ignored.
Perhaps we should name this in an implementation agnostic fashion, but it's more
understandable like this for now.
"""

# all other fields except these 4 will always be left alone by the update request
access: DocumentAccess | None = None
document_sets: set[str] | None = None
boost: float | None = None
hidden: bool | None = None


@dataclass
class UpdateRequest:
"""
@@ -188,26 +203,20 @@ class Updatable(abc.ABC):
"""

@abc.abstractmethod
def update_single(self, update_request: UpdateRequest) -> None:
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
"""
Updates some set of chunks for a document. The document and fields to update
are specified in the update request. Each update request in the list applies
its changes to a list of document ids.
Updates all chunks for a document with the specified fields.
None values mean that the field does not need an update.
The rationale for a single update function is that it allows retries and parallelism
to happen at a higher / more strategic level, is simpler to read, and allows
us to individually handle error conditions per document.
Parameters:
- update_request: for a list of document ids in the update request, apply the same updates
to all of the documents with those ids.
- fields: the fields to update in the document. Any field set to None will not be changed.
Return:
- an HTTPStatus code. The code can used to decide whether to fail immediately,
retry, etc. Although this method likely hits an HTTP API behind the
scenes, the usage of HTTPStatus is a convenience and the interface is not
actually HTTP specific.
None
"""
raise NotImplementedError

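Together with the connector_deletion.py change above, the new interface takes a document id plus a fields payload instead of a one-element UpdateRequest. A minimal caller sketch, assuming document_index is any Updatable implementation; the id and field values are illustrative:

from danswer.document_index.interfaces import VespaDocumentFields

# Set only the fields that should change; None-valued fields are
# left untouched, per the VespaDocumentFields docstring.
fields = VespaDocumentFields(
    document_sets={"engineering", "support"},
    hidden=False,
)

# Updates every chunk of the document; a no-op if the id does not exist.
document_index.update_single("doc-123", fields=fields)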
123 changes: 64 additions & 59 deletions backend/danswer/document_index/vespa/index.py
@@ -1,5 +1,6 @@
import concurrent.futures
import io
import logging
import os
import re
import time
@@ -23,6 +24,7 @@
from danswer.document_index.interfaces import DocumentInsertionRecord
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.interfaces import VespaChunkRequest
from danswer.document_index.interfaces import VespaDocumentFields
from danswer.document_index.vespa.chunk_retrieval import batch_search_api_retrieval
from danswer.document_index.vespa.chunk_retrieval import (
get_all_vespa_ids_for_document_id,
@@ -69,6 +71,10 @@

logger = setup_logger()

# Set the logging level to WARNING to ignore INFO and DEBUG logs
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.WARNING)


@dataclass
class _VespaUpdateRequest:
@@ -378,89 +384,86 @@ def update(self, update_requests: list[UpdateRequest]) -> None:
time.monotonic() - update_start,
)

def update_single(self, update_request: UpdateRequest) -> None:
def update_single(self, doc_id: str, fields: VespaDocumentFields) -> None:
"""Note: if the document id does not exist, the update will be a no-op and the
function will complete with no errors or exceptions.
Handle other exceptions if you wish to implement retry behavior
"""
if len(update_request.document_ids) != 1:
raise ValueError("update_request must contain a single document id")

# Handle Vespa character limitations
# Mutating update_request but it's not used later anyway
update_request.document_ids = [
replace_invalid_doc_id_characters(doc_id)
for doc_id in update_request.document_ids
]

# update_start = time.monotonic()

# Fetch all chunks for each document ahead of time
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

chunk_id_start_time = time.monotonic()
all_doc_chunk_ids: list[str] = []
for index_name in index_names:
for document_id in update_request.document_ids:
# this calls vespa and can raise http exceptions
doc_chunk_ids = get_all_vespa_ids_for_document_id(
document_id=document_id,
index_name=index_name,
filters=None,
get_large_chunks=True,
)
all_doc_chunk_ids.extend(doc_chunk_ids)
logger.debug(
f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
)
normalized_doc_id = replace_invalid_doc_id_characters(doc_id)

# Build the _VespaUpdateRequest objects
update_dict: dict[str, dict] = {"fields": {}}
if update_request.boost is not None:
update_dict["fields"][BOOST] = {"assign": update_request.boost}
if update_request.document_sets is not None:
if fields.boost is not None:
update_dict["fields"][BOOST] = {"assign": fields.boost}
if fields.document_sets is not None:
update_dict["fields"][DOCUMENT_SETS] = {
"assign": {
document_set: 1 for document_set in update_request.document_sets
}
"assign": {document_set: 1 for document_set in fields.document_sets}
}
if update_request.access is not None:
if fields.access is not None:
update_dict["fields"][ACCESS_CONTROL_LIST] = {
"assign": {acl_entry: 1 for acl_entry in update_request.access.to_acl()}
"assign": {acl_entry: 1 for acl_entry in fields.access.to_acl()}
}
if update_request.hidden is not None:
update_dict["fields"][HIDDEN] = {"assign": update_request.hidden}
if fields.hidden is not None:
update_dict["fields"][HIDDEN] = {"assign": fields.hidden}

if not update_dict["fields"]:
logger.error("Update request received but nothing to update")
return

processed_update_requests: list[_VespaUpdateRequest] = []
for document_id in update_request.document_ids:
for doc_chunk_id in all_doc_chunk_ids:
processed_update_requests.append(
_VespaUpdateRequest(
document_id=document_id,
url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}",
update_request=update_dict,
)
)
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

with httpx.Client(http2=True) as http_client:
for update in processed_update_requests:
http_client.put(
update.url,
headers={"Content-Type": "application/json"},
json=update.update_request,
for index_name in index_names:
params = httpx.QueryParams(
{
"selection": f"{index_name}.document_id=='{normalized_doc_id}'",
"cluster": DOCUMENT_INDEX_NAME,
}
)

# logger.debug(
# "Finished updating Vespa documents in %.2f seconds",
# time.monotonic() - update_start,
# )
total_chunks_updated = 0
while True:
try:
resp = http_client.put(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}",
params=params,
headers={"Content-Type": "application/json"},
json=update_dict,
)

resp.raise_for_status()
except httpx.HTTPStatusError as e:
logger.error(
f"Failed to update chunks, details: {e.response.text}"
)
raise

resp_data = resp.json()

if "documentCount" in resp_data:
chunks_updated = resp_data["documentCount"]
total_chunks_updated += chunks_updated

# Check for continuation token to handle pagination
if "continuation" not in resp_data:
break # Exit loop if no continuation token

if not resp_data["continuation"]:
break # Exit loop if continuation token is empty

params = params.set("continuation", resp_data["continuation"])

logger.debug(
f"VespaIndex.update_single: "
f"index={index_name} "
f"doc={normalized_doc_id} "
f"chunks_deleted={total_chunks_updated}"
)
return

def delete(self, doc_ids: list[str]) -> None:
@@ -479,6 +482,7 @@ def delete(self, doc_ids: list[str]) -> None:
delete_vespa_docs(
document_ids=doc_ids, index_name=index_name, http_client=http_client
)
return

def delete_single(self, doc_id: str) -> None:
"""Possibly faster overall than the delete method due to using a single
@@ -539,6 +543,7 @@ def delete_single(self, doc_id: str) -> None:
f"doc={doc_id} "
f"chunks_deleted={total_chunks_deleted}"
)
return

def id_based_retrieval(
self,
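The update_single docstring above explicitly leaves retries to the caller. A minimal caller-side sketch with linear backoff; the wrapper and its parameters are illustrative, not part of this commit:

import time

import httpx

from danswer.document_index.interfaces import VespaDocumentFields
from danswer.document_index.vespa.index import VespaIndex

def update_single_with_retry(
    index: VespaIndex,
    doc_id: str,
    fields: VespaDocumentFields,
    max_attempts: int = 3,
    backoff_seconds: float = 2.0,
) -> None:
    # A missing document is a no-op and does not raise; HTTP status and
    # transport failures surface as httpx.HTTPError subclasses and are retried.
    for attempt in range(1, max_attempts + 1):
        try:
            index.update_single(doc_id, fields=fields)
            return
        except httpx.HTTPError:
            if attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)

Because errors are handled per document, a batch caller can loop over ids and decide per document whether to retry, skip, or fail, which is the rationale the Updatable interface gives for having a single-document update at all.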
