Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Related permission docs to cc_pair to prevent orphan docs #3336

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/danswer/access/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class ExternalAccess:

@dataclass(frozen=True)
class DocExternalAccess:
"""
This is just a class to wrap the external access and the document ID
together. It's used for syncing document permissions to Redis.
"""

external_access: ExternalAccess
# The document ID
doc_id: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import DocumentSource
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import upsert_document_by_connector_credential_pair
from danswer.db.engine import get_session_with_tenant
from danswer.db.enums import AccessType
from danswer.db.enums import ConnectorCredentialPairStatus
Expand Down Expand Up @@ -262,7 +263,12 @@ def connector_permission_sync_generator_task(
f"RedisConnector.permissions.generate_tasks starting. cc_pair={cc_pair_id}"
)
tasks_generated = redis_connector.permissions.generate_tasks(
self.app, lock, document_external_accesses, source_type
celery_app=self.app,
lock=lock,
new_permissions=document_external_accesses,
source_string=source_type,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
)
if tasks_generated is None:
return None
Expand Down Expand Up @@ -298,6 +304,8 @@ def update_external_document_permissions_task(
tenant_id: str | None,
serialized_doc_external_access: dict,
source_string: str,
connector_id: int,
credential_id: int,
) -> bool:
document_external_access = DocExternalAccess.from_dict(
serialized_doc_external_access
Expand All @@ -306,18 +314,28 @@ def update_external_document_permissions_task(
external_access = document_external_access.external_access
try:
with get_session_with_tenant(tenant_id) as db_session:
# Then we build the update requests to update vespa
# Add the users to the DB if they don't exist
batch_add_ext_perm_user_if_not_exists(
db_session=db_session,
emails=list(external_access.external_user_emails),
)
upsert_document_external_perms(
# Then we upsert the document's external permissions in postgres
created_new_doc: bool = upsert_document_external_perms(
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
db_session=db_session,
doc_id=doc_id,
external_access=external_access,
source_type=DocumentSource(source_string),
)

if created_new_doc:
# If a new document was created, we associate it with the cc_pair
upsert_document_by_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
document_ids=[doc_id],
)

logger.debug(
f"Successfully synced postgres document permissions for {doc_id}"
)
Expand Down
4 changes: 4 additions & 0 deletions backend/danswer/redis/redis_connector_doc_perm_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def generate_tasks(
lock: RedisLock | None,
new_permissions: list[DocExternalAccess],
source_string: str,
connector_id: int,
credential_id: int,
) -> int | None:
last_lock_time = time.monotonic()
async_results = []
Expand All @@ -155,6 +157,8 @@ def generate_tasks(
tenant_id=self.tenant_id,
serialized_doc_external_access=doc_perm.to_dict(),
source_string=source_string,
connector_id=connector_id,
credential_id=credential_id,
),
queue=DanswerCeleryQueues.DOC_PERMISSIONS_UPSERT,
task_id=custom_task_id,
Expand Down
9 changes: 6 additions & 3 deletions backend/ee/danswer/db/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def upsert_document_external_perms(
doc_id: str,
external_access: ExternalAccess,
source_type: DocumentSource,
) -> None:
) -> bool:
"""
This sets the permissions for a document in postgres.
This sets the permissions for a document in postgres. Returns True if
a new document was created, False otherwise.
NOTE: this will replace any existing external access, it will not do a union
"""
document = db_session.scalars(
Expand Down Expand Up @@ -85,7 +86,7 @@ def upsert_document_external_perms(
)
db_session.add(document)
db_session.commit()
return
return True

# If the document exists, we need to check if the external access has changed
if (
Expand All @@ -98,3 +99,5 @@ def upsert_document_external_perms(
document.is_public = external_access.is_public
document.last_modified = datetime.now(timezone.utc)
db_session.commit()

return False
79 changes: 79 additions & 0 deletions backend/scripts/orphan_doc_cleanup_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
import sys

from sqlalchemy import text
from sqlalchemy.orm import Session

# makes it so `PYTHONPATH=.` is not required when running this script
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)

from danswer.db.engine import get_session_context_manager # noqa: E402
from danswer.db.document import delete_documents_complete__no_commit # noqa: E402
from danswer.db.search_settings import get_current_search_settings # noqa: E402
from danswer.document_index.vespa.index import VespaIndex # noqa: E402
from danswer.background.celery.tasks.shared.RetryDocumentIndex import ( # noqa: E402
RetryDocumentIndex,
)


def _get_orphaned_document_ids(db_session: Session) -> list[str]:
    """Get document IDs that don't have any entries in document_by_connector_credential_pair"""
    # Raw SQL: a LEFT JOIN against the association table where the join
    # produced no match means the document has no cc_pair, i.e. is orphaned.
    orphan_query = text(
        """
    SELECT d.id
    FROM document d
    LEFT JOIN document_by_connector_credential_pair dbcc ON d.id = dbcc.id
    WHERE dbcc.id IS NULL
    """
    )
    result_rows = db_session.execute(orphan_query)
    orphaned_ids = [row[0] for row in result_rows]
    print(f"Found {len(orphaned_ids)} orphaned documents")
    return orphaned_ids


def main() -> None:
    """Clean up orphaned documents: delete their chunks from Vespa, then
    delete the document rows from Postgres.

    A document is "orphaned" when it has no entry in
    document_by_connector_credential_pair. Vespa deletion happens first so
    that we only remove Postgres rows whose chunks are already gone.
    """
    with get_session_context_manager() as db_session:
        # Get orphaned document IDs
        orphaned_ids = _get_orphaned_document_ids(db_session)
        if not orphaned_ids:
            print("No orphaned documents found")
            return

        # Setup Vespa index
        search_settings = get_current_search_settings(db_session)
        index_name = search_settings.index_name
        vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
        retry_index = RetryDocumentIndex(vespa_index)

        # Delete chunks from Vespa first; only docs that succeed here are
        # eligible for Postgres deletion below.
        print("Deleting orphaned document chunks from Vespa")
        successfully_vespa_deleted_doc_ids = []
        for doc_id in orphaned_ids:
            try:
                chunks_deleted = retry_index.delete_single(doc_id)
                successfully_vespa_deleted_doc_ids.append(doc_id)
                if chunks_deleted > 0:
                    print(f"Deleted {chunks_deleted} chunks for document {doc_id}")
            except Exception as e:
                print(
                    f"Error deleting document {doc_id} in Vespa and will not delete from Postgres: {e}"
                )

        if not successfully_vespa_deleted_doc_ids:
            # Every Vespa deletion failed; nothing is safe to remove from
            # Postgres, so stop here instead of issuing an empty delete.
            print("No documents deleted from Vespa; skipping Postgres cleanup")
            return

        # Delete documents from Postgres
        print("Deleting orphaned documents from Postgres")
        try:
            delete_documents_complete__no_commit(
                db_session, successfully_vespa_deleted_doc_ids
            )
            db_session.commit()
        except Exception as e:
            # BUGFIX: the original fell through to the success message even
            # when this delete failed. Roll back the partial transaction and
            # report the failure instead.
            db_session.rollback()
            print(f"Error deleting documents from Postgres: {e}")
            return

        print(
            f"Successfully cleaned up {len(successfully_vespa_deleted_doc_ids)} orphaned documents"
        )


if __name__ == "__main__":
    main()
Loading