From 1b9e3f683e728fc8863dc1f52174786ff96087cb Mon Sep 17 00:00:00 2001 From: Weves Date: Sun, 1 Dec 2024 11:41:03 -0800 Subject: [PATCH] Parallelization --- backend/danswer/document_index/factory.py | 3 + backend/danswer/document_index/vespa/index.py | 99 ++- backend/tests/integration/introspection.py | 55 ++ backend/tests/integration/kickoff.py | 597 ++++++++++++++++++ backend/tests/integration/run.py | 126 ++++ kickoff.py | 42 +- 6 files changed, 885 insertions(+), 37 deletions(-) create mode 100644 backend/tests/integration/introspection.py create mode 100644 backend/tests/integration/kickoff.py create mode 100644 backend/tests/integration/run.py diff --git a/backend/danswer/document_index/factory.py b/backend/danswer/document_index/factory.py index 7a9fa1dbd92..6bc385daede 100644 --- a/backend/danswer/document_index/factory.py +++ b/backend/danswer/document_index/factory.py @@ -29,6 +29,9 @@ def get_default_document_index( index_name=primary_index_name, secondary_index_name=secondary_index_name, multitenant=MULTI_TENANT, + preserve_existing_indices=bool( + VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY + ), ) diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py index ebe6daca1a2..1e4610c5cc6 100644 --- a/backend/danswer/document_index/vespa/index.py +++ b/backend/danswer/document_index/vespa/index.py @@ -129,23 +129,38 @@ def __init__( index_name: str, secondary_index_name: str | None, multitenant: bool = False, + preserve_existing_indices: bool = False, ) -> None: self.index_name = index_name self.secondary_index_name = secondary_index_name self.multitenant = multitenant + self.preserve_existing_indices = preserve_existing_indices self.http_client = get_vespa_http_client() - def ensure_indices_exist( - self, - index_embedding_dim: int, - secondary_index_embedding_dim: int | None, - ) -> None: - if MULTI_TENANT: - logger.info( - "Skipping Vespa index seup for multitenant (would wipe all indices)" - ) - return None + @classmethod + def _get_existing_schemas(cls) -> list[tuple[str, int, bool]]: + schema_url = f"{VESPA_APPLICATION_ENDPOINT}/application/v2/tenant/default/application/default/schema" + response = requests.get(schema_url) + response.raise_for_status() + + indices: list[tuple[str, int, bool]] = [] + + # Add existing indices that we're not explicitly creating + for schema in response.json(): + # Extract embedding dimension from schema content + for field in schema["fields"]: + if field["name"] == "embeddings": + dim = field["tensorType"]["dimensions"][0]["size"] + indices.append((schema["name"], dim, False)) + logger.info( + f"Preserving existing index: {schema['name']} with dimension {dim}" + ) + break + + return indices + @classmethod + def create_indices(cls, indices: list[tuple[str, int, bool]]) -> None: deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate" logger.notice(f"Deploying Vespa application package to {deploy_url}") @@ -159,7 +174,7 @@ def ensure_indices_exist( with open(services_file, "r") as services_f: services_template = services_f.read() - schema_names = [self.index_name, self.secondary_index_name] + schema_names = [index_name for (index_name, _, _) in indices] doc_lines = _create_document_xml_lines(schema_names) services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines) @@ -167,14 +182,6 @@ def ensure_indices_exist( SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS) ) - kv_store = get_kv_store() - - needs_reindexing = False - try: - needs_reindexing = 
cast(bool, kv_store.load(KV_REINDEX_KEY)) - except Exception: - logger.debug("Could not load the reindexing flag. Using ngrams") - with open(overrides_file, "r") as overrides_f: overrides_template = overrides_f.read() @@ -195,19 +202,14 @@ def ensure_indices_exist( schema_template = schema_f.read() schema_template = schema_template.replace(TENANT_ID_PAT, "") - schema = schema_template.replace( - DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name - ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim)) - - schema = add_ngrams_to_schema(schema) if needs_reindexing else schema - schema = schema.replace(TENANT_ID_PAT, "") - zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8") + for index_name, index_embedding_dim, needs_reindexing in indices: + schema = schema_template.replace( + DANSWER_CHUNK_REPLACEMENT_PAT, index_name + ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim)) - if self.secondary_index_name: - upcoming_schema = schema_template.replace( - DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name - ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim)) - zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8") + schema = add_ngrams_to_schema(schema) if needs_reindexing else schema + schema = schema.replace(TENANT_ID_PAT, "") + zip_dict[f"schemas/{index_name}.sd"] = schema.encode("utf-8") zip_file = in_memory_zip_from_file_bytes(zip_dict) @@ -218,6 +220,41 @@ def ensure_indices_exist( f"Failed to prepare Vespa Danswer Index. Response: {response.text}" ) + def ensure_indices_exist( + self, + index_embedding_dim: int, + secondary_index_embedding_dim: int | None, + ) -> None: + if self.multitenant or MULTI_TENANT: # be extra safe here + logger.info( + "Skipping Vespa index setup for multitenant (would wipe all indices)" + ) + return None + + kv_store = get_kv_store() + primary_needs_reindexing = False + try: + primary_needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY)) + except Exception: + logger.debug("Could not load the reindexing flag. Using ngrams") + + indices = [ + (self.index_name, index_embedding_dim, primary_needs_reindexing), + ] + if self.secondary_index_name and secondary_index_embedding_dim: + indices.append( + (self.secondary_index_name, secondary_index_embedding_dim, False) + ) + + if self.preserve_existing_indices: + existing_indices = self._get_existing_schemas() + for index_info in existing_indices: + if index_info[0] in [self.index_name, self.secondary_index_name]: + continue + indices.append(index_info) + + self.create_indices(indices) + @staticmethod def register_multitenant_indices( indices: list[str], diff --git a/backend/tests/integration/introspection.py b/backend/tests/integration/introspection.py new file mode 100644 index 00000000000..00adb7c79bf --- /dev/null +++ b/backend/tests/integration/introspection.py @@ -0,0 +1,55 @@ +from pathlib import Path + +import pytest +from _pytest.nodes import Item + + +def list_all_tests(directory: str | Path = ".") -> list[str]: + """ + List all pytest test functions under the specified directory. 
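+    Collection relies on pytest's --collect-only mode plus a small in-process plugin, so no test code is executed.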
+ + Args: + directory: Directory path to search for tests (defaults to current directory) + + Returns: + List of test function names with their module paths + """ + directory = Path(directory).absolute() + print(f"Searching for tests in: {directory}") + + class TestCollector: + def __init__(self): + self.collected = [] + + def pytest_collection_modifyitems(self, items): + for item in items: + if isinstance(item, Item): + # Get the relative path from the test file to the directory we're searching from + rel_path = Path(item.fspath).relative_to(directory) + # Remove the .py extension + module_path = str(rel_path.with_suffix("")) + # Replace directory separators with dots + module_path = module_path.replace("/", ".") + test_name = item.name + self.collected.append(f"{module_path}::{test_name}") + + collector = TestCollector() + + # Run pytest in collection-only mode + pytest.main( + [ + str(directory), + "--collect-only", + "-q", # quiet mode + ], + plugins=[collector], + ) + + return sorted(collector.collected) + + +if __name__ == "__main__": + tests = list_all_tests() + print("\nFound tests:") + for test in tests: + print(f"- {test}") diff --git a/backend/tests/integration/kickoff.py b/backend/tests/integration/kickoff.py new file mode 100644 index 00000000000..79872174e0e --- /dev/null +++ b/backend/tests/integration/kickoff.py @@ -0,0 +1,597 @@ +#!/usr/bin/env python3 +import atexit +import os +import random +import signal +import socket +import subprocess +import sys +import time +import uuid +from collections.abc import Callable +from multiprocessing.pool import ThreadPool +from pathlib import Path +from typing import NamedTuple + +import requests +import yaml + +from danswer.document_index.vespa.index import VespaIndex + + +BACKEND_DIR_PATH = Path(__file__).parent.parent.parent +COMPOSE_DIR_PATH = BACKEND_DIR_PATH.parent / "deployment/docker_compose" + + +class DeploymentConfig(NamedTuple): + instance_num: int + api_port: int + web_port: int + nginx_port: int + + +def get_random_port() -> int: + """Find a random available port.""" + while True: + port = random.randint(10000, 65535) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + if sock.connect_ex(("localhost", port)) != 0: + return port + + +def cleanup_pid(pid: int) -> None: + """Cleanup a specific PID.""" + print(f"Killing process {pid}") + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + print(f"Process {pid} not found") + + +def get_shared_services_stack_name(run_id: uuid.UUID) -> str: + return f"base-danswer-{run_id}" + + +def get_db_name(instance_num: int) -> str: + """Get the database name for a given instance number.""" + return f"danswer_{instance_num}" + + +def get_vector_db_prefix(instance_num: int) -> str: + """Get the vector DB prefix for a given instance number.""" + return f"test_instance_{instance_num}" + + +def setup_db( + instance_num: int, + postgres_port: int, +) -> None: + env = os.environ.copy() + + # Wait for postgres to be ready + max_attempts = 10 + for attempt in range(max_attempts): + try: + subprocess.run( + [ + "psql", + "-h", + "localhost", + "-p", + str(postgres_port), + "-U", + "postgres", + "-c", + "SELECT 1", + ], + env={**env, "PGPASSWORD": "password"}, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + break + except subprocess.CalledProcessError: + if attempt == max_attempts - 1: + raise RuntimeError("Postgres failed to become ready within timeout") + time.sleep(1) + + db_name = get_db_name(instance_num) + # Create the database first 
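+    # (psql connects to the default "postgres" database here, since the per-instance DB does not exist yet)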
+ subprocess.run( + [ + "psql", + "-h", + "localhost", + "-p", + str(postgres_port), + "-U", + "postgres", + "-c", + f"CREATE DATABASE {db_name}", + ], + env={**env, "PGPASSWORD": "password"}, + check=True, + ) + + # Run alembic upgrade to create tables + subprocess.run( + ["alembic", "upgrade", "head"], + env={ + **env, + "POSTGRES_HOST": "localhost", + "POSTGRES_PORT": str(postgres_port), + "POSTGRES_DB": db_name, + }, + check=True, + cwd=str(BACKEND_DIR_PATH), + ) + + +def start_api_server( + instance_num: int, + model_server_port: int, + postgres_port: int, + vespa_port: int, + vespa_tenant_port: int, + redis_port: int, + register_process: Callable[[subprocess.Popen], None], +) -> int: + """Start the API server. + + NOTE: assumes that Postgres is all set up (database exists, migrations ran) + """ + print("Starting API server...") + db_name = get_db_name(instance_num) + vector_db_prefix = get_vector_db_prefix(instance_num) + + env = os.environ.copy() + env.update( + { + "POSTGRES_HOST": "localhost", + "POSTGRES_PORT": str(postgres_port), + "POSTGRES_DB": db_name, + "REDIS_HOST": "localhost", + "REDIS_PORT": str(redis_port), + "VESPA_HOST": "localhost", + "VESPA_PORT": str(vespa_port), + "VESPA_TENANT_PORT": str(vespa_tenant_port), + "MODEL_SERVER_PORT": str(model_server_port), + "VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY": vector_db_prefix, + } + ) + + port = get_random_port() + + # Open log file for API server in /tmp + log_file = open(f"/tmp/api_server_{instance_num}.txt", "w") + + process = subprocess.Popen( + [ + "uvicorn", + "danswer.main:app", + "--host", + "localhost", + "--port", + str(port), + ], + env=env, + cwd=str(BACKEND_DIR_PATH), + stdout=log_file, + stderr=subprocess.STDOUT, + ) + register_process(process) + + return port + + +def start_model_server( + instance_num: int, + postgres_port: int, + vespa_port: int, + vespa_tenant_port: int, + redis_port: int, + register_process: Callable[[subprocess.Popen], None], +) -> int: + """Start the model server.""" + print("Starting model server...") + + env = os.environ.copy() + env.update( + { + "POSTGRES_HOST": "localhost", + "POSTGRES_PORT": str(postgres_port), + "REDIS_HOST": "localhost", + "REDIS_PORT": str(redis_port), + "VESPA_HOST": "localhost", + "VESPA_PORT": str(vespa_port), + "VESPA_TENANT_PORT": str(vespa_tenant_port), + } + ) + + port = get_random_port() + + # Open log file for model server in /tmp + log_file = open(f"/tmp/model_server_{instance_num}.txt", "w") + + process = subprocess.Popen( + [ + "uvicorn", + "model_server.main:app", + "--host", + "0.0.0.0", + "--port", + str(port), + ], + env=env, + cwd=str(BACKEND_DIR_PATH), + stdout=log_file, + stderr=subprocess.STDOUT, + ) + register_process(process) + + return port + + +def start_background( + instance_num: int, + postgres_port: int, + vespa_port: int, + vespa_tenant_port: int, + redis_port: int, + register_process: Callable[[subprocess.Popen], None], +) -> None: + """Start the background process.""" + print("Starting background process...") + env = os.environ.copy() + env.update( + { + "POSTGRES_HOST": "localhost", + "POSTGRES_PORT": str(postgres_port), + "POSTGRES_DB": get_db_name(instance_num), + "REDIS_HOST": "localhost", + "REDIS_PORT": str(redis_port), + "VESPA_HOST": "localhost", + "VESPA_PORT": str(vespa_port), + "VESPA_TENANT_PORT": str(vespa_tenant_port), + "VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY": get_vector_db_prefix( + instance_num + ), + } + ) + + str(Path(__file__).parent / "backend") + + # Open log file for background process 
in /tmp + log_file = open(f"/tmp/background_{instance_num}.txt", "w") + + process = subprocess.Popen( + ["supervisord", "-n", "-c", "./supervisord.conf"], + env=env, + cwd=str(BACKEND_DIR_PATH), + stdout=log_file, + stderr=subprocess.STDOUT, + ) + register_process(process) + + +def start_shared_services(run_id: uuid.UUID) -> tuple[int, int, int]: + """Start Postgres and Vespa using docker-compose. + Returns (postgres_port, vespa_port, vespa_tenant_port) + """ + print("Starting database services...") + + postgres_port = get_random_port() + vespa_port = get_random_port() + vespa_tenant_port = get_random_port() + + minimal_compose = { + "services": { + "relational_db": { + "image": "postgres:15.2-alpine", + "command": "-c 'max_connections=250'", + "environment": { + "POSTGRES_USER": os.getenv("POSTGRES_USER", "postgres"), + "POSTGRES_PASSWORD": os.getenv("POSTGRES_PASSWORD", "password"), + }, + "ports": [f"{postgres_port}:5432"], + }, + "index": { + "image": "vespaengine/vespa:8.277.17", + "ports": [ + f"{vespa_port}:8081", # Main Vespa port + f"{vespa_tenant_port}:19071", # Tenant port + ], + }, + }, + } + + # Write the minimal compose file + temp_compose = Path("/tmp/docker-compose.minimal.yml") + with open(temp_compose, "w") as f: + yaml.dump(minimal_compose, f) + + # Start the services + subprocess.run( + [ + "docker", + "compose", + "-f", + str(temp_compose), + "-p", + get_shared_services_stack_name(run_id), + "up", + "-d", + ], + check=True, + ) + + return postgres_port, vespa_port, vespa_tenant_port + + +def prepare_vespa(instance_ids: list[int]) -> None: + schema_names = [ + (get_vector_db_prefix(instance_id), 768, False) for instance_id in instance_ids + ] + VespaIndex.create_indices(schema_names) + + +def start_redis( + instance_num: int, + register_process: Callable[[subprocess.Popen], None], +) -> int: + """Start a Redis instance for a specific deployment.""" + print(f"Starting Redis for instance {instance_num}...") + + redis_port = get_random_port() + + # Create a Redis-specific compose file + redis_compose = { + "services": { + f"cache_{instance_num}": { + "image": "redis:7.4-alpine", + "ports": [f"{redis_port}:6379"], + "command": 'redis-server --save "" --appendonly no', + }, + }, + } + + temp_compose = Path(f"/tmp/docker-compose.redis.{instance_num}.yml") + with open(temp_compose, "w") as f: + yaml.dump(redis_compose, f) + + subprocess.run( + [ + "docker", + "compose", + "-f", + str(temp_compose), + "-p", + f"redis-danswer-{instance_num}", + "up", + "-d", + ], + check=True, + ) + + return redis_port + + +def launch_instance( + instance_num: int, + postgres_port: int, + vespa_port: int, + vespa_tenant_port: int, + register_process: Callable[[subprocess.Popen], None], +) -> DeploymentConfig: + """Launch a Docker Compose instance with custom ports.""" + api_port = get_random_port() + web_port = get_random_port() + nginx_port = get_random_port() + + # Start Redis for this instance + redis_port = start_redis(instance_num, register_process) + + try: + model_server_port = start_model_server( + instance_num, + postgres_port, + vespa_port, + vespa_tenant_port, + redis_port, # Pass instance-specific Redis port + register_process, + ) + setup_db(instance_num, postgres_port) + api_port = start_api_server( + instance_num, + model_server_port, + postgres_port, + vespa_port, + vespa_tenant_port, + redis_port, + register_process, + ) + start_background( + instance_num, + postgres_port, + vespa_port, + vespa_tenant_port, + redis_port, + register_process, + ) + except Exception as e: + 
print(f"Failed to start API server for instance {instance_num}: {e}") + raise + + return DeploymentConfig(instance_num, api_port, web_port, nginx_port) + + +def wait_for_instance( + ports: DeploymentConfig, max_attempts: int = 60, wait_seconds: int = 2 +) -> None: + """Wait for an instance to be healthy.""" + print(f"Waiting for instance {ports.instance_num} to be ready...") + + for attempt in range(1, max_attempts + 1): + try: + response = requests.get(f"http://localhost:{ports.api_port}/health") + if response.status_code == 200: + print( + f"Instance {ports.instance_num} is ready on port {ports.api_port}" + ) + return + raise ConnectionError( + f"Health check returned status {response.status_code}" + ) + except (requests.RequestException, ConnectionError): + if attempt == max_attempts: + raise TimeoutError( + f"Timeout waiting for instance {ports.instance_num} " + f"on port {ports.api_port}" + ) + print( + f"Waiting for instance {ports.instance_num} on port " + f" {ports.api_port}... ({attempt}/{max_attempts})" + ) + time.sleep(wait_seconds) + + +def cleanup_instance(instance_num: int) -> None: + """Cleanup a specific instance.""" + print(f"Cleaning up instance {instance_num}...") + temp_compose = Path(f"/tmp/docker-compose.dev.instance{instance_num}.yml") + + try: + subprocess.run( + [ + "docker", + "compose", + "-f", + str(temp_compose), + "-p", + f"danswer-stack-{instance_num}", + "down", + ], + check=True, + ) + print(f"Instance {instance_num} cleaned up successfully") + except subprocess.CalledProcessError: + print(f"Error cleaning up instance {instance_num}") + except FileNotFoundError: + print(f"No compose file found for instance {instance_num}") + finally: + # Clean up the temporary compose file if it exists + if temp_compose.exists(): + temp_compose.unlink() + print(f"Removed temporary compose file for instance {instance_num}") + + +def run_x_instances(num_instances: int) -> tuple[uuid.UUID, list[DeploymentConfig]]: + """Start x instances of the application and return their configurations.""" + run_id = uuid.uuid4() + instance_ids = list(range(1, num_instances + 1)) + _pids: list[int] = [] + + def register_process(process) -> None: + _pids.append(process.pid) + + def cleanup_all_instances() -> None: + """Cleanup all instances.""" + print("Cleaning up all instances...") + + # Stop the database services + subprocess.run( + [ + "docker", + "compose", + "-p", + get_shared_services_stack_name(run_id), + "-f", + "/tmp/docker-compose.minimal.yml", + "down", + ], + check=True, + ) + + # Stop all Redis instances + for compose_file in Path("/tmp").glob("docker-compose.redis.*.yml"): + instance_id = compose_file.stem.split(".")[ + -1 + ] # Extract instance number from filename + try: + subprocess.run( + [ + "docker", + "compose", + "-f", + str(compose_file), + "-p", + f"redis-danswer-{instance_id}", + "down", + ], + check=True, + ) + compose_file.unlink() # Remove the temporary compose file + except subprocess.CalledProcessError: + print(f"Error cleaning up Redis instance {instance_id}") + + for pid in _pids: + cleanup_pid(pid) + + # Register cleanup handler + atexit.register(cleanup_all_instances) + + # Start database services first + postgres_port, vespa_port, vespa_tenant_port = start_shared_services(run_id) + prepare_vespa(instance_ids) + + # Use ThreadPool to launch instances in parallel and collect results + with ThreadPool(processes=num_instances) as pool: + # Create list of arguments for each instance + launch_args = [ + (i, postgres_port, vespa_port, vespa_tenant_port, 
register_process) + for i in instance_ids + ] + + # Launch instances and get results + port_configs = pool.starmap(launch_instance, launch_args) + + # Wait for all instances to be healthy + for config in port_configs: + wait_for_instance(config) + + print("All instances launched!") + print("Database Services:") + print(f"Postgres port: {postgres_port}") + print(f"Vespa main port: {vespa_port}") + print(f"Vespa tenant port: {vespa_tenant_port}") + print("\nApplication Instances:") + for ports in port_configs: + print( + f"Instance {ports.instance_num}: " + f"API={ports.api_port}, Web={ports.web_port}, Nginx={ports.nginx_port}" + ) + + return run_id, port_configs + + +def main() -> None: + run_id, port_configs = run_x_instances(1) + + # Run pytest with the API server port set + api_port = port_configs[0].api_port # Use first instance's API port + try: + subprocess.run( + ["pytest", "tests/integration/openai_assistants_api"], + env={**os.environ, "API_SERVER_PORT": str(api_port)}, + cwd=str(BACKEND_DIR_PATH), + check=True, + ) + except subprocess.CalledProcessError as e: + print(f"Tests failed with exit code {e.returncode}") + sys.exit(e.returncode) + + time.sleep(5) + + +if __name__ == "__main__": + main() diff --git a/backend/tests/integration/run.py b/backend/tests/integration/run.py new file mode 100644 index 00000000000..9b1a66c2485 --- /dev/null +++ b/backend/tests/integration/run.py @@ -0,0 +1,126 @@ +import multiprocessing +import os +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path + +from tests.integration.introspection import list_all_tests +from tests.integration.kickoff import BACKEND_DIR_PATH +from tests.integration.kickoff import DeploymentConfig +from tests.integration.kickoff import run_x_instances + + +@dataclass +class TestResult: + test_name: str + success: bool + output: str + error: str | None = None + + +def run_single_test( + test_name: str, api_port: int, queue: multiprocessing.Queue +) -> None: + """Run a single test with the given API port.""" + try: + result = subprocess.run( + ["pytest", test_name, "-v"], + env={**os.environ, "API_SERVER_PORT": str(api_port)}, + cwd=str(BACKEND_DIR_PATH) / "tests" / "integration", + capture_output=True, + text=True, + ) + queue.put( + TestResult( + test_name=test_name, + success=result.returncode == 0, + output=result.stdout, + error=result.stderr if result.returncode != 0 else None, + ) + ) + except Exception as e: + queue.put( + TestResult( + test_name=test_name, + success=False, + output="", + error=str(e), + ) + ) + + +def run_deployment_and_test( + test_name: str, + ports: DeploymentConfig, + result_queue: multiprocessing.Queue, +) -> None: + """Run a test against an existing deployment.""" + try: + # Run the test + run_single_test(test_name, ports.api_port, result_queue) + except Exception as e: + result_queue.put( + TestResult( + test_name=test_name, + success=False, + output="", + error=str(e), + ) + ) + + +def main() -> None: + # Get all tests + tests = list_all_tests(Path(__file__).parent) + print(f"Found {len(tests)} tests to run") + + # Run only 2 tests for now + tests = tests[:2] + print(f"Running {len(tests)} tests") + + # Start all instances at once + run_id, port_configs = run_x_instances(len(tests)) + + # Create a queue for test results + result_queue: multiprocessing.Queue = multiprocessing.Queue() + + # Start all tests in parallel + processes = [] + for test, ports in zip(tests, port_configs): + p = multiprocessing.Process( + target=run_deployment_and_test, + args=(test, 
ports, result_queue), + ) + p.start() + processes.append(p) + + # Collect results + results: list[TestResult] = [] + for _ in range(len(tests)): + results.append(result_queue.get()) + + # Wait for all processes to finish + for p in processes: + p.join() + + # Print results + print("\nTest Results:") + failed = False + for result in results: + status = "✅ PASSED" if result.success else "❌ FAILED" + print(f"{status} - {result.test_name}") + if not result.success: + failed = True + print("Error output:") + print(result.error) + print("Test output:") + print(result.output) + print("-" * 80) + + if failed: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/kickoff.py b/kickoff.py index a98e0b01690..d830e6822e1 100644 --- a/kickoff.py +++ b/kickoff.py @@ -5,6 +5,7 @@ import signal import socket import subprocess +import sys import time import uuid from collections.abc import Callable @@ -43,6 +44,10 @@ def cleanup_pid(pid: int) -> None: print(f"Process {pid} not found") +def get_shared_services_stack_name(run_id: uuid.UUID) -> str: + return f"base-danswer-{run_id}" + + def get_db_name(instance_num: int) -> str: """Get the database name for a given instance number.""" return f"danswer_{instance_num}" @@ -268,7 +273,7 @@ def start_background( register_process(process) -def start_shared_services() -> tuple[int, int, int]: +def start_shared_services(run_id: uuid.UUID) -> tuple[int, int, int]: """Start Postgres and Vespa using docker-compose. Returns (postgres_port, vespa_port, vespa_tenant_port) """ @@ -312,7 +317,7 @@ def start_shared_services() -> tuple[int, int, int]: "-f", str(temp_compose), "-p", - f"base-danswer-{uuid.uuid4()}", + get_shared_services_stack_name(run_id), "up", "-d", ], @@ -473,6 +478,8 @@ def cleanup_instance(instance_num: int) -> None: def main() -> None: + run_id = uuid.uuid4() + _PIDS: list[int] = [] def register_process(process) -> None: @@ -484,7 +491,15 @@ def cleanup_all_instances() -> None: # Stop the database services subprocess.run( - ["docker", "compose", "-f", "/tmp/docker-compose.minimal.yml", "down"], + [ + "docker", + "compose", + "-p", + get_shared_services_stack_name(run_id), + "-f", + "/tmp/docker-compose.minimal.yml", + "down", + ], check=True, ) @@ -516,8 +531,10 @@ def cleanup_all_instances() -> None: # Register cleanup handler atexit.register(cleanup_all_instances) - # Start database services first (now without Redis) - postgres_port, vespa_port, vespa_tenant_port = start_shared_services() + # Start database services first (without Redis, since it's + # (1) very lightweight and (2) a hassle to manage since + # it's used as a Celery broker) + postgres_port, vespa_port, vespa_tenant_port = start_shared_services(run_id) # Launch instances (Redis will be started per instance) port_configs: list[DeploymentConfig] = [] @@ -544,7 +561,20 @@ def cleanup_all_instances() -> None: f"API={ports.api_port}, Web={ports.web_port}, Nginx={ports.nginx_port}" ) - time.sleep(100) + # Run pytest with the API server port set + api_port = ports.api_port # Use first instance's API port + try: + subprocess.run( + ["pytest", "tests/integration/openai_assistants_api"], + env={**os.environ, "API_SERVER_PORT": str(api_port)}, + cwd=str(Path(__file__).parent / "backend"), + check=True, + ) + except subprocess.CalledProcessError as e: + print(f"Tests failed with exit code {e.returncode}") + sys.exit(e.returncode) + + time.sleep(5) if __name__ == "__main__":
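
Usage sketch: assuming the backend virtualenv plus docker compose, psql, alembic,
uvicorn and supervisord are available on PATH, the new harness can be exercised
from backend/ roughly as below. The module-style invocation is an assumption on
my part; any equivalent way of putting backend/ on sys.path should work.

    # one instance + the openai_assistants_api suite (what kickoff.main() does)
    python -m tests.integration.kickoff

    # N instances, one collected test per instance, aggregated pass/fail report
    python -m tests.integration.run

    # (invocation style assumed; adjust to however backend/ ends up on sys.path)

Each spawned pytest process is handed its instance's API port through the
API_SERVER_PORT environment variable, and its Vespa schemas are namespaced via
VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY, so parallel runs share a
single Postgres/Vespa container pair without clobbering each other's data.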