From 66a19553091556d71dd3a192728f3269cf5fd94a Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Wed, 4 Dec 2024 08:03:11 -0800 Subject: [PATCH 1/4] Related permission docs to cc_pair to prevent orphan docs --- backend/danswer/access/models.py | 5 ++++ .../tasks/doc_permission_syncing/tasks.py | 24 ++++++++++++++++--- .../redis/redis_connector_doc_perm_sync.py | 4 ++++ backend/ee/danswer/db/document.py | 9 ++++--- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/backend/danswer/access/models.py b/backend/danswer/access/models.py index 126648eb41e..11cf57638f2 100644 --- a/backend/danswer/access/models.py +++ b/backend/danswer/access/models.py @@ -18,6 +18,11 @@ class ExternalAccess: @dataclass(frozen=True) class DocExternalAccess: + """ + This is just a class to wrap the external access and the document ID + together. It's used for syncing document permissions to Redis. + """ + external_access: ExternalAccess # The document ID doc_id: str diff --git a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py index 747197621df..9d17698b672 100644 --- a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py @@ -22,6 +22,7 @@ from danswer.configs.constants import DanswerRedisLocks from danswer.configs.constants import DocumentSource from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id +from danswer.db.document import upsert_document_by_connector_credential_pair from danswer.db.engine import get_session_with_tenant from danswer.db.enums import AccessType from danswer.db.enums import ConnectorCredentialPairStatus @@ -262,7 +263,12 @@ def connector_permission_sync_generator_task( f"RedisConnector.permissions.generate_tasks starting. 
cc_pair={cc_pair_id}" ) tasks_generated = redis_connector.permissions.generate_tasks( - self.app, lock, document_external_accesses, source_type + celery_app=self.app, + lock=lock, + new_permissions=document_external_accesses, + source_string=source_type, + connector_id=cc_pair.connector.id, + credential_id=cc_pair.credential.id, ) if tasks_generated is None: return None @@ -298,6 +304,8 @@ def update_external_document_permissions_task( tenant_id: str | None, serialized_doc_external_access: dict, source_string: str, + connector_id: int, + credential_id: int, ) -> bool: document_external_access = DocExternalAccess.from_dict( serialized_doc_external_access @@ -306,18 +314,28 @@ def update_external_document_permissions_task( external_access = document_external_access.external_access try: with get_session_with_tenant(tenant_id) as db_session: - # Then we build the update requests to update vespa + # Add the users to the DB if they don't exist batch_add_ext_perm_user_if_not_exists( db_session=db_session, emails=list(external_access.external_user_emails), ) - upsert_document_external_perms( + # Then we upsert the document's external permissions in postgres + created_new_doc: bool = upsert_document_external_perms( db_session=db_session, doc_id=doc_id, external_access=external_access, source_type=DocumentSource(source_string), ) + if created_new_doc: + # If a new document was created, we associate it with the cc_pair + upsert_document_by_connector_credential_pair( + db_session=db_session, + connector_id=connector_id, + credential_id=credential_id, + document_ids=[doc_id], + ) + logger.debug( f"Successfully synced postgres document permissions for {doc_id}" ) diff --git a/backend/danswer/redis/redis_connector_doc_perm_sync.py b/backend/danswer/redis/redis_connector_doc_perm_sync.py index 2e5eb69b533..f14d761f709 100644 --- a/backend/danswer/redis/redis_connector_doc_perm_sync.py +++ b/backend/danswer/redis/redis_connector_doc_perm_sync.py @@ -133,6 +133,8 @@ def 
generate_tasks( lock: RedisLock | None, new_permissions: list[DocExternalAccess], source_string: str, + connector_id: int, + credential_id: int, ) -> int | None: last_lock_time = time.monotonic() async_results = [] @@ -155,6 +157,8 @@ def generate_tasks( tenant_id=self.tenant_id, serialized_doc_external_access=doc_perm.to_dict(), source_string=source_string, + connector_id=connector_id, + credential_id=credential_id, ), queue=DanswerCeleryQueues.DOC_PERMISSIONS_UPSERT, task_id=custom_task_id, diff --git a/backend/ee/danswer/db/document.py b/backend/ee/danswer/db/document.py index e061db6c75b..ec1d5741314 100644 --- a/backend/ee/danswer/db/document.py +++ b/backend/ee/danswer/db/document.py @@ -55,9 +55,10 @@ def upsert_document_external_perms( doc_id: str, external_access: ExternalAccess, source_type: DocumentSource, -) -> None: +) -> bool: """ - This sets the permissions for a document in postgres. + This sets the permissions for a document in postgres. Returns True if + a new document was created, False otherwise. 
NOTE: this will replace any existing external access, it will not do a union """ document = db_session.scalars( @@ -85,7 +86,7 @@ def upsert_document_external_perms( ) db_session.add(document) db_session.commit() - return + return True # If the document exists, we need to check if the external access has changed if ( @@ -98,3 +99,5 @@ def upsert_document_external_perms( document.is_public = external_access.is_public document.last_modified = datetime.now(timezone.utc) db_session.commit() + + return False From 46a04b8c70d18873d95ad4808031d37a231559ab Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Wed, 4 Dec 2024 08:56:05 -0800 Subject: [PATCH 2/4] added script --- backend/scripts/orphan_doc_cleanup_script.py | 79 ++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 backend/scripts/orphan_doc_cleanup_script.py diff --git a/backend/scripts/orphan_doc_cleanup_script.py b/backend/scripts/orphan_doc_cleanup_script.py new file mode 100644 index 00000000000..4007123ca3c --- /dev/null +++ b/backend/scripts/orphan_doc_cleanup_script.py @@ -0,0 +1,79 @@ +import os +import sys + +from sqlalchemy import text +from sqlalchemy.orm import Session + +# makes it so `PYTHONPATH=.` is not required when running this script +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) + +from danswer.db.engine import get_session_context_manager # noqa: E402 +from danswer.db.document import delete_documents_complete__no_commit # noqa: E402 +from danswer.db.search_settings import get_current_search_settings # noqa: E402 +from danswer.document_index.vespa.index import VespaIndex # noqa: E402 +from danswer.background.celery.tasks.shared.RetryDocumentIndex import ( # noqa: E402 + RetryDocumentIndex, +) + + +def _get_orphaned_document_ids(db_session: Session) -> list[str]: + """Get document IDs that don't have any entries in document_by_connector_credential_pair""" + query = text( + """ + SELECT d.id + FROM document d + LEFT JOIN 
document_by_connector_credential_pair dbcc ON d.id = dbcc.id + WHERE dbcc.id IS NULL + """ + ) + orphaned_ids = [doc_id[0] for doc_id in db_session.execute(query)] + print(f"Found {len(orphaned_ids)} orphaned documents") + return orphaned_ids + + +def main() -> None: + with get_session_context_manager() as db_session: + # Get orphaned document IDs + orphaned_ids = _get_orphaned_document_ids(db_session) + if not orphaned_ids: + print("No orphaned documents found") + return + + # Setup Vespa index + search_settings = get_current_search_settings(db_session) + index_name = search_settings.index_name + vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None) + retry_index = RetryDocumentIndex(vespa_index) + + # Delete chunks from Vespa first + print("Deleting orphaned document chunks from Vespa") + successfully_vespa_deleted_doc_ids = [] + for doc_id in orphaned_ids: + try: + chunks_deleted = retry_index.delete_single(doc_id) + successfully_vespa_deleted_doc_ids.append(doc_id) + if chunks_deleted > 0: + print(f"Deleted {chunks_deleted} chunks for document {doc_id}") + except Exception as e: + print( + f"Error deleting document {doc_id} in Vespa and will not delete from Postgres: {e}" + ) + + # Delete documents from Postgres + print("Deleting orphaned documents from Postgres") + try: + delete_documents_complete__no_commit( + db_session, successfully_vespa_deleted_doc_ids + ) + db_session.commit() + except Exception as e: + print(f"Error deleting documents from Postgres: {e}") + + print( + f"Successfully cleaned up {len(successfully_vespa_deleted_doc_ids)} orphaned documents" + ) + + +if __name__ == "__main__": + main() From ba219c3cfa52dad3429cecde43667e806dfd41be Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Wed, 4 Dec 2024 12:19:33 -0800 Subject: [PATCH 3/4] group sync deduping --- .../tasks/doc_permission_syncing/tasks.py | 2 +- .../tasks/external_group_syncing/tasks.py | 20 +++++++++++++++++++ .../danswer/connectors/confluence/utils.py | 5 
++++- .../danswer/db/connector_credential_pair.py | 5 +++++ .../external_permissions/sync_params.py | 9 +++++++-- 5 files changed, 37 insertions(+), 4 deletions(-) diff --git a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py index 9d17698b672..b6d48a02a9c 100644 --- a/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/doc_permission_syncing/tasks.py @@ -320,7 +320,7 @@ def update_external_document_permissions_task( emails=list(external_access.external_user_emails), ) # Then we upsert the document's external permissions in postgres - created_new_doc: bool = upsert_document_external_perms( + created_new_doc = upsert_document_external_perms( db_session=db_session, doc_id=doc_id, external_access=external_access, diff --git a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py index 698894c7507..c8ac5f870d3 100644 --- a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py @@ -32,10 +32,14 @@ from danswer.redis.redis_pool import get_redis_client from danswer.utils.logger import setup_logger from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs +from ee.danswer.db.connector_credential_pair import get_cc_pairs_by_source from ee.danswer.db.external_perm import ExternalUserGroup from ee.danswer.db.external_perm import replace_user__ext_group_for_cc_pair from ee.danswer.external_permissions.sync_params import EXTERNAL_GROUP_SYNC_PERIODS from ee.danswer.external_permissions.sync_params import GROUP_PERMISSIONS_FUNC_MAP +from ee.danswer.external_permissions.sync_params import ( + GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC, +) logger = setup_logger() @@ -107,6 +111,22 @@ def check_for_external_group_sync(self: 
Task, *, tenant_id: str | None) -> None: with get_session_with_tenant(tenant_id) as db_session: cc_pairs = get_all_auto_sync_cc_pairs(db_session) + # We only want to sync one cc_pair per source type in + # GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC + for source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC: + # These are ordered by cc_pair id so the first one is the one we want + cc_pairs_to_dedupe = get_cc_pairs_by_source( + db_session, source, only_sync=True + ) + # We only want to sync one cc_pair per source type + # in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC so we dedupe here + for cc_pair_to_remove in cc_pairs_to_dedupe[1:]: + cc_pairs = [ + cc_pair + for cc_pair in cc_pairs + if cc_pair.id != cc_pair_to_remove.id + ] + for cc_pair in cc_pairs: if _is_external_group_sync_due(cc_pair): cc_pair_ids_to_sync.append(cc_pair.id) diff --git a/backend/danswer/connectors/confluence/utils.py b/backend/danswer/connectors/confluence/utils.py index e6ac0308a3a..3d76d905623 100644 --- a/backend/danswer/connectors/confluence/utils.py +++ b/backend/danswer/connectors/confluence/utils.py @@ -32,7 +32,10 @@ def get_user_email_from_username__server( response = confluence_client.get_mobile_parameters(user_name) email = response.get("email") except Exception: - email = None + # For now, we'll just return a string that indicates failure + # We may want to revert to returning None in the future + # email = None + email = f"FAILED TO GET CONFLUENCE EMAIL FOR {user_name}" _USER_EMAIL_CACHE[user_name] = email return _USER_EMAIL_CACHE[user_name] diff --git a/backend/ee/danswer/db/connector_credential_pair.py b/backend/ee/danswer/db/connector_credential_pair.py index bb91c0de74f..fea6caba61b 100644 --- a/backend/ee/danswer/db/connector_credential_pair.py +++ b/backend/ee/danswer/db/connector_credential_pair.py @@ -37,10 +37,15 @@ def get_cc_pairs_by_source( source_type: DocumentSource, only_sync: bool, ) -> list[ConnectorCredentialPair]: + """ + Get all cc_pairs for a given source type (and 
optionally only sync) + result is sorted by cc_pair id + """ query = ( db_session.query(ConnectorCredentialPair) .join(ConnectorCredentialPair.connector) .filter(Connector.source == source_type) + .order_by(ConnectorCredentialPair.id) ) if only_sync: diff --git a/backend/ee/danswer/external_permissions/sync_params.py b/backend/ee/danswer/external_permissions/sync_params.py index 43c8a78122c..e821971cc81 100644 --- a/backend/ee/danswer/external_permissions/sync_params.py +++ b/backend/ee/danswer/external_permissions/sync_params.py @@ -48,6 +48,11 @@ } +GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC: set[DocumentSource] = { + DocumentSource.CONFLUENCE, +} + + # If nothing is specified here, we run the doc_sync every time the celery beat runs DOC_PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = { # Polling is not supported so we fetch all doc permissions every 5 minutes @@ -57,9 +62,9 @@ # If nothing is specified here, we run the doc_sync every time the celery beat runs EXTERNAL_GROUP_SYNC_PERIODS: dict[DocumentSource, int] = { - # Polling is not supported so we fetch all group permissions every 5 minutes + # Polling is not supported so we fetch all group permissions every 30 minutes DocumentSource.GOOGLE_DRIVE: 5 * 60, - DocumentSource.CONFLUENCE: 5 * 60, + DocumentSource.CONFLUENCE: 30 * 60, } From bdfe260017928e0c1a4937816ef9ae6fa6bc17a9 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Wed, 4 Dec 2024 12:22:03 -0800 Subject: [PATCH 4/4] logging --- backend/danswer/connectors/confluence/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/danswer/connectors/confluence/utils.py b/backend/danswer/connectors/confluence/utils.py index 3d76d905623..f5511063d88 100644 --- a/backend/danswer/connectors/confluence/utils.py +++ b/backend/danswer/connectors/confluence/utils.py @@ -36,6 +36,7 @@ def get_user_email_from_username__server( # We may want to revert to returning None in the future # email = None email = f"FAILED TO GET CONFLUENCE EMAIL FOR {user_name}" + 
logger.warning(f"failed to get confluence email for {user_name}") _USER_EMAIL_CACHE[user_name] = email return _USER_EMAIL_CACHE[user_name]