diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py index 8842343ffae..f7ae3ec2655 100644 --- a/backend/danswer/background/celery/apps/beat.py +++ b/backend/danswer/background/celery/apps/beat.py @@ -12,6 +12,7 @@ from danswer.db.engine import SqlEngine from danswer.utils.logger import setup_logger from danswer.utils.variable_functionality import fetch_versioned_implementation +from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST from shared_configs.configs import MULTI_TENANT logger = setup_logger(__name__) @@ -72,6 +73,15 @@ def _update_tenant_tasks(self) -> None: logger.info(f"Found {len(existing_tenants)} existing tenants in schedule") for tenant_id in tenant_ids: + if ( + IGNORED_SYNCING_TENANT_LIST + and tenant_id in IGNORED_SYNCING_TENANT_LIST + ): + logger.info( + f"Skipping tenant {tenant_id} as it is in the ignored syncing list" + ) + continue + if tenant_id not in existing_tenants: logger.info(f"Processing new tenant: {tenant_id}") diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index e0710cef81b..5db3e6fefd2 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -103,45 +103,44 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: with get_session_with_tenant(tenant_id=tenant_id) as db_session: old_search_settings = check_index_swap(db_session=db_session) current_search_settings = get_current_search_settings(db_session) - # So that the first time users aren't surprised by really slow speed of first - # batch of documents indexed - if current_search_settings.provider_type is None and not MULTI_TENANT: - if old_search_settings: - embedding_model = EmbeddingModel.from_db_model( - search_settings=current_search_settings, - server_host=INDEXING_MODEL_SERVER_HOST, - server_port=INDEXING_MODEL_SERVER_PORT, - ) - # only warm up if search settings were changed - warm_up_bi_encoder( - embedding_model=embedding_model, - ) + # So that the first time users aren't surprised by really slow speed of first + # batch of documents indexed + if current_search_settings.provider_type is None and not MULTI_TENANT: + if old_search_settings: + embedding_model = EmbeddingModel.from_db_model( + search_settings=current_search_settings, + server_host=INDEXING_MODEL_SERVER_HOST, + server_port=INDEXING_MODEL_SERVER_PORT, + ) - cc_pair_ids: list[int] = [] + # only warm up if search settings were changed + warm_up_bi_encoder( + embedding_model=embedding_model, + ) + + with get_session_with_tenant(tenant_id=tenant_id) as db_session: cc_pairs = fetch_connector_credential_pairs(db_session) - for cc_pair_entry in cc_pairs: - cc_pair_ids.append(cc_pair_entry.id) - - for cc_pair_id in cc_pair_ids: - redis_connector = RedisConnector(tenant_id, cc_pair_id) - # Get the primary search settings - primary_search_settings = get_current_search_settings(db_session) - search_settings = [primary_search_settings] - - # Check for secondary search settings - secondary_search_settings = get_secondary_search_settings(db_session) - if secondary_search_settings is not None: - # If secondary settings exist, add them to the list - search_settings.append(secondary_search_settings) - - for search_settings_instance in search_settings: - redis_connector_index = redis_connector.new_index( - search_settings_instance.id - ) - if redis_connector_index.fenced: - continue + cc_pair_ids: list[int] = [cc_pair_entry.id for cc_pair_entry in cc_pairs] + + with get_session_with_tenant(tenant_id=tenant_id) as db_session: + primary_search_settings = get_current_search_settings(db_session) + secondary_search_settings = get_secondary_search_settings(db_session) + + search_settings = [primary_search_settings] + if secondary_search_settings is not None: + search_settings.append(secondary_search_settings) + + for cc_pair_id in cc_pair_ids: + redis_connector = RedisConnector(tenant_id, cc_pair_id) + for search_settings_instance in search_settings: + redis_connector_index = redis_connector.new_index( + search_settings_instance.id + ) + if redis_connector_index.fenced: + continue + with get_session_with_tenant(tenant_id=tenant_id) as db_session: cc_pair = get_connector_credential_pair_from_id( cc_pair_id, db_session ) @@ -178,6 +177,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: f"search_settings={search_settings_instance.id} " ) tasks_created += 1 + except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." diff --git a/backend/danswer/db/engine.py b/backend/danswer/db/engine.py index 8e6e148b80d..639f6addc75 100644 --- a/backend/danswer/db/engine.py +++ b/backend/danswer/db/engine.py @@ -38,7 +38,6 @@ from danswer.configs.app_configs import USER_AUTH_SECRET from danswer.configs.constants import POSTGRES_UNKNOWN_APP_NAME from danswer.utils.logger import setup_logger -from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST from shared_configs.configs import MULTI_TENANT from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import TENANT_ID_PREFIX @@ -190,13 +189,6 @@ def get_app_name(cls) -> str: return "" return cls._app_name - @classmethod - def reset_engine(cls) -> None: - with cls._lock: - if cls._engine: - cls._engine.dispose() - cls._engine = None - def get_all_tenant_ids() -> list[str] | list[None]: if not MULTI_TENANT: @@ -215,14 +207,7 @@ def get_all_tenant_ids() -> list[str] | list[None]: valid_tenants = [ tenant for tenant in tenant_ids - if tenant is None - or ( - tenant.startswith(TENANT_ID_PREFIX) - and ( - IGNORED_SYNCING_TENANT_LIST is None - or tenant not in IGNORED_SYNCING_TENANT_LIST - ) - ) + if tenant is None or tenant.startswith(TENANT_ID_PREFIX) ] return valid_tenants