From 87931b759feb1431ce96090bd390e3e28cb30208 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Fri, 8 Nov 2024 08:50:59 -0800 Subject: [PATCH] adjust default postgres schema for slack listener --- backend/danswer/danswerbot/slack/listener.py | 107 ++++++++++--------- 1 file changed, 58 insertions(+), 49 deletions(-) diff --git a/backend/danswer/danswerbot/slack/listener.py b/backend/danswer/danswerbot/slack/listener.py index 9f8a854f0a8..8e6ab4fb61f 100644 --- a/backend/danswer/danswerbot/slack/listener.py +++ b/backend/danswer/danswerbot/slack/listener.py @@ -77,6 +77,7 @@ from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable from shared_configs.configs import MODEL_SERVER_HOST from shared_configs.configs import MODEL_SERVER_PORT +from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import SLACK_CHANNEL_ID logger = setup_logger() @@ -189,59 +190,67 @@ def acquire_tenants(self) -> None: continue logger.debug(f"Acquired lock for tenant {tenant_id}") - with get_session_with_tenant(tenant_id) as db_session: - try: - logger.debug( - f"Setting tenant ID context variable for tenant {tenant_id}" - ) - token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id or "public") - slack_bot_tokens = fetch_tokens() - logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}") - CURRENT_TENANT_ID_CONTEXTVAR.reset(token) - logger.debug( - f"Reset tenant ID context variable for tenant {tenant_id}" - ) - - if not slack_bot_tokens: - logger.debug(f"No Slack bot token found for tenant {tenant_id}") + token = CURRENT_TENANT_ID_CONTEXTVAR.set( + tenant_id or POSTGRES_DEFAULT_SCHEMA + ) + try: + with get_session_with_tenant(tenant_id) as db_session: + try: + logger.debug( + f"Setting tenant ID context variable for tenant {tenant_id}" + ) + slack_bot_tokens = fetch_tokens() + logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}") + logger.debug( + f"Reset tenant ID context variable for tenant {tenant_id}" + ) + + if not slack_bot_tokens: + logger.debug( + f"No Slack bot token found for tenant {tenant_id}" + ) + if tenant_id in self.socket_clients: + asyncio.run(self.socket_clients[tenant_id].close()) + del self.socket_clients[tenant_id] + del self.slack_bot_tokens[tenant_id] + continue + + if ( + tenant_id not in self.slack_bot_tokens + or slack_bot_tokens != self.slack_bot_tokens[tenant_id] + ): + if tenant_id in self.slack_bot_tokens: + logger.info( + f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting" + ) + else: + search_settings = get_current_search_settings( + db_session + ) + embedding_model = EmbeddingModel.from_db_model( + search_settings=search_settings, + server_host=MODEL_SERVER_HOST, + server_port=MODEL_SERVER_PORT, + ) + warm_up_bi_encoder(embedding_model=embedding_model) + + self.slack_bot_tokens[tenant_id] = slack_bot_tokens + + if tenant_id in self.socket_clients: + asyncio.run(self.socket_clients[tenant_id].close()) + + self.start_socket_client(tenant_id, slack_bot_tokens) + + except KvKeyNotFoundError: + logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}") if tenant_id in self.socket_clients: asyncio.run(self.socket_clients[tenant_id].close()) del self.socket_clients[tenant_id] del self.slack_bot_tokens[tenant_id] - continue - - if ( - tenant_id not in self.slack_bot_tokens - or slack_bot_tokens != self.slack_bot_tokens[tenant_id] - ): - if tenant_id in self.slack_bot_tokens: - logger.info( - f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting" - ) - else: - search_settings = get_current_search_settings(db_session) - embedding_model = EmbeddingModel.from_db_model( - search_settings=search_settings, - server_host=MODEL_SERVER_HOST, - server_port=MODEL_SERVER_PORT, - ) - warm_up_bi_encoder(embedding_model=embedding_model) - - self.slack_bot_tokens[tenant_id] = slack_bot_tokens - - if tenant_id in self.socket_clients: - asyncio.run(self.socket_clients[tenant_id].close()) - - self.start_socket_client(tenant_id, slack_bot_tokens) - - except KvKeyNotFoundError: - logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}") - if tenant_id in self.socket_clients: - asyncio.run(self.socket_clients[tenant_id].close()) - del self.socket_clients[tenant_id] - del self.slack_bot_tokens[tenant_id] - except Exception as e: - logger.exception(f"Error handling tenant {tenant_id}: {e}") + except Exception as e: + logger.exception(f"Error handling tenant {tenant_id}: {e}") + finally: + CURRENT_TENANT_ID_CONTEXTVAR.reset(token) def send_heartbeats(self) -> None: current_time = int(time.time())