From 35b5c44cc706f50244cedba73894d0b5756d3a18 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 21 Nov 2024 17:09:56 -0800 Subject: [PATCH 1/7] update default sidebar toggle --- web/src/lib/constants.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/web/src/lib/constants.ts b/web/src/lib/constants.ts index 914dbac0152..57cc99f5619 100644 --- a/web/src/lib/constants.ts +++ b/web/src/lib/constants.ts @@ -37,7 +37,8 @@ export const LOGOUT_DISABLED = process.env.NEXT_PUBLIC_DISABLE_LOGOUT?.toLowerCase() === "true"; export const NEXT_PUBLIC_DEFAULT_SIDEBAR_OPEN = - process.env.NEXT_PUBLIC_DEFAULT_SIDEBAR_OPEN?.toLowerCase() === "true"; + process.env.NEXT_PUBLIC_DEFAULT_SIDEBAR_OPEN?.toLowerCase() === "true" ?? + true; export const TOGGLED_CONNECTORS_COOKIE_NAME = "toggled_connectors"; From e3573b2bc1e345f1b9e4d3185289e83cc80d9db7 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 21 Nov 2024 17:11:11 -0800 Subject: [PATCH 2/7] add comment --- web/src/lib/constants.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/web/src/lib/constants.ts b/web/src/lib/constants.ts index 57cc99f5619..e8037764adc 100644 --- a/web/src/lib/constants.ts +++ b/web/src/lib/constants.ts @@ -36,6 +36,7 @@ export const SIDEBAR_WIDTH = `w-[350px]`; export const LOGOUT_DISABLED = process.env.NEXT_PUBLIC_DISABLE_LOGOUT?.toLowerCase() === "true"; +// Default sidebar open is true if the environment variable is not set export const NEXT_PUBLIC_DEFAULT_SIDEBAR_OPEN = process.env.NEXT_PUBLIC_DEFAULT_SIDEBAR_OPEN?.toLowerCase() === "true" ?? true; From 682319d2e9d34ac059f750a69d0658ce35c87744 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 21 Nov 2024 18:33:09 -0800 Subject: [PATCH 3/7] Bugfix/curator interface (#3198) * mystery solved * update config * update * update * update user role * remove values --- backend/DELETE | 0 backend/danswer/db/persona.py | 1 - backend/danswer/server/manage/llm/api.py | 1 - web/src/components/UserDropdown.tsx | 16 ++++++++++------ web/src/components/context/AssistantsContext.tsx | 8 +++++--- web/src/components/user/UserProvider.tsx | 5 ++++- 6 files changed, 19 insertions(+), 12 deletions(-) create mode 100644 backend/DELETE diff --git a/backend/DELETE b/backend/DELETE new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/danswer/db/persona.py b/backend/danswer/db/persona.py index 6446e73827a..5194a06717f 100644 --- a/backend/danswer/db/persona.py +++ b/backend/danswer/db/persona.py @@ -259,7 +259,6 @@ def get_personas( ) -> Sequence[Persona]: stmt = select(Persona).distinct() stmt = _add_user_filters(stmt=stmt, user=user, get_editable=get_editable) - if not include_default: stmt = stmt.where(Persona.builtin_persona.is_(False)) if not include_slack_bot_personas: diff --git a/backend/danswer/server/manage/llm/api.py b/backend/danswer/server/manage/llm/api.py index 9cac96236b0..f52877d919a 100644 --- a/backend/danswer/server/manage/llm/api.py +++ b/backend/danswer/server/manage/llm/api.py @@ -30,7 +30,6 @@ logger = setup_logger() - admin_router = APIRouter(prefix="/admin/llm") basic_router = APIRouter(prefix="/llm") diff --git a/web/src/components/UserDropdown.tsx b/web/src/components/UserDropdown.tsx index 80c6be1785f..8a1503410c4 100644 --- a/web/src/components/UserDropdown.tsx +++ b/web/src/components/UserDropdown.tsx @@ -57,7 +57,7 @@ const DropdownOption: React.FC = ({ }; export function UserDropdown({ page }: { page?: pageType }) { - const { user } = useUser(); + const { user, isCurator } = useUser(); const [userInfoVisible, setUserInfoVisible] = useState(false); const userInfoRef = useRef(null); const router = useRouter(); @@ -95,7 +95,9 @@ export function UserDropdown({ page }: { page?: pageType }) { } // Construct the current URL - const currentUrl = `${pathname}${searchParams.toString() ? `?${searchParams.toString()}` : ""}`; + const currentUrl = `${pathname}${ + searchParams.toString() ? `?${searchParams.toString()}` : "" + }`; // Encode the current URL to use as a redirect parameter const encodedRedirect = encodeURIComponent(currentUrl); @@ -106,9 +108,7 @@ export function UserDropdown({ page }: { page?: pageType }) { }; const showAdminPanel = !user || user.role === UserRole.ADMIN; - const showCuratorPanel = - user && - (user.role === UserRole.CURATOR || user.role === UserRole.GLOBAL_CURATOR); + const showCuratorPanel = user && isCurator; const showLogout = user && !checkUserIsNoAuthUser(user.id) && !LOGOUT_DISABLED; @@ -244,7 +244,11 @@ export function UserDropdown({ page }: { page?: pageType }) { setShowNotifications(true); }} icon={} - label={`Notifications ${notifications && notifications.length > 0 ? `(${notifications.length})` : ""}`} + label={`Notifications ${ + notifications && notifications.length > 0 + ? `(${notifications.length})` + : "" + }`} /> {showLogout && diff --git a/web/src/components/context/AssistantsContext.tsx b/web/src/components/context/AssistantsContext.tsx index 87e498d16ec..4c389240266 100644 --- a/web/src/components/context/AssistantsContext.tsx +++ b/web/src/components/context/AssistantsContext.tsx @@ -47,7 +47,7 @@ export const AssistantsProvider: React.FC<{ const [assistants, setAssistants] = useState( initialAssistants || [] ); - const { user, isLoadingUser, isAdmin } = useUser(); + const { user, isLoadingUser, isAdmin, isCurator } = useUser(); const [editablePersonas, setEditablePersonas] = useState([]); const [allAssistants, setAllAssistants] = useState([]); @@ -83,7 +83,7 @@ export const AssistantsProvider: React.FC<{ useEffect(() => { const fetchPersonas = async () => { - if (!isAdmin) { + if (!isAdmin && !isCurator) { return; } @@ -101,6 +101,8 @@ export const AssistantsProvider: React.FC<{ if (allResponse.ok) { const allPersonas = await allResponse.json(); setAllAssistants(allPersonas); + } else { + console.error("Error fetching personas:", allResponse); } } catch (error) { console.error("Error fetching personas:", error); @@ -108,7 +110,7 @@ export const AssistantsProvider: React.FC<{ }; fetchPersonas(); - }, [isAdmin]); + }, [isAdmin, isCurator]); const refreshRecentAssistants = async (currentAssistant: number) => { const response = await fetch("/api/user/recent-assistants", { diff --git a/web/src/components/user/UserProvider.tsx b/web/src/components/user/UserProvider.tsx index 48ea8826f22..be5f0b8159e 100644 --- a/web/src/components/user/UserProvider.tsx +++ b/web/src/components/user/UserProvider.tsx @@ -67,7 +67,10 @@ export function UserProvider({ isLoadingUser, refreshUser, isAdmin: upToDateUser?.role === UserRole.ADMIN, - isCurator: upToDateUser?.role === UserRole.CURATOR, + // Curator status applies for either global or basic curator + isCurator: + upToDateUser?.role === UserRole.CURATOR || + upToDateUser?.role === UserRole.GLOBAL_CURATOR, isCloudSuperuser: upToDateUser?.is_cloud_superuser ?? false, }} > From 9819aa977a3e120361c04509f7c702c1cf145984 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 21 Nov 2024 20:21:02 -0800 Subject: [PATCH 4/7] implement double check pattern for error conditions (#3201) * Move unfenced check to check_for_indexing. implement a double check pattern for all indexing error checks * improved commenting * exclusions --- .../danswer/background/celery/apps/primary.py | 4 +- .../background/celery/tasks/indexing/tasks.py | 155 ++++++++++++++++-- .../background/celery/tasks/vespa/tasks.py | 78 ++------- backend/danswer/db/index_attempt.py | 7 + .../admin_performance_query_history.spec.ts | 32 ++-- web/tests/e2e/admin_performance_usage.spec.ts | 1 + 6 files changed, 186 insertions(+), 91 deletions(-) diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py index 561c96960c3..f08e573723f 100644 --- a/backend/danswer/background/celery/apps/primary.py +++ b/backend/danswer/background/celery/apps/primary.py @@ -15,7 +15,9 @@ import danswer.background.celery.apps.app_base as app_base from danswer.background.celery.apps.app_base import task_logger from danswer.background.celery.celery_utils import celery_is_worker_primary -from danswer.background.celery.tasks.vespa.tasks import get_unfenced_index_attempt_ids +from danswer.background.celery.tasks.indexing.tasks import ( + get_unfenced_index_attempt_ids, +) from danswer.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT from danswer.configs.constants import DanswerRedisLocks from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index a353e7fe215..14bf66cdc56 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -3,6 +3,7 @@ from http import HTTPStatus from time import sleep +import redis import sentry_sdk from celery import Celery from celery import shared_task @@ -32,6 +33,8 @@ from danswer.db.enums import IndexingStatus from danswer.db.enums import IndexModelStatus from danswer.db.index_attempt import create_index_attempt +from danswer.db.index_attempt import delete_index_attempt +from danswer.db.index_attempt import get_all_index_attempts_by_status from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import get_last_attempt_for_cc_pair from danswer.db.index_attempt import mark_attempt_failed @@ -45,6 +48,7 @@ from danswer.natural_language_processing.search_nlp_models import EmbeddingModel from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder from danswer.redis.redis_connector import RedisConnector +from danswer.redis.redis_connector_index import RedisConnectorIndex from danswer.redis.redis_connector_index import RedisConnectorIndexPayload from danswer.redis.redis_pool import get_redis_client from danswer.utils.logger import setup_logger @@ -100,6 +104,54 @@ def progress(self, tag: str, amount: int) -> None: self.redis_client.incrby(self.generator_progress_key, amount) +def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]: + """Gets a list of unfenced index attempts. Should not be possible, so we'd typically + want to clean them up. + + Unfenced = attempt not in terminal state and fence does not exist. + """ + unfenced_attempts: list[int] = [] + + # inner/outer/inner double check pattern to avoid race conditions when checking for + # bad state + # inner = index_attempt in non terminal state + # outer = r.fence_key down + + # check the db for index attempts in a non terminal state + attempts: list[IndexAttempt] = [] + attempts.extend( + get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session) + ) + attempts.extend( + get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session) + ) + + for attempt in attempts: + fence_key = RedisConnectorIndex.fence_key_with_ids( + attempt.connector_credential_pair_id, attempt.search_settings_id + ) + + # if the fence is down / doesn't exist, possible error but not confirmed + if r.exists(fence_key): + continue + + # Between the time the attempts are first looked up and the time we see the fence down, + # the attempt may have completed and taken down the fence normally. + + # We need to double check that the index attempt is still in a non terminal state + # and matches the original state, which confirms we are really in a bad state. + attempt_2 = get_index_attempt(db_session, attempt.id) + if not attempt_2: + continue + + if attempt.status != attempt_2.status: + continue + + unfenced_attempts.append(attempt.id) + + return unfenced_attempts + + @shared_task( name="check_for_indexing", soft_time_limit=300, @@ -110,7 +162,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: r = get_redis_client(tenant_id=tenant_id) - lock_beat = r.lock( + lock_beat: RedisLock = r.lock( DanswerRedisLocks.CHECK_INDEXING_BEAT_LOCK, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, ) @@ -120,6 +172,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: if not lock_beat.acquire(blocking=False): return None + # check for search settings swap with get_session_with_tenant(tenant_id=tenant_id) as db_session: old_search_settings = check_index_swap(db_session=db_session) current_search_settings = get_current_search_settings(db_session) @@ -138,13 +191,18 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: embedding_model=embedding_model, ) + # gather cc_pair_ids cc_pair_ids: list[int] = [] with get_session_with_tenant(tenant_id) as db_session: + lock_beat.reacquire() cc_pairs = fetch_connector_credential_pairs(db_session) for cc_pair_entry in cc_pairs: cc_pair_ids.append(cc_pair_entry.id) + # kick off index attempts for cc_pair_id in cc_pair_ids: + lock_beat.reacquire() + redis_connector = RedisConnector(tenant_id, cc_pair_id) with get_session_with_tenant(tenant_id) as db_session: # Get the primary search settings @@ -201,6 +259,29 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: f"search_settings={search_settings_instance.id} " ) tasks_created += 1 + + # Fail any index attempts in the DB that don't have fences + # This shouldn't ever happen! + with get_session_with_tenant(tenant_id) as db_session: + unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r) + for attempt_id in unfenced_attempt_ids: + lock_beat.reacquire() + + attempt = get_index_attempt(db_session, attempt_id) + if not attempt: + continue + + failure_reason = ( + f"Unfenced index attempt found in DB: " + f"index_attempt={attempt.id} " + f"cc_pair={attempt.connector_credential_pair_id} " + f"search_settings={attempt.search_settings_id}" + ) + task_logger.error(failure_reason) + mark_attempt_failed( + attempt.id, db_session, failure_reason=failure_reason + ) + except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -210,6 +291,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: finally: if lock_beat.owned(): lock_beat.release() + else: + task_logger.error( + "check_for_indexing - Lock not owned on completion: " + f"tenant={tenant_id}" + ) return tasks_created @@ -314,10 +400,11 @@ def try_creating_indexing_task( """ LOCK_TIMEOUT = 30 + index_attempt_id: int | None = None # we need to serialize any attempt to trigger indexing since it can be triggered # either via celery beat or manually (API call) - lock = r.lock( + lock: RedisLock = r.lock( DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_indexing_task", timeout=LOCK_TIMEOUT, ) @@ -368,6 +455,8 @@ def try_creating_indexing_task( custom_task_id = redis_connector_index.generate_generator_task_id() + # when the task is sent, we have yet to finish setting up the fence + # therefore, the task must contain code that blocks until the fence is ready result = celery_app.send_task( "connector_indexing_proxy_task", kwargs=dict( @@ -388,13 +477,16 @@ def try_creating_indexing_task( payload.celery_task_id = result.id redis_connector_index.set_fence(payload) except Exception: - redis_connector_index.set_fence(None) task_logger.exception( - f"Unexpected exception: " + f"try_creating_indexing_task - Unexpected exception: " f"tenant={tenant_id} " f"cc_pair={cc_pair.id} " f"search_settings={search_settings.id}" ) + + if index_attempt_id is not None: + delete_index_attempt(db_session, index_attempt_id) + redis_connector_index.set_fence(None) return None finally: if lock.owned(): @@ -412,7 +504,7 @@ def connector_indexing_proxy_task( ) -> None: """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" task_logger.info( - f"Indexing proxy - starting: attempt={index_attempt_id} " + f"Indexing watchdog - starting: attempt={index_attempt_id} " f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" @@ -420,7 +512,7 @@ def connector_indexing_proxy_task( client = SimpleJobClient() job = client.submit( - connector_indexing_task, + connector_indexing_task_wrapper, index_attempt_id, cc_pair_id, search_settings_id, @@ -431,7 +523,7 @@ def connector_indexing_proxy_task( if not job: task_logger.info( - f"Indexing proxy - spawn failed: attempt={index_attempt_id} " + f"Indexing watchdog - spawn failed: attempt={index_attempt_id} " f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" @@ -439,7 +531,7 @@ def connector_indexing_proxy_task( return task_logger.info( - f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} " + f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} " f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" @@ -463,7 +555,7 @@ def connector_indexing_proxy_task( if job.status == "error": task_logger.error( - f"Indexing proxy - spawned task exceptioned: " + f"Indexing watchdog - spawned task exceptioned: " f"attempt={index_attempt_id} " f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " @@ -475,7 +567,7 @@ def connector_indexing_proxy_task( break task_logger.info( - f"Indexing proxy - finished: attempt={index_attempt_id} " + f"Indexing watchdog - finished: attempt={index_attempt_id} " f"tenant={tenant_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" @@ -483,6 +575,38 @@ def connector_indexing_proxy_task( return +def connector_indexing_task_wrapper( + index_attempt_id: int, + cc_pair_id: int, + search_settings_id: int, + tenant_id: str | None, + is_ee: bool, +) -> int | None: + """Just wraps connector_indexing_task so we can log any exceptions before + re-raising it.""" + result: int | None = None + + try: + result = connector_indexing_task( + index_attempt_id, + cc_pair_id, + search_settings_id, + tenant_id, + is_ee, + ) + except: + logger.exception( + f"connector_indexing_task exceptioned: " + f"tenant={tenant_id} " + f"index_attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + raise + + return result + + def connector_indexing_task( index_attempt_id: int, cc_pair_id: int, @@ -537,6 +661,7 @@ def connector_indexing_task( if redis_connector.delete.fenced: raise RuntimeError( f"Indexing will not start because connector deletion is in progress: " + f"attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " f"fence={redis_connector.delete.fence_key}" ) @@ -544,18 +669,18 @@ def connector_indexing_task( if redis_connector.stop.fenced: raise RuntimeError( f"Indexing will not start because a connector stop signal was detected: " + f"attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " f"fence={redis_connector.stop.fence_key}" ) while True: - # wait for the fence to come up - if not redis_connector_index.fenced: + if not redis_connector_index.fenced: # The fence must exist raise ValueError( f"connector_indexing_task - fence not found: fence={redis_connector_index.fence_key}" ) - payload = redis_connector_index.payload + payload = redis_connector_index.payload # The payload must exist if not payload: raise ValueError("connector_indexing_task: payload invalid or not found") @@ -578,7 +703,7 @@ def connector_indexing_task( ) break - lock = r.lock( + lock: RedisLock = r.lock( redis_connector_index.generator_lock_key, timeout=CELERY_INDEXING_LOCK_TIMEOUT, ) @@ -587,7 +712,7 @@ def connector_indexing_task( if not acquired: logger.warning( f"Indexing task already running, exiting...: " - f"cc_pair={cc_pair_id} search_settings={search_settings_id}" + f"index_attempt={index_attempt_id} cc_pair={cc_pair_id} search_settings={search_settings_id}" ) return None diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 12a1fe30d0e..ec7f52bc03c 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -5,7 +5,6 @@ from typing import cast import httpx -import redis from celery import Celery from celery import shared_task from celery import Task @@ -47,13 +46,10 @@ from danswer.db.document_set import get_document_set_by_id from danswer.db.document_set import mark_document_set_as_synced from danswer.db.engine import get_session_with_tenant -from danswer.db.enums import IndexingStatus from danswer.db.index_attempt import delete_index_attempts -from danswer.db.index_attempt import get_all_index_attempts_by_status from danswer.db.index_attempt import get_index_attempt from danswer.db.index_attempt import mark_attempt_failed from danswer.db.models import DocumentSet -from danswer.db.models import IndexAttempt from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index from danswer.document_index.interfaces import VespaDocumentFields @@ -649,20 +645,26 @@ def monitor_ccpair_indexing_taskset( # the task is still setting up return - # Read result state BEFORE generator_complete_key to avoid a race condition # never use any blocking methods on the result from inside a task! result: AsyncResult = AsyncResult(payload.celery_task_id) - result_state = result.state + # inner/outer/inner double check pattern to avoid race conditions when checking for + # bad state + + # inner = get_completion / generator_complete not signaled + # outer = result.state in READY state status_int = redis_connector_index.get_completion() - if status_int is None: # completion signal not set ... check for errors - # If we get here, and then the task both sets the completion signal and finishes, - # we will incorrectly abort the task. We must check result state, then check - # get_completion again to avoid the race condition. - if result_state in READY_STATES: + if status_int is None: # inner signal not set ... possible error + result_state = result.state + if ( + result_state in READY_STATES + ): # outer signal in terminal state ... possible error + # Now double check! if redis_connector_index.get_completion() is None: - # IF the task state is READY, THEN generator_complete should be set - # if it isn't, then the worker crashed + # inner signal still not set (and cannot change when outer result_state is READY) + # Task is finished but generator complete isn't set. + # We have a problem! Worker may have crashed. + msg = ( f"Connector indexing aborted or exceptioned: " f"attempt={payload.index_attempt_id} " @@ -697,37 +699,6 @@ def monitor_ccpair_indexing_taskset( redis_connector_index.reset() -def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]: - """Gets a list of unfenced index attempts. Should not be possible, so we'd typically - want to clean them up. - - Unfenced = attempt not in terminal state and fence does not exist. - """ - unfenced_attempts: list[int] = [] - - # do some cleanup before clearing fences - # check the db for any outstanding index attempts - attempts: list[IndexAttempt] = [] - attempts.extend( - get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session) - ) - attempts.extend( - get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session) - ) - - for attempt in attempts: - # if attempts exist in the db but we don't detect them in redis, mark them as failed - fence_key = RedisConnectorIndex.fence_key_with_ids( - attempt.connector_credential_pair_id, attempt.search_settings_id - ) - if r.exists(fence_key): - continue - - unfenced_attempts.append(attempt.id) - - return unfenced_attempts - - @shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True) def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: """This is a celery beat task that monitors and finalizes metadata sync tasksets. @@ -779,25 +750,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: f"permissions_sync={n_permissions_sync} " ) - # Fail any index attempts in the DB that don't have fences - with get_session_with_tenant(tenant_id) as db_session: - unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r) - for attempt_id in unfenced_attempt_ids: - attempt = get_index_attempt(db_session, attempt_id) - if not attempt: - continue - - failure_reason = ( - f"Unfenced index attempt found in DB: " - f"index_attempt={attempt.id} " - f"cc_pair={attempt.connector_credential_pair_id} " - f"search_settings={attempt.search_settings_id}" - ) - task_logger.warning(failure_reason) - mark_attempt_failed( - attempt.id, db_session, failure_reason=failure_reason - ) - lock_beat.reacquire() if r.exists(RedisConnectorCredentialPair.get_fence_key()): monitor_connector_taskset(r) diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index b9c3d9d4ca2..c0d28060ad5 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -67,6 +67,13 @@ def create_index_attempt( return new_attempt.id +def delete_index_attempt(db_session: Session, index_attempt_id: int) -> None: + index_attempt = get_index_attempt(db_session, index_attempt_id) + if index_attempt: + db_session.delete(index_attempt) + db_session.commit() + + def mock_successful_index_attempt( connector_credential_pair_id: int, search_settings_id: int, diff --git a/web/tests/e2e/admin_performance_query_history.spec.ts b/web/tests/e2e/admin_performance_query_history.spec.ts index 0362b1a57af..d15dba7c635 100644 --- a/web/tests/e2e/admin_performance_query_history.spec.ts +++ b/web/tests/e2e/admin_performance_query_history.spec.ts @@ -1,14 +1,22 @@ import { test, expect } from "@chromatic-com/playwright"; -test( - "Admin - Performance - Query History", - { - tag: "@admin", - }, - async ({ page }, testInfo) => { - // Test simple loading - await page.goto("http://localhost:3000/admin/performance/query-history"); - await expect(page.locator("h1.text-3xl")).toHaveText("Query History"); - await expect(page.locator("p.text-sm").nth(0)).toHaveText("Feedback Type"); - } -); +test.describe("Admin Performance Query History", () => { + // Ignores the diff for elements targeted by the specified list of selectors + // exclude button since they change based on the date + test.use({ ignoreSelectors: ["button"] }); + + test( + "Admin - Performance - Query History", + { + tag: "@admin", + }, + async ({ page }, testInfo) => { + // Test simple loading + await page.goto("http://localhost:3000/admin/performance/query-history"); + await expect(page.locator("h1.text-3xl")).toHaveText("Query History"); + await expect(page.locator("p.text-sm").nth(0)).toHaveText( + "Feedback Type" + ); + } + ); +}); diff --git a/web/tests/e2e/admin_performance_usage.spec.ts b/web/tests/e2e/admin_performance_usage.spec.ts index f1eff8f3080..0a5fff426d3 100644 --- a/web/tests/e2e/admin_performance_usage.spec.ts +++ b/web/tests/e2e/admin_performance_usage.spec.ts @@ -2,6 +2,7 @@ import { test, expect } from "@chromatic-com/playwright"; test.describe("Admin Performance Usage", () => { // Ignores the diff for elements targeted by the specified list of selectors + // exclude button and svg since they change based on the date test.use({ ignoreSelectors: ["button", "svg"] }); test( From 67bfcabbc5f152e22c8d82a5524d1e22522dbed5 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Fri, 22 Nov 2024 08:53:24 -0800 Subject: [PATCH 5/7] llm provider causing re render in effect (#3205) * llm provider causing re render in effect * clean * unused * k --- web/src/app/chat/ChatPage.tsx | 4 ++-- web/src/lib/hooks.ts | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/web/src/app/chat/ChatPage.tsx b/web/src/app/chat/ChatPage.tsx index 51121621396..bd7c39646fd 100644 --- a/web/src/app/chat/ChatPage.tsx +++ b/web/src/app/chat/ChatPage.tsx @@ -52,6 +52,7 @@ import { useLayoutEffect, useRef, useState, + useMemo, } from "react"; import { usePopup } from "@/components/admin/connectors/Popup"; import { SEARCH_PARAM_NAMES, shouldSubmitOnLoad } from "./searchParams"; @@ -266,7 +267,6 @@ export function ChatPage({ availableAssistants[0]; const noAssistants = liveAssistant == null || liveAssistant == undefined; - // always set the model override for the chat session, when an assistant, llm provider, or user preference exists useEffect(() => { const personaDefault = getLLMProviderOverrideForPersona( @@ -282,7 +282,7 @@ export function ChatPage({ ); } // eslint-disable-next-line react-hooks/exhaustive-deps - }, [liveAssistant, llmProviders, user?.preferences.default_model]); + }, [liveAssistant, user?.preferences.default_model]); const stopGenerating = () => { const currentSession = currentSessionId(); diff --git a/web/src/lib/hooks.ts b/web/src/lib/hooks.ts index d0b3d0adfc0..a19c1eb7c4b 100644 --- a/web/src/lib/hooks.ts +++ b/web/src/lib/hooks.ts @@ -174,7 +174,6 @@ export function useLlmOverride( modelName: "", } ); - const [llmOverride, setLlmOverride] = useState( currentChatSession && currentChatSession.current_alternate_model ? destructureValue(currentChatSession.current_alternate_model) From 129c8f8fafd0c61aa665b446f8d7252930a65a89 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:29:13 -0800 Subject: [PATCH 6/7] Add start/end date ability for query history as CSV endpoint (#3211) --- .../ee/danswer/server/query_history/api.py | 6 +- .../integration/common_utils/managers/chat.py | 9 +- .../integration/common_utils/test_models.py | 5 +- .../tests/query-history/test_query_history.py | 89 +++++++++++++++---- 4 files changed, 81 insertions(+), 28 deletions(-) diff --git a/backend/ee/danswer/server/query_history/api.py b/backend/ee/danswer/server/query_history/api.py index 7b86c16abfa..df6175cf271 100644 --- a/backend/ee/danswer/server/query_history/api.py +++ b/backend/ee/danswer/server/query_history/api.py @@ -427,12 +427,14 @@ def get_chat_session_admin( @router.get("/admin/query-history-csv") def get_query_history_as_csv( _: User | None = Depends(current_admin_user), + start: datetime | None = None, + end: datetime | None = None, db_session: Session = Depends(get_session), ) -> StreamingResponse: complete_chat_session_history = fetch_and_process_chat_session_history( db_session=db_session, - start=datetime.fromtimestamp(0, tz=timezone.utc), - end=datetime.now(tz=timezone.utc), + start=start or datetime.fromtimestamp(0, tz=timezone.utc), + end=end or datetime.now(tz=timezone.utc), feedback_type=None, limit=None, ) diff --git a/backend/tests/integration/common_utils/managers/chat.py b/backend/tests/integration/common_utils/managers/chat.py index a2edb32caec..5927d633068 100644 --- a/backend/tests/integration/common_utils/managers/chat.py +++ b/backend/tests/integration/common_utils/managers/chat.py @@ -142,7 +142,7 @@ def get_chat_history( user_performing_action: DATestUser | None = None, ) -> list[DATestChatMessage]: response = requests.get( - f"{API_SERVER_URL}/chat/history/{chat_session.id}", + f"{API_SERVER_URL}/chat/get-chat-session/{chat_session.id}", headers=user_performing_action.headers if user_performing_action else GENERAL_HEADERS, @@ -151,11 +151,10 @@ def get_chat_history( return [ DATestChatMessage( - id=msg["id"], + id=msg["message_id"], chat_session_id=chat_session.id, - parent_message_id=msg.get("parent_message_id"), + parent_message_id=msg.get("parent_message"), message=msg["message"], - response=msg.get("response", ""), ) - for msg in response.json() + for msg in response.json()["messages"] ] diff --git a/backend/tests/integration/common_utils/test_models.py b/backend/tests/integration/common_utils/test_models.py index 16156d8aab4..c0b9c92a577 100644 --- a/backend/tests/integration/common_utils/test_models.py +++ b/backend/tests/integration/common_utils/test_models.py @@ -136,11 +136,10 @@ class DATestChatSession(BaseModel): class DATestChatMessage(BaseModel): - id: str | None = None + id: int chat_session_id: UUID - parent_message_id: str | None + parent_message_id: int | None message: str - response: str class StreamedResponse(BaseModel): diff --git a/backend/tests/integration/tests/query-history/test_query_history.py b/backend/tests/integration/tests/query-history/test_query_history.py index bd66bb049f5..2d8eab72ab4 100644 --- a/backend/tests/integration/tests/query-history/test_query_history.py +++ b/backend/tests/integration/tests/query-history/test_query_history.py @@ -2,6 +2,7 @@ from datetime import timedelta from datetime import timezone +import pytest import requests from danswer.configs.constants import QAFeedbackType @@ -16,7 +17,8 @@ from tests.integration.common_utils.test_models import DATestUser -def test_query_history_endpoints(reset: None) -> None: +@pytest.fixture +def setup_chat_session(reset: None) -> tuple[DATestUser, str]: # Create admin user and required resources admin_user: DATestUser = UserManager.create(name="admin_user") cc_pair = CCPairManager.create_from_scratch(user_performing_action=admin_user) @@ -46,16 +48,29 @@ def test_query_history_endpoints(reset: None) -> None: user_performing_action=admin_user, ) - # Test get chat session history endpoint - end_time = datetime.now(tz=timezone.utc) - start_time = end_time - timedelta(days=1) + messages = ChatSessionManager.get_chat_history( + chat_session=chat_session, + user_performing_action=admin_user, + ) + + # Add another message to the chat session + ChatSessionManager.send_message( + chat_session_id=chat_session.id, + message="What about Q2 revenue?", + user_performing_action=admin_user, + parent_message_id=messages[-1].id, + ) + + return admin_user, str(chat_session.id) + + +def test_chat_history_endpoints( + reset: None, setup_chat_session: tuple[DATestUser, str] +) -> None: + admin_user, first_chat_id = setup_chat_session response = requests.get( f"{API_SERVER_URL}/admin/chat-session-history", - params={ - "start": start_time.isoformat(), - "end": end_time.isoformat(), - }, headers=admin_user.headers, ) assert response.status_code == 200 @@ -66,7 +81,6 @@ def test_query_history_endpoints(reset: None) -> None: # Verify the first chat session details first_session = history_response[0] - first_chat_id = first_session["id"] assert first_session["user_email"] == admin_user.email assert first_session["name"] == "Test chat session" assert first_session["first_user_message"] == "What was the Q1 revenue?" @@ -74,7 +88,22 @@ def test_query_history_endpoints(reset: None) -> None: assert first_session["assistant_id"] == 0 assert first_session["feedback_type"] is None assert first_session["flow_type"] == SessionType.CHAT.value - assert first_session["conversation_length"] == 2 # User message + AI response + assert first_session["conversation_length"] == 4 # 2 User messages + 2 AI responses + + # Test date filtering - should return no results + past_end = datetime.now(tz=timezone.utc) - timedelta(days=1) + past_start = past_end - timedelta(days=1) + response = requests.get( + f"{API_SERVER_URL}/admin/chat-session-history", + params={ + "start": past_start.isoformat(), + "end": past_end.isoformat(), + }, + headers=admin_user.headers, + ) + assert response.status_code == 200 + history_response = response.json() + assert len(history_response) == 0 # Test get specific chat session endpoint response = requests.get( @@ -89,7 +118,25 @@ def test_query_history_endpoints(reset: None) -> None: assert len(session_details["messages"]) > 0 assert session_details["flow_type"] == SessionType.CHAT.value - # Test CSV export endpoint + # Test filtering by feedback + response = requests.get( + f"{API_SERVER_URL}/admin/chat-session-history", + params={ + "feedback_type": QAFeedbackType.LIKE.value, + }, + headers=admin_user.headers, + ) + assert response.status_code == 200 + history_response = response.json() + assert len(history_response) == 0 + + +def test_chat_history_csv_export( + reset: None, setup_chat_session: tuple[DATestUser, str] +) -> None: + admin_user, _ = setup_chat_session + + # Test CSV export endpoint with date filtering response = requests.get( f"{API_SERVER_URL}/admin/query-history-csv", headers=admin_user.headers, @@ -100,20 +147,26 @@ def test_query_history_endpoints(reset: None) -> None: # Verify CSV content csv_content = response.content.decode() + csv_lines = csv_content.strip().split("\n") + assert len(csv_lines) == 3 # Header + 2 QA pairs assert "chat_session_id" in csv_content assert "user_message" in csv_content assert "ai_response" in csv_content + assert "What was the Q1 revenue?" in csv_content + assert "What about Q2 revenue?" in csv_content - # Test filtering by feedback + # Test CSV export with date filtering - should return no results + past_end = datetime.now(tz=timezone.utc) - timedelta(days=1) + past_start = past_end - timedelta(days=1) response = requests.get( - f"{API_SERVER_URL}/admin/chat-session-history", + f"{API_SERVER_URL}/admin/query-history-csv", params={ - "feedback_type": QAFeedbackType.LIKE.value, - "start": start_time.isoformat(), - "end": end_time.isoformat(), + "start": past_start.isoformat(), + "end": past_end.isoformat(), }, headers=admin_user.headers, ) assert response.status_code == 200 - history_response = response.json() - assert len(history_response) == 0 + csv_content = response.content.decode() + csv_lines = csv_content.strip().split("\n") + assert len(csv_lines) == 1 # Only header, no data rows From 5dc07d4178a676b66707e94449cd7d782d26604e Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Fri, 22 Nov 2024 11:06:19 -0800 Subject: [PATCH 7/7] Each section is now cleaned before being chunked (#3210) * Each section is now cleaned before being chunked * k --------- Co-authored-by: Yuhong Sun --- backend/danswer/indexing/chunker.py | 55 +++++++++++-------- .../search_nlp_models.py | 28 ---------- backend/danswer/utils/text_processing.py | 22 ++++++++ 3 files changed, 55 insertions(+), 50 deletions(-) diff --git a/backend/danswer/indexing/chunker.py b/backend/danswer/indexing/chunker.py index 35dd919af6b..287d3ba2d5e 100644 --- a/backend/danswer/indexing/chunker.py +++ b/backend/danswer/indexing/chunker.py @@ -14,6 +14,7 @@ from danswer.indexing.models import DocAwareChunk from danswer.natural_language_processing.utils import BaseTokenizer from danswer.utils.logger import setup_logger +from danswer.utils.text_processing import clean_text from danswer.utils.text_processing import shared_precompare_cleanup from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT @@ -220,9 +221,20 @@ def _create_chunk( mini_chunk_texts=self._get_mini_chunk_texts(text), ) - for section in document.sections: - section_text = section.text + for section_idx, section in enumerate(document.sections): + section_text = clean_text(section.text) section_link_text = section.link or "" + # If there is no useful content, not even the title, just drop it + if not section_text and (not document.title or section_idx > 0): + # If a section is empty and the document has no title, we can just drop it. We return a list of + # DocAwareChunks where each one contains the necessary information needed down the line for indexing. + # There is no concern about dropping whole documents from this list, it should not cause any indexing failures. + logger.warning( + f"Skipping section {section.text} from document " + f"{document.semantic_identifier} due to empty text after cleaning " + f" with link {section_link_text}" + ) + continue section_token_count = len(self.tokenizer.tokenize(section_text)) @@ -238,31 +250,26 @@ def _create_chunk( split_texts = self.chunk_splitter.split_text(section_text) for i, split_text in enumerate(split_texts): - split_token_count = len(self.tokenizer.tokenize(split_text)) - - if STRICT_CHUNK_TOKEN_LIMIT: - split_token_count = len(self.tokenizer.tokenize(split_text)) - if split_token_count > content_token_limit: - # Further split the oversized chunk - smaller_chunks = self._split_oversized_chunk( - split_text, content_token_limit - ) - for i, small_chunk in enumerate(smaller_chunks): - chunks.append( - _create_chunk( - text=small_chunk, - links={0: section_link_text}, - is_continuation=(i != 0), - ) - ) - else: + if ( + STRICT_CHUNK_TOKEN_LIMIT + and + # Tokenizer only runs if STRICT_CHUNK_TOKEN_LIMIT is true + len(self.tokenizer.tokenize(split_text)) > content_token_limit + ): + # If STRICT_CHUNK_TOKEN_LIMIT is true, manually check + # the token count of each split text to ensure it is + # not larger than the content_token_limit + smaller_chunks = self._split_oversized_chunk( + split_text, content_token_limit + ) + for i, small_chunk in enumerate(smaller_chunks): chunks.append( _create_chunk( - text=split_text, + text=small_chunk, links={0: section_link_text}, + is_continuation=(i != 0), ) ) - else: chunks.append( _create_chunk( @@ -354,6 +361,10 @@ def _handle_single_document(self, document: Document) -> list[DocAwareChunk]: return normal_chunks def chunk(self, documents: list[Document]) -> list[DocAwareChunk]: + """ + Takes in a list of documents and chunks them into smaller chunks for indexing + while persisting the document metadata. + """ final_chunks: list[DocAwareChunk] = [] for document in documents: if self.callback: diff --git a/backend/danswer/natural_language_processing/search_nlp_models.py b/backend/danswer/natural_language_processing/search_nlp_models.py index 9a3d575c0f9..ee80292de63 100644 --- a/backend/danswer/natural_language_processing/search_nlp_models.py +++ b/backend/danswer/natural_language_processing/search_nlp_models.py @@ -1,4 +1,3 @@ -import re import threading import time from collections.abc import Callable @@ -50,28 +49,6 @@ def clean_model_name(model_str: str) -> str: return model_str.replace("/", "_").replace("-", "_").replace(".", "_") -_INITIAL_FILTER = re.compile( - "[" - "\U0000FFF0-\U0000FFFF" # Specials - "\U0001F000-\U0001F9FF" # Emoticons - "\U00002000-\U0000206F" # General Punctuation - "\U00002190-\U000021FF" # Arrows - "\U00002700-\U000027BF" # Dingbats - "]+", - flags=re.UNICODE, -) - - -def clean_openai_text(text: str) -> str: - # Remove specific Unicode ranges that might cause issues - cleaned = _INITIAL_FILTER.sub("", text) - - # Remove any control characters except for newline and tab - cleaned = "".join(ch for ch in cleaned if ch >= " " or ch in "\n\t") - - return cleaned - - def build_model_server_url( model_server_host: str, model_server_port: int, @@ -215,11 +192,6 @@ def encode( for text in texts ] - if self.provider_type == EmbeddingProvider.OPENAI: - # If the provider is openai, we need to clean the text - # as a temporary workaround for the openai API - texts = [clean_openai_text(text) for text in texts] - batch_size = ( api_embedding_batch_size if self.provider_type diff --git a/backend/danswer/utils/text_processing.py b/backend/danswer/utils/text_processing.py index da9776990ff..d26b5f357fb 100644 --- a/backend/danswer/utils/text_processing.py +++ b/backend/danswer/utils/text_processing.py @@ -126,6 +126,28 @@ def shared_precompare_cleanup(text: str) -> str: return text +_INITIAL_FILTER = re.compile( + "[" + "\U0000FFF0-\U0000FFFF" # Specials + "\U0001F000-\U0001F9FF" # Emoticons + "\U00002000-\U0000206F" # General Punctuation + "\U00002190-\U000021FF" # Arrows + "\U00002700-\U000027BF" # Dingbats + "]+", + flags=re.UNICODE, +) + + +def clean_text(text: str) -> str: + # Remove specific Unicode ranges that might cause issues + cleaned = _INITIAL_FILTER.sub("", text) + + # Remove any control characters except for newline and tab + cleaned = "".join(ch for ch in cleaned if ch >= " " or ch in "\n\t") + + return cleaned + + def is_valid_email(text: str) -> bool: """Can use a library instead if more detailed checks are needed""" regex = r"^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"