add vespa + embedding timeout env variables (#2689)
* add vespa + embedding timeout env variables

* nit: integration test

* add dangerous override

* k

* add additional clarity

* nit

* nit
pablonyx authored Oct 9, 2024
1 parent 10f221c commit d5b9a6e
Showing 8 changed files with 27 additions and 6 deletions.
3 changes: 3 additions & 0 deletions backend/danswer/configs/app_configs.py
@@ -401,6 +401,9 @@
os.environ.get("CUSTOM_ANSWER_VALIDITY_CONDITIONS", "[]")
)

+VESPA_REQUEST_TIMEOUT = int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "5")
+
+SYSTEM_RECURSION_LIMIT = int(os.environ.get("SYSTEM_RECURSION_LIMIT") or "1000")

#####
# Enterprise Edition Configs
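
A note on the `or "5"` idiom (a sketch, not part of the commit): the compose files below pass these variables as `${VAR:-}`, which resolves to an empty string when the variable is unset on the host. `int(os.environ.get(...) or "5")` falls back to the default for both missing and empty values, whereas a plain dict-style default would crash on `int("")`:

```python
import os

# Simulate the empty string that ${VESPA_REQUEST_TIMEOUT:-} yields in
# docker-compose when the variable is unset on the host.
os.environ["VESPA_REQUEST_TIMEOUT"] = ""

# `or "5"` treats the empty string as falsy and applies the default...
assert int(os.environ.get("VESPA_REQUEST_TIMEOUT") or "5") == 5

# ...while a dict-style default only covers the missing-key case.
try:
    int(os.environ.get("VESPA_REQUEST_TIMEOUT", "5"))
except ValueError:
    pass  # int("") raises ValueError
```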
11 changes: 6 additions & 5 deletions backend/danswer/document_index/vespa/index.py
@@ -15,6 +15,7 @@
import requests

from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
+from danswer.configs.app_configs import VESPA_REQUEST_TIMEOUT
from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import NUM_RETURNED_HITS
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
@@ -211,7 +212,7 @@ def index(
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
-httpx.Client(http2=True) as http_client,
+httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client,
):
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
@@ -275,7 +276,7 @@ def _update_chunk(
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
-httpx.Client(http2=True) as http_client,
+httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client,
):
for update_batch in batch_generator(updates, batch_size):
future_to_document_id = {
@@ -419,7 +420,7 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

-with httpx.Client(http2=True) as http_client:
+with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
@@ -475,7 +476,7 @@ def delete(self, doc_ids: list[str]) -> None:

# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
-with httpx.Client(http2=True) as http_client:
+with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
@@ -503,7 +504,7 @@ def delete_single(self, doc_id: str) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

-with httpx.Client(http2=True) as http_client:
+with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
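
For context, a minimal sketch of how the client-wide timeout behaves (the endpoint URL is a hypothetical stand-in, not from the commit): every request made through the client inherits the timeout unless overridden per call, and breaches surface as `httpx.TimeoutException` subclasses:

```python
import httpx

VESPA_REQUEST_TIMEOUT = 5  # seconds, mirroring the new default above

try:
    # http2=True requires the optional `h2` package (pip install httpx[http2]).
    with httpx.Client(http2=True, timeout=VESPA_REQUEST_TIMEOUT) as client:
        client.get("http://localhost:8081/ApplicationStatus")  # hypothetical endpoint
except httpx.TimeoutException as e:
    # ConnectTimeout, ReadTimeout, etc. all derive from TimeoutException.
    print(f"Vespa request timed out: {e!r}")
except httpx.TransportError as e:
    # Non-timeout transport failures (e.g. nothing listening on the port).
    print(f"Vespa request failed: {e!r}")
```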
1 change: 1 addition & 0 deletions backend/danswer/indexing/chunker.py
@@ -27,6 +27,7 @@
MAX_METADATA_PERCENTAGE = 0.25
CHUNK_MIN_CONTENT = 256

+
logger = setup_logger()


7 changes: 7 additions & 0 deletions backend/danswer/main.py
@@ -1,3 +1,4 @@
+import sys
import traceback
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -32,6 +33,7 @@
from danswer.configs.app_configs import OAUTH_CLIENT_SECRET
from danswer.configs.app_configs import POSTGRES_API_SERVER_POOL_OVERFLOW
from danswer.configs.app_configs import POSTGRES_API_SERVER_POOL_SIZE
+from danswer.configs.app_configs import SYSTEM_RECURSION_LIMIT
from danswer.configs.app_configs import USER_AUTH_SECRET
from danswer.configs.app_configs import WEB_DOMAIN
from danswer.configs.constants import AuthType
@@ -140,6 +142,11 @@ def include_router_with_global_prefix_prepended(

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
+# Set recursion limit
+if SYSTEM_RECURSION_LIMIT is not None:
+    sys.setrecursionlimit(SYSTEM_RECURSION_LIMIT)
+    logger.notice(f"System recursion limit set to {SYSTEM_RECURSION_LIMIT}")
+
SqlEngine.set_app_name(POSTGRES_WEB_APP_NAME)
SqlEngine.init_engine(
pool_size=POSTGRES_API_SERVER_POOL_SIZE,
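
As a sketch of what `sys.setrecursionlimit` controls (not part of the commit): Python raises `RecursionError` once the interpreter call stack exceeds the limit, so raising it buys headroom for deeply nested data at the cost of deferring that safety net:

```python
import sys

sys.setrecursionlimit(1000)  # the default the commit reads from SYSTEM_RECURSION_LIMIT

def reachable_depth(n: int = 0) -> int:
    try:
        return reachable_depth(n + 1)
    except RecursionError:
        return n

# Prints slightly below 1000: frames already on the stack count against the limit.
print(reachable_depth())
```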
3 changes: 2 additions & 1 deletion backend/model_server/encoders.py
@@ -25,6 +25,7 @@
from model_server.constants import EmbeddingProvider
from model_server.utils import simple_log_function_time
from shared_configs.configs import INDEXING_ONLY
+from shared_configs.configs import OPENAI_EMBEDDING_TIMEOUT
from shared_configs.enums import EmbedTextType
from shared_configs.enums import RerankerProvider
from shared_configs.model_server_models import Embedding
@@ -56,7 +57,7 @@ def _initialize_client(
api_key: str, provider: EmbeddingProvider, model: str | None = None
) -> Any:
if provider == EmbeddingProvider.OPENAI:
-return openai.OpenAI(api_key=api_key)
+return openai.OpenAI(api_key=api_key, timeout=OPENAI_EMBEDDING_TIMEOUT)
elif provider == EmbeddingProvider.COHERE:
return CohereClient(api_key=api_key)
elif provider == EmbeddingProvider.VOYAGE:
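
A hedged sketch of the client-wide timeout in the v1 OpenAI SDK (the key and model below are placeholders, not from the commit): the `timeout` argument accepts seconds (or an `httpx.Timeout`) and applies to every request the client makes, embeddings included:

```python
import openai

OPENAI_EMBEDDING_TIMEOUT = 600  # seconds, mirroring the new default below

client = openai.OpenAI(api_key="sk-placeholder", timeout=OPENAI_EMBEDDING_TIMEOUT)

# A request would look like this (needs a real key, so left commented out):
# resp = client.embeddings.create(model="text-embedding-3-small", input=["hello"])
# print(len(resp.data[0].embedding))
```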
3 changes: 3 additions & 0 deletions backend/shared_configs/configs.py
@@ -60,6 +60,9 @@
# notset, debug, info, notice, warning, error, or critical
LOG_LEVEL = os.environ.get("LOG_LEVEL", "notice")

+# Only used for OpenAI
+OPENAI_EMBEDDING_TIMEOUT = int(os.environ.get("OPENAI_EMBEDDING_TIMEOUT", "600"))
+

# Fields which should only be set on new search setting
PRESERVED_SEARCH_FIELDS = [
1 change: 1 addition & 0 deletions deployment/docker_compose/docker-compose.dev.yml
@@ -281,6 +281,7 @@ services:
- INDEXING_ONLY=True
# Set to debug to get more fine-grained logs
- LOG_LEVEL=${LOG_LEVEL:-info}
+- CLIENT_EMBEDDING_TIMEOUT=${CLIENT_EMBEDDING_TIMEOUT:-}
volumes:
# Not necessary, this is just to reduce download time during startup
- indexing_huggingface_model_cache:/root/.cache/huggingface/
4 changes: 4 additions & 0 deletions deployment/docker_compose/docker-compose.gpu-dev.yml
@@ -70,6 +70,9 @@ services:
- DISABLE_RERANK_FOR_STREAMING=${DISABLE_RERANK_FOR_STREAMING:-}
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
- MODEL_SERVER_PORT=${MODEL_SERVER_PORT:-}
+- VESPA_REQUEST_TIMEOUT=${VESPA_REQUEST_TIMEOUT:-}
+# We do not recommend changing this value
+- SYSTEM_RECURSION_LIMIT=${SYSTEM_RECURSION_LIMIT:-}
# Leave this on pretty please? Nothing sensitive is collected!
# https://docs.danswer.dev/more/telemetry
- DISABLE_TELEMETRY=${DISABLE_TELEMETRY:-}
@@ -252,6 +255,7 @@ services:
- MIN_THREADS_ML_MODELS=${MIN_THREADS_ML_MODELS:-}
# Set to debug to get more fine-grained logs
- LOG_LEVEL=${LOG_LEVEL:-info}
+- CLIENT_EMBEDDING_TIMEOUT=${CLIENT_EMBEDDING_TIMEOUT:-}
volumes:
# Not necessary, this is just to reduce download time during startup
- model_cache_huggingface:/root/.cache/huggingface/
