diff --git a/backend/onyx/connectors/google_drive/connector.py b/backend/onyx/connectors/google_drive/connector.py index cc999c01d63..9089a551bcc 100644 --- a/backend/onyx/connectors/google_drive/connector.py +++ b/backend/onyx/connectors/google_drive/connector.py @@ -8,6 +8,7 @@ from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore +from googleapiclient.errors import HttpError # type: ignore from onyx.configs.app_configs import INDEX_BATCH_SIZE from onyx.configs.app_configs import MAX_FILE_SIZE_BYTES @@ -20,6 +21,7 @@ from onyx.connectors.google_drive.file_retrieval import get_all_files_for_oauth from onyx.connectors.google_drive.file_retrieval import get_all_files_in_my_drive from onyx.connectors.google_drive.file_retrieval import get_files_in_shared_drive +from onyx.connectors.google_drive.file_retrieval import get_root_folder_id from onyx.connectors.google_drive.models import GoogleDriveFileType from onyx.connectors.google_utils.google_auth import get_google_creds from onyx.connectors.google_utils.google_utils import execute_paginated_retrieval @@ -41,6 +43,7 @@ from onyx.connectors.interfaces import SecondsSinceUnixEpoch from onyx.connectors.interfaces import SlimConnector from onyx.utils.logger import setup_logger +from onyx.utils.retry_wrapper import retry_builder logger = setup_logger() # TODO: Improve this by using the batch utility: https://googleapis.github.io/google-api-python-client/docs/batch.html @@ -286,13 +289,30 @@ def _impersonate_user_for_retrieval( start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None, ) -> Iterator[GoogleDriveFileType]: + logger.info(f"Impersonating user {user_email}") + drive_service = get_drive_service(self.creds, user_email) + # validate that the user has access to the drive APIs by performing a simple + # request and checking for a 401 + try: + 
retry_builder()(get_root_folder_id)(drive_service) + except HttpError as e: + if e.status_code == 401: + # fail gracefully, let the other impersonations continue + # one user without access shouldn't block the entire connector + logger.exception( + f"User '{user_email}' does not have access to the drive APIs." + ) + return + raise + # if we are including my drives, try to get the current user's my # drive if any of the following are true: # - include_my_drives is true # - the current user's email is in the requested emails if self.include_my_drives or user_email in self._requested_my_drive_emails: + logger.info(f"Getting all files in my drive as '{user_email}'") yield from get_all_files_in_my_drive( service=drive_service, update_traversed_ids_func=self._update_traversed_parent_ids, @@ -303,6 +323,7 @@ def _impersonate_user_for_retrieval( remaining_drive_ids = filtered_drive_ids - self._retrieved_ids for drive_id in remaining_drive_ids: + logger.info(f"Getting files in shared drive '{drive_id}' as '{user_email}'") yield from get_files_in_shared_drive( service=drive_service, drive_id=drive_id, @@ -314,6 +335,7 @@ def _impersonate_user_for_retrieval( remaining_folders = filtered_folder_ids - self._retrieved_ids for folder_id in remaining_folders: + logger.info(f"Getting files in folder '{folder_id}' as '{user_email}'") yield from crawl_folders_for_files( service=drive_service, parent_id=folder_id, @@ -344,6 +366,15 @@ def _manage_service_account_retrieval( elif self.include_shared_drives: drive_ids_to_retrieve = all_drive_ids + # checkpoint - we've found all users and drives, now time to actually start + # fetching stuff + logger.info(f"Found {len(all_org_emails)} users to impersonate") + logger.debug(f"Users: {all_org_emails}") + logger.info(f"Found {len(drive_ids_to_retrieve)} drives to retrieve") + logger.debug(f"Drives: {drive_ids_to_retrieve}") + logger.info(f"Found {len(folder_ids_to_retrieve)} folders to retrieve") + logger.debug(f"Folders: 
{folder_ids_to_retrieve}") + # Process users in parallel using ThreadPoolExecutor with ThreadPoolExecutor(max_workers=10) as executor: future_to_email = { @@ -380,6 +411,13 @@ def _manage_oauth_retrieval( drive_service = get_drive_service(self.creds, self.primary_admin_email) if self.include_files_shared_with_me or self.include_my_drives: + logger.info( + f"Getting shared files/my drive files for OAuth " + f"with include_files_shared_with_me={self.include_files_shared_with_me}, " + f"include_my_drives={self.include_my_drives}, " + f"include_shared_drives={self.include_shared_drives}. " + f"Using '{self.primary_admin_email}' as the account." + ) yield from get_all_files_for_oauth( service=drive_service, include_files_shared_with_me=self.include_files_shared_with_me, @@ -412,6 +450,9 @@ def _manage_oauth_retrieval( drive_ids_to_retrieve = all_drive_ids for drive_id in drive_ids_to_retrieve: + logger.info( + f"Getting files in shared drive '{drive_id}' as '{self.primary_admin_email}'" + ) yield from get_files_in_shared_drive( service=drive_service, drive_id=drive_id, @@ -425,6 +466,9 @@ def _manage_oauth_retrieval( # that could be folders. remaining_folders = folder_ids_to_retrieve - self._retrieved_ids for folder_id in remaining_folders: + logger.info( + f"Getting files in folder '{folder_id}' as '{self.primary_admin_email}'" + ) yield from crawl_folders_for_files( service=drive_service, parent_id=folder_id,