Skip to content

Commit

Permalink
Parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
Weves committed Dec 1, 2024
1 parent 19faa80 commit 1b9e3f6
Show file tree
Hide file tree
Showing 6 changed files with 885 additions and 37 deletions.
3 changes: 3 additions & 0 deletions backend/danswer/document_index/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def get_default_document_index(
index_name=primary_index_name,
secondary_index_name=secondary_index_name,
multitenant=MULTI_TENANT,
preserve_existing_indices=bool(
VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY
),
)


Expand Down
99 changes: 68 additions & 31 deletions backend/danswer/document_index/vespa/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,38 @@ def __init__(
index_name: str,
secondary_index_name: str | None,
multitenant: bool = False,
preserve_existing_indices: bool = False,
) -> None:
self.index_name = index_name
self.secondary_index_name = secondary_index_name
self.multitenant = multitenant
self.preserve_existing_indices = preserve_existing_indices
self.http_client = get_vespa_http_client()

def ensure_indices_exist(
self,
index_embedding_dim: int,
secondary_index_embedding_dim: int | None,
) -> None:
if MULTI_TENANT:
logger.info(
"Skipping Vespa index seup for multitenant (would wipe all indices)"
)
return None
@classmethod
def _get_existing_schemas(cls) -> list[tuple[str, int, bool]]:
schema_url = f"{VESPA_APPLICATION_ENDPOINT}/application/v2/tenant/default/application/default/schema"
response = requests.get(schema_url)
response.raise_for_status()

indices: list[tuple[str, int, bool]] = []

# Add existing indices that we're not explicitly creating
for schema in response.json():
# Extract embedding dimension from schema content
for field in schema["fields"]:
if field["name"] == "embeddings":
dim = field["tensorType"]["dimensions"][0]["size"]
indices.append((schema["name"], dim, False))
logger.info(
f"Preserving existing index: {schema['name']} with dimension {dim}"
)
break

return indices

@classmethod
def create_indices(cls, indices: list[tuple[str, int, bool]]) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.notice(f"Deploying Vespa application package to {deploy_url}")

Expand All @@ -159,22 +174,14 @@ def ensure_indices_exist(
with open(services_file, "r") as services_f:
services_template = services_f.read()

schema_names = [self.index_name, self.secondary_index_name]
schema_names = [index_name for (index_name, _, _) in indices]

doc_lines = _create_document_xml_lines(schema_names)
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
services = services.replace(
SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS)
)

kv_store = get_kv_store()

needs_reindexing = False
try:
needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY))
except Exception:
logger.debug("Could not load the reindexing flag. Using ngrams")

with open(overrides_file, "r") as overrides_f:
overrides_template = overrides_f.read()

Expand All @@ -195,19 +202,14 @@ def ensure_indices_exist(
schema_template = schema_f.read()
schema_template = schema_template.replace(TENANT_ID_PAT, "")

schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))

schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
schema = schema.replace(TENANT_ID_PAT, "")
zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")
for index_name, index_embedding_dim, needs_reindexing in indices:
schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))

if self.secondary_index_name:
upcoming_schema = schema_template.replace(
DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")
schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
schema = schema.replace(TENANT_ID_PAT, "")
zip_dict[f"schemas/{index_name}.sd"] = schema.encode("utf-8")

zip_file = in_memory_zip_from_file_bytes(zip_dict)

Expand All @@ -218,6 +220,41 @@ def ensure_indices_exist(
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
)

def ensure_indices_exist(
self,
index_embedding_dim: int,
secondary_index_embedding_dim: int | None,
) -> None:
if self.multitenant or MULTI_TENANT: # be extra safe here
logger.info(
"Skipping Vespa index setup for multitenant (would wipe all indices)"
)
return None

kv_store = get_kv_store()
primary_needs_reindexing = False
try:
primary_needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY))
except Exception:
logger.debug("Could not load the reindexing flag. Using ngrams")

indices = [
(self.index_name, index_embedding_dim, primary_needs_reindexing),
]
if self.secondary_index_name and secondary_index_embedding_dim:
indices.append(
(self.secondary_index_name, secondary_index_embedding_dim, False)
)

if self.preserve_existing_indices:
existing_indices = self._get_existing_schemas()
for index_info in existing_indices:
if index_info[0] in [self.index_name, self.secondary_index_name]:
continue
indices.append(index_info)

self.create_indices(indices)

@staticmethod
def register_multitenant_indices(
indices: list[str],
Expand Down
55 changes: 55 additions & 0 deletions backend/tests/integration/introspection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path

import pytest
from _pytest.nodes import Item


def list_all_tests(directory: str | Path = ".") -> list[str]:
    """
    List all pytest test functions under the specified directory.

    Args:
        directory: Directory path to search for tests (defaults to current directory)

    Returns:
        Sorted list of test identifiers in ``dotted.module.path::test_name`` form.
    """
    directory = Path(directory).absolute()
    print(f"Searching for tests in: {directory}")

    class TestCollector:
        """Minimal pytest plugin that records every collected test item."""

        def __init__(self) -> None:
            self.collected: list[str] = []

        def pytest_collection_modifyitems(self, items: list[Item]) -> None:
            for item in items:
                if not isinstance(item, Item):
                    continue
                # Get the path of the test file relative to the search root
                rel_path = Path(item.fspath).relative_to(directory)
                # Build a dotted module path by joining the path *parts*.
                # The previous str(...).replace("/", ".") silently failed on
                # Windows, where the path separator is "\\".
                module_path = ".".join(rel_path.with_suffix("").parts)
                self.collected.append(f"{module_path}::{item.name}")

    collector = TestCollector()

    # Run pytest in collection-only mode; the plugin hook above captures items.
    pytest.main(
        [
            str(directory),
            "--collect-only",
            "-q",  # quiet mode
        ],
        plugins=[collector],
    )

    return sorted(collector.collected)


if __name__ == "__main__":
    # Discover every test under the current directory and print one per line.
    discovered = list_all_tests()
    print("\nFound tests:")
    for test_id in discovered:
        print(f"- {test_id}")
Loading

0 comments on commit 1b9e3f6

Please sign in to comment.