Full draft of first pass
Weves committed Dec 1, 2024
1 parent 1b9e3f6 commit 5ad4b12
Showing 7 changed files with 215 additions and 78 deletions.
89 changes: 89 additions & 0 deletions .github/workflows/pr-integration-tests-parallel.yml
@@ -0,0 +1,89 @@
name: Run Integration Tests v2
concurrency:
group: Run-Integration-Tests-${{ github.workflow }}-${{ github.head_ref || github.event.workflow_run.head_branch || github.run_id }}
cancel-in-progress: true

on:
merge_group:
pull_request:
branches:
- main
- 'release/**'

env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
CONFLUENCE_TEST_SPACE_URL: ${{ secrets.CONFLUENCE_TEST_SPACE_URL }}
CONFLUENCE_USER_NAME: ${{ secrets.CONFLUENCE_USER_NAME }}
CONFLUENCE_ACCESS_TOKEN: ${{ secrets.CONFLUENCE_ACCESS_TOKEN }}

jobs:
integration-tests:
# See https://runs-on.com/runners/linux/
runs-on: [runs-on,runner=8cpu-linux-x64,ram=16,"run-id=${{ github.run_id }}"]
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Login to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_TOKEN }}

- name: Build integration test Docker image
uses: ./.github/actions/custom-build-and-push
with:
context: ./backend
file: ./backend/tests/integration/Dockerfile
platforms: linux/amd64
tags: danswer/danswer-integration:test
push: false
load: true
cache-from: type=s3,prefix=cache/${{ github.repository }}/integration-tests/integration/,region=${{ env.RUNS_ON_AWS_REGION }},bucket=${{ env.RUNS_ON_S3_BUCKET_CACHE }}
cache-to: type=s3,prefix=cache/${{ github.repository }}/integration-tests/integration/,region=${{ env.RUNS_ON_AWS_REGION }},bucket=${{ env.RUNS_ON_S3_BUCKET_CACHE }},mode=max


- name: Run Standard Integration Tests
run: |
echo "Running integration tests..."
docker run danswer/danswer-integration:test PYTHONPATH=. python tests/integration/run.py
continue-on-error: true
id: run_tests

- name: Check test results
run: |
if [ ${{ steps.run_tests.outcome }} == 'failure' ]; then
echo "Integration tests failed. Exiting with error."
exit 1
else
echo "All integration tests passed successfully."
fi
# save before stopping the containers so the logs can be captured
# - name: Save Docker logs
# if: success() || failure()
# run: |
# cd deployment/docker_compose
# docker compose -f docker-compose.dev.yml -p danswer-stack logs > docker-compose.log
# mv docker-compose.log ${{ github.workspace }}/docker-compose.log

# - name: Stop Docker containers
# run: |
# cd deployment/docker_compose
# docker compose -f docker-compose.dev.yml -p danswer-stack down -v

# - name: Upload logs
# if: success() || failure()
# uses: actions/upload-artifact@v4
# with:
# name: docker-logs
# path: ${{ github.workspace }}/docker-compose.log

# - name: Stop Docker containers
# run: |
# cd deployment/docker_compose
# docker compose -f docker-compose.dev.yml -p danswer-stack down -v
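For local debugging, the build-and-run portion of this workflow can be mirrored outside CI. The sketch below is a rough Python approximation and is not part of this commit: it assumes Docker is available locally, reuses the danswer/danswer-integration:test tag and tests/integration/run.py entrypoint from the steps above, skips the S3 build cache, and passes PYTHONPATH via -e rather than inline on the container command.

import subprocess

# Mirror the "Build integration test Docker image" step (paths match the workflow; no S3 cache locally).
subprocess.run(
    [
        "docker", "build",
        "-f", "backend/tests/integration/Dockerfile",
        "-t", "danswer/danswer-integration:test",
        "backend",
    ],
    check=True,
)

# Mirror the "Run Standard Integration Tests" step.
result = subprocess.run(
    [
        "docker", "run",
        "-e", "PYTHONPATH=.",
        "danswer/danswer-integration:test",
        "python", "tests/integration/run.py",
    ]
)

# Mirror the "Check test results" gate.
if result.returncode != 0:
    raise SystemExit("Integration tests failed. Exiting with error.")
print("All integration tests passed successfully.")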
51 changes: 20 additions & 31 deletions backend/danswer/document_index/vespa/index.py
@@ -138,30 +138,15 @@ def __init__(
self.http_client = get_vespa_http_client()

@classmethod
def _get_existing_schemas(cls) -> list[tuple[str, int, bool]]:
schema_url = f"{VESPA_APPLICATION_ENDPOINT}/application/v2/tenant/default/application/default/schema"
response = requests.get(schema_url)
response.raise_for_status()

indices: list[tuple[str, int, bool]] = []

# Add existing indices that we're not explicitly creating
for schema in response.json():
# Extract embedding dimension from schema content
for field in schema["fields"]:
if field["name"] == "embeddings":
dim = field["tensorType"]["dimensions"][0]["size"]
indices.append((schema["name"], dim, False))
logger.info(
f"Preserving existing index: {schema['name']} with dimension {dim}"
)
break

return indices

@classmethod
def create_indices(cls, indices: list[tuple[str, int, bool]]) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
def create_indices(
cls,
indices: list[tuple[str, int, bool]],
application_endpoint: str = VESPA_APPLICATION_ENDPOINT,
) -> None:
"""
Create indices in Vespa based on the passed in configuration(s).
"""
deploy_url = f"{application_endpoint}/tenant/default/prepareandactivate"
logger.notice(f"Deploying Vespa application package to {deploy_url}")

vespa_schema_path = os.path.join(
@@ -209,13 +194,18 @@ def create_indices(cls, indices: list[tuple[str, int, bool]]) -> None:

schema = add_ngrams_to_schema(schema) if needs_reindexing else schema
schema = schema.replace(TENANT_ID_PAT, "")
logger.info(
f"Creating index: {index_name} with embedding "
f"dimension: {index_embedding_dim}. Schema:\n\n {schema}"
)
zip_dict[f"schemas/{index_name}.sd"] = schema.encode("utf-8")

zip_file = in_memory_zip_from_file_bytes(zip_dict)

headers = {"Content-Type": "application/zip"}
response = requests.post(deploy_url, headers=headers, data=zip_file)
if response.status_code != 200:
logger.error(f"Failed to create Vespa indices: {response.text}")
raise RuntimeError(
f"Failed to prepare Vespa Danswer Index. Response: {response.text}"
)
@@ -231,6 +221,12 @@ def ensure_indices_exist(
)
return None

# Used in integration tests.
# NOTE: this means that we can't switch embedding models
if self.preserve_existing_indices:
logger.info("Preserving existing indices")
return None

kv_store = get_kv_store()
primary_needs_reindexing = False
try:
@@ -246,13 +242,6 @@ def ensure_indices_exist(
(self.secondary_index_name, secondary_index_embedding_dim, False)
)

if self.preserve_existing_indices:
existing_indices = self._get_existing_schemas()
for index_info in existing_indices:
if index_info[0] in [self.index_name, self.secondary_index_name]:
continue
indices.append(index_info)

self.create_indices(indices)

@staticmethod
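The key change above is that create_indices now takes the Vespa application endpoint as a parameter instead of always using VESPA_APPLICATION_ENDPOINT, which lets the parallel test harness (kickoff.py, below) deploy schemas to a config server on an arbitrary port. A minimal usage sketch, not part of this commit; the port and schema name are illustrative:

from danswer.document_index.vespa.index import VespaIndex

# Each tuple is (index_name, embedding_dimension, needs_reindexing).
indices = [
    ("danswer_chunk_nomic_ai_nomic_embed_text_v1", 768, False),
]

# Deploy the application package to a Vespa config server exposed on a non-default port.
VespaIndex.create_indices(
    indices,
    application_endpoint="http://localhost:19071/application/v2",
)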
3 changes: 2 additions & 1 deletion backend/danswer/setup.py
@@ -237,7 +237,8 @@ def setup_vespa(

logger.notice("Vespa setup complete.")
return True
except Exception:
except Exception as e:
logger.debug(f"Error creating Vespa indices: {e}")
logger.notice(
f"Vespa setup did not succeed. The Vespa service may not be ready yet. Retrying in {WAIT_SECONDS} seconds."
)
2 changes: 2 additions & 0 deletions backend/tests/integration/common_utils/constants.py
@@ -1,5 +1,7 @@
import os

GUARANTEED_FRESH_SETUP = os.getenv("GUARANTEED_FRESH_SETUP") == "true"

API_SERVER_PROTOCOL = os.getenv("API_SERVER_PROTOCOL") or "http"
API_SERVER_HOST = os.getenv("API_SERVER_HOST") or "localhost"
API_SERVER_PORT = os.getenv("API_SERVER_PORT") or "8080"
6 changes: 6 additions & 0 deletions backend/tests/integration/common_utils/reset.py
@@ -24,6 +24,7 @@
from danswer.setup import setup_postgres
from danswer.setup import setup_vespa
from danswer.utils.logger import setup_logger
from tests.integration.common_utils.constants import GUARANTEED_FRESH_SETUP

logger = setup_logger()

@@ -266,6 +267,11 @@ def reset_vespa_multitenant() -> None:


def reset_all() -> None:
# if we're guaranteed a fresh setup, we don't need to reset
# this happens when running the tests w/ the parallelized setup
if GUARANTEED_FRESH_SETUP:
return None

logger.info("Resetting Postgres...")
reset_postgres()
logger.info("Resetting Vespa...")
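GUARANTEED_FRESH_SETUP is read once at import time, so the flag has to be set in the environment of the pytest process itself. A hypothetical sketch of how the parallel harness could launch the tests with the reset skipped; the test path and port are illustrative, and this exact wiring is not shown in this diff:

import os
import subprocess

env = {
    **os.environ,
    # Tells reset_all() to no-op because the harness guarantees fresh Postgres/Vespa.
    "GUARANTEED_FRESH_SETUP": "true",
    # Illustrative; kickoff.py assigns each instance its own API port.
    "API_SERVER_PORT": "8081",
}
subprocess.run(["pytest", "tests/integration"], env=env, check=True)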
78 changes: 61 additions & 17 deletions backend/tests/integration/kickoff.py
@@ -22,12 +22,23 @@
BACKEND_DIR_PATH = Path(__file__).parent.parent.parent
COMPOSE_DIR_PATH = BACKEND_DIR_PATH.parent / "deployment/docker_compose"

DEFAULT_EMBEDDING_DIMENSION = 768
DEFAULT_SCHEMA_NAME = "danswer_chunk_nomic_ai_nomic_embed_text_v1"


class DeploymentConfig(NamedTuple):
instance_num: int
api_port: int
web_port: int
nginx_port: int
redis_port: int


class SharedServicesConfig(NamedTuple):
run_id: uuid.UUID
postgres_port: int
vespa_port: int
vespa_tenant_port: int


def get_random_port() -> int:
@@ -157,6 +168,7 @@ def start_api_server(
"VESPA_TENANT_PORT": str(vespa_tenant_port),
"MODEL_SERVER_PORT": str(model_server_port),
"VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY": vector_db_prefix,
"LOG_LEVEL": "debug",
}
)

@@ -205,6 +217,7 @@ def start_model_server(
"VESPA_HOST": "localhost",
"VESPA_PORT": str(vespa_port),
"VESPA_TENANT_PORT": str(vespa_tenant_port),
"LOG_LEVEL": "debug",
}
)

@@ -256,6 +269,7 @@ def start_background(
"VECTOR_DB_INDEX_NAME_PREFIX__INTEGRATION_TEST_ONLY": get_vector_db_prefix(
instance_num
),
"LOG_LEVEL": "debug",
}
)

@@ -274,7 +288,7 @@ def start_background(
register_process(process)


def start_shared_services(run_id: uuid.UUID) -> tuple[int, int, int]:
def start_shared_services(run_id: uuid.UUID) -> SharedServicesConfig:
"""Start Postgres and Vespa using docker-compose.
Returns a SharedServicesConfig with the run ID and the Postgres/Vespa ports.
"""
@@ -325,14 +339,30 @@ def start_shared_services(run_id: uuid.UUID) -> tuple[int, int, int]:
check=True,
)

return postgres_port, vespa_port, vespa_tenant_port
return SharedServicesConfig(run_id, postgres_port, vespa_port, vespa_tenant_port)


def prepare_vespa(instance_ids: list[int]) -> None:
def prepare_vespa(instance_ids: list[int], vespa_tenant_port: int) -> None:
schema_names = [
(get_vector_db_prefix(instance_id), 768, False) for instance_id in instance_ids
(
f"{get_vector_db_prefix(instance_id)}_{DEFAULT_SCHEMA_NAME}",
DEFAULT_EMBEDDING_DIMENSION,
False,
)
for instance_id in instance_ids
]
VespaIndex.create_indices(schema_names)
print(f"Creating indices: {schema_names}")
for _ in range(7):
try:
VespaIndex.create_indices(
schema_names, f"http://localhost:{vespa_tenant_port}/application/v2"
)
return
except Exception as e:
print(f"Error creating indices: {e}. Trying again in 5 seconds...")
time.sleep(5)

raise RuntimeError("Failed to create indices in Vespa")


def start_redis(
@@ -422,7 +452,7 @@ def launch_instance(
print(f"Failed to start API server for instance {instance_num}: {e}")
raise

return DeploymentConfig(instance_num, api_port, web_port, nginx_port)
return DeploymentConfig(instance_num, api_port, web_port, nginx_port, redis_port)


def wait_for_instance(
Expand Down Expand Up @@ -485,7 +515,9 @@ def cleanup_instance(instance_num: int) -> None:
print(f"Removed temporary compose file for instance {instance_num}")


def run_x_instances(num_instances: int) -> tuple[uuid.UUID, list[DeploymentConfig]]:
def run_x_instances(
num_instances: int,
) -> tuple[SharedServicesConfig, list[DeploymentConfig]]:
"""Start x instances of the application and return their configurations."""
run_id = uuid.uuid4()
instance_ids = list(range(1, num_instances + 1))
@@ -541,41 +573,53 @@ def cleanup_all_instances() -> None:
atexit.register(cleanup_all_instances)

# Start database services first
postgres_port, vespa_port, vespa_tenant_port = start_shared_services(run_id)
prepare_vespa(instance_ids)
print("Starting shared services...")
shared_services_config = start_shared_services(run_id)

# create the Vespa indices for each instance before launching them
print("Creating indices in Vespa...")
prepare_vespa(instance_ids, shared_services_config.vespa_tenant_port)

# Use ThreadPool to launch instances in parallel and collect results
print("Launching instances...")
with ThreadPool(processes=num_instances) as pool:
# Create list of arguments for each instance
launch_args = [
(i, postgres_port, vespa_port, vespa_tenant_port, register_process)
(
i,
shared_services_config.postgres_port,
shared_services_config.vespa_port,
shared_services_config.vespa_tenant_port,
register_process,
)
for i in instance_ids
]

# Launch instances and get results
port_configs = pool.starmap(launch_instance, launch_args)

# Wait for all instances to be healthy
for config in port_configs:
wait_for_instance(config)
print("Waiting for instances to be healthy...")
with ThreadPool(processes=len(port_configs)) as pool:
pool.map(wait_for_instance, port_configs)

print("All instances launched!")
print("Database Services:")
print(f"Postgres port: {postgres_port}")
print(f"Vespa main port: {vespa_port}")
print(f"Vespa tenant port: {vespa_tenant_port}")
print(f"Postgres port: {shared_services_config.postgres_port}")
print(f"Vespa main port: {shared_services_config.vespa_port}")
print(f"Vespa tenant port: {shared_services_config.vespa_tenant_port}")
print("\nApplication Instances:")
for ports in port_configs:
print(
f"Instance {ports.instance_num}: "
f"API={ports.api_port}, Web={ports.web_port}, Nginx={ports.nginx_port}"
)

return run_id, port_configs
return shared_services_config, port_configs


def main() -> None:
run_id, port_configs = run_x_instances(1)
shared_services_config, port_configs = run_x_instances(1)

# Run pytest with the API server port set
api_port = port_configs[0].api_port # Use first instance's API port
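main() above only starts a single instance, but run_x_instances is written for N. A small sketch of the multi-instance form, assuming kickoff.py is run from the backend directory so the tests.integration package is importable; not part of this commit:

from tests.integration.kickoff import run_x_instances

# Start three isolated app instances sharing one Postgres + Vespa deployment.
shared, deployments = run_x_instances(3)

print(f"Shared Postgres port: {shared.postgres_port}")
for d in deployments:
    # Each instance has its own API/web/nginx/redis ports and its own Vespa schema prefix.
    print(f"Instance {d.instance_num}: API=http://localhost:{d.api_port}")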
