diff --git a/.devcontainer/bin/dev-milvus-start b/.devcontainer/bin/dev-milvus-start new file mode 100755 index 0000000000..be1d86ae42 --- /dev/null +++ b/.devcontainer/bin/dev-milvus-start @@ -0,0 +1,25 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +COMPOSE_FILE="${MORPHEUS_ROOT}/.devcontainer/docker-compose.yml" + +docker compose -f $COMPOSE_FILE up -d milvus-standalone + +export MILVUS_HOST=$(docker compose -f $COMPOSE_FILE exec milvus-standalone hostname -i) + +echo $MILVUS_HOST diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index f9fdf0d858..463e6c175e 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -52,6 +52,47 @@ services: depends_on: - zookeeper + milvus-etcd: + container_name: morpheus-milvus-etcd + image: quay.io/coreos/etcd:v3.5.0 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd + command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + + milvus-minio: + container_name: morpheus-milvus-minio + image: minio/minio:RELEASE.2020-12-03T00-03-10Z + environment: + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: 
minioadmin + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data + command: minio server /minio_data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + milvus-standalone: + container_name: morpheus-milvus-standalone + image: milvusdb/milvus:2.3-latest + command: ["milvus", "run", "standalone"] + environment: + ETCD_ENDPOINTS: milvus-etcd:2379 + MINIO_ADDRESS: milvus-minio:9000 + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus + ports: + - "19530:19530" + depends_on: + - milvus-etcd + - milvus-minio + networks: default: external: diff --git a/examples/llm/common/utils.py b/examples/llm/common/utils.py index 1779bb6c88..c7d5d654c9 100644 --- a/examples/llm/common/utils.py +++ b/examples/llm/common/utils.py @@ -21,8 +21,7 @@ from morpheus.llm.services.nemo_llm_service import NeMoLLMService from morpheus.llm.services.openai_chat_service import OpenAIChatService from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP -from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService -from morpheus.service.vdb.utils import VectorDBServiceFactory +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider logger = logging.getLogger(__name__) @@ -109,9 +108,7 @@ def build_default_milvus_config(embedding_size: int): return milvus_resource_kwargs -def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"): +def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"): default_service = build_default_milvus_config(embedding_size) - - vdb_service: MilvusVectorDBService = VectorDBServiceFactory.create_instance("milvus", uri=uri, **default_service) - - return vdb_service + milvus_provider = MilvusVectorDBServiceProvider(uri=uri, **default_service) + return milvus_provider.create() diff --git a/examples/llm/rag/persistant_pipeline.py 
b/examples/llm/rag/persistant_pipeline.py index 7f5cc4f756..dda4656b47 100644 --- a/examples/llm/rag/persistant_pipeline.py +++ b/examples/llm/rag/persistant_pipeline.py @@ -194,4 +194,6 @@ def pipeline(num_threads, pipeline_batch_size, model_max_batch_size, embedding_s # Run the pipeline pipe.run() + vdb_service.close() + return start_time diff --git a/examples/llm/rag/standalone_pipeline.py b/examples/llm/rag/standalone_pipeline.py index 5d3cd01c58..c9a5173500 100644 --- a/examples/llm/rag/standalone_pipeline.py +++ b/examples/llm/rag/standalone_pipeline.py @@ -32,6 +32,7 @@ from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.utils.concat_df import concat_dataframes +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService from ..common.utils import build_huggingface_embeddings from ..common.utils import build_llm_service @@ -40,7 +41,7 @@ logger = logging.getLogger(__name__) -def _build_engine(model_name: str, vdb_resource_name: str, llm_service: str, embedding_size: int): +def _build_engine(milvus_service: MilvusVectorDBService, model_name: str, vdb_resource_name: str, llm_service: str, embedding_size: int): engine = LLMEngine() @@ -55,7 +56,6 @@ def _build_engine(model_name: str, vdb_resource_name: str, llm_service: str, emb Please answer the following question: \n{{ query }}""" - vector_service = build_milvus_service(embedding_size) embeddings = build_huggingface_embeddings("sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': 'cuda'}, encode_kwargs={'batch_size': 100}) @@ -69,7 +69,7 @@ async def calc_embeddings(texts: list[str]) -> list[list[float]]: engine.add_node("rag", inputs=["/extracter"], node=RAGNode(prompt=prompt, - vdb_service=vector_service.load_resource(vdb_resource_name), + vdb_service=milvus_service.load_resource(vdb_resource_name), embedding=calc_embeddings, llm_client=llm_service)) @@ -110,9 +110,12 
@@ def standalone(num_threads, pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions')) + milvus_service = build_milvus_service(embedding_size) + pipe.add_stage( LLMEngineStage(config, - engine=_build_engine(model_name=model_name, + engine=_build_engine(milvus_service=milvus_service, + model_name=model_name, vdb_resource_name=vdb_resource_name, llm_service=llm_service, embedding_size=embedding_size))) @@ -125,6 +128,8 @@ def standalone(num_threads, pipe.run() + milvus_service.close() + messages = sink.get_messages() responses = concat_dataframes(messages) logger.info("Pipeline complete. Received %s responses", len(responses)) diff --git a/morpheus/modules/output/write_to_vector_db.py b/morpheus/modules/output/write_to_vector_db.py index c141aef7c6..7cc06abb68 100644 --- a/morpheus/modules/output/write_to_vector_db.py +++ b/morpheus/modules/output/write_to_vector_db.py @@ -29,8 +29,7 @@ from morpheus.messages import MultiResponseMessage from morpheus.modules.schemas.write_to_vector_db_schema import WriteToVDBSchema from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP -from morpheus.service.vdb.utils import VectorDBServiceFactory -from morpheus.service.vdb.vector_db_service import VectorDBService +from morpheus.service.vdb.vector_db_service import VectorDbServiceProvider from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import WRITE_TO_VECTOR_DB from morpheus.utils.module_utils import ModuleLoaderFactory @@ -91,12 +90,10 @@ def _write_to_vector_db(builder: mrc.Builder): The `module_config` should contain: - 'embedding_column_name': str, the name of the column containing embeddings (default is "embedding"). - 'recreate': bool, whether to recreate the resource if it already exists (default is False). - - 'service': str, the name of the service or a serialized instance of VectorDBService. - - 'is_service_serialized': bool, whether the provided service is serialized (default is False). 
+ - 'service': instance of VectorDbServiceProvider. - 'default_resource_name': str, the name of the collection resource (must not be None or empty). - 'resource_kwargs': dict, additional keyword arguments for resource creation. - 'resource_schemas': dict, additional keyword arguments for resource creation. - - 'service_kwargs': dict, additional keyword arguments for VectorDBService creation. - 'batch_size': int, accumulates messages until reaching the specified batch size for writing to VDB. - 'write_time_interval': float, specifies the time interval (in seconds) for writing messages, or writing messages when the accumulated batch size is reached. @@ -122,19 +119,16 @@ def _write_to_vector_db(builder: mrc.Builder): embedding_column_name = write_to_vdb_config.embedding_column_name recreate = write_to_vdb_config.recreate - service = write_to_vdb_config.service - is_service_serialized = write_to_vdb_config.is_service_serialized + service_provider = write_to_vdb_config.service_provider default_resource_name = write_to_vdb_config.default_resource_name resource_kwargs = write_to_vdb_config.resource_kwargs resource_schemas = write_to_vdb_config.resource_schemas - service_kwargs = write_to_vdb_config.service_kwargs batch_size = write_to_vdb_config.batch_size write_time_interval = write_to_vdb_config.write_time_interval - # Check if service is serialized and convert if needed - # pylint: disable=not-a-mapping - service: VectorDBService = (pickle.loads(bytes(service, "latin1")) if is_service_serialized else - VectorDBServiceFactory.create_instance(service_name=service, **service_kwargs)) + service_provider: VectorDbServiceProvider = pickle.loads(bytes(service_provider, "latin1")) + + service = service_provider.create() preprocess_vdb_resources(service, recreate, resource_schemas) @@ -148,7 +142,8 @@ def on_completed(): try: if accum_stats.data: merged_df = cudf.concat(accum_stats.data) - service.insert_dataframe(name=key, df=merged_df) + collection = 
service.load_resource(name=key) + collection.insert_dataframe(df=merged_df) final_df_references.append(accum_stats.data) except Exception as e: logger.error("Unable to upload dataframe entries to vector database: %s", e) @@ -213,7 +208,8 @@ def on_data(msg: typing.Union[ControlMessage, MultiResponseMessage, MultiMessage merged_df = cudf.concat(accum_stats.data) # pylint: disable=not-a-mapping - service.insert_dataframe(name=key, df=merged_df, **resource_kwargs) + collection = service.load_resource(name=key) + collection.insert_dataframe(df=merged_df, **resource_kwargs) # Reset accumulator stats accum_stats.data.clear() accum_stats.last_insert_time = current_time diff --git a/morpheus/modules/schemas/write_to_vector_db_schema.py b/morpheus/modules/schemas/write_to_vector_db_schema.py index 8000dabfbc..ee6b183408 100644 --- a/morpheus/modules/schemas/write_to_vector_db_schema.py +++ b/morpheus/modules/schemas/write_to_vector_db_schema.py @@ -30,19 +30,17 @@ class WriteToVDBSchema(BaseModel): embedding_column_name: str = "embedding" recreate: bool = False - service: str = Field(default_factory=None) - is_service_serialized: bool = False + service_provider: str = Field(default_factory=None) default_resource_name: str = Field(default_factory=None) resource_schemas: dict = Field(default_factory=dict) resource_kwargs: dict = Field(default_factory=dict) - service_kwargs: dict = Field(default_factory=dict) batch_size: int = 1024 write_time_interval: float = 1.0 - @validator('service', pre=True) + @validator('service_provider', pre=True) def validate_service(cls, to_validate): # pylint: disable=no-self-argument if not to_validate: - raise ValueError("Service must be a service name or a serialized instance of VectorDBService") + raise ValueError("Service must be an instance of VectorDbServiceProvider") return to_validate @validator('default_resource_name', pre=True) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py 
b/morpheus/service/vdb/milvus_vector_db_service.py index 09c68f15cd..ce88325566 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -15,10 +15,9 @@ import copy import json import logging -import threading -import time import typing from functools import wraps +from typing import Callable import cudf @@ -26,7 +25,9 @@ from morpheus.io.utils import truncate_string_cols_by_bytes from morpheus.service.vdb.vector_db_service import VectorDBResourceService from morpheus.service.vdb.vector_db_service import VectorDBService +from morpheus.service.vdb.vector_db_service import VectorDbServiceProvider from morpheus.utils.type_aliases import DataFrameType +from morpheus.utils.debounce import DebounceQueue, DebounceRunner logger = logging.getLogger(__name__) @@ -191,33 +192,6 @@ def from_dict(field: dict) -> "pymilvus.FieldSchema": return pymilvus.FieldSchema.construct_from_dict(field) -def with_collection_lock(func: typing.Callable) -> typing.Callable: - """ - A decorator to synchronize access to a collection with a lock. This decorator ensures that operations on a - specific collection within the Milvus Vector Database are synchronized by acquiring and - releasing a collection-specific lock. - - Parameters - ---------- - func : Callable - The function to be wrapped with the lock. - - Returns - ------- - Callable - The wrapped function with the lock acquisition logic. - """ - - @wraps(func) - def wrapper(self, name, *args, **kwargs): - collection_lock = MilvusVectorDBService.get_collection_lock(name) - with collection_lock: - result = func(self, name, *args, **kwargs) - return result - - return wrapper - - class MilvusVectorDBResourceService(VectorDBResourceService): """ Represents a service for managing resources in a Milvus Vector Database. 
@@ -232,14 +206,12 @@ class MilvusVectorDBResourceService(VectorDBResourceService): When true, truncate strings values that are longer than the max length of the field """ - def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: bool = False) -> None: - if IMPORT_EXCEPTION is not None: - raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION - + def __init__(self, name: str, client: "MilvusClient", client_flush: Callable[[str], None], truncate_long_strings: bool = False) -> None: super().__init__() self._name = name self._client = client + self._client_flush = client_flush self._collection = self._client.get_collection(collection_name=self._name) self._fields: list[pymilvus.FieldSchema] = self._collection.schema.fields @@ -290,7 +262,7 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) Returns response content as a dictionary. """ result = self._collection.insert(data, **kwargs) - self._collection.flush() + self._client_flush(self._name) return self._insert_result_to_dict(result=result) @@ -344,7 +316,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=collection_df, **kwargs) - self._collection.flush() + self._client_flush(self._collection.name) return self._insert_result_to_dict(result=result) @@ -461,7 +433,7 @@ def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dic result = self._client.upsert(collection_name=self._name, entities=data, **kwargs) - self._collection.flush() + self._client_flush(self._collection.name) return self._update_delete_result_to_dict(result=result) @@ -597,37 +569,23 @@ class MilvusVectorDBService(VectorDBService): Parameters ---------- - host : str - The hostname or IP address of the Milvus server. - port : str - The port number for connecting to the Milvus server. 
- alias : str, optional - Alias for the Milvus connection, by default "default". + client : MilvusClient + The underlying milvus client for this service instance truncate_long_strings : bool, optional When true, truncate strings values that are longer than the max length of the field - **kwargs : dict - Additional keyword arguments specific to the Milvus connection configuration. """ - _collection_locks = {} - _cleanup_interval = 600 # 10mins - _last_cleanup_time = time.time() - - def __init__(self, - uri: str, - user: str = "", - password: str = "", - db_name: str = "", - token: str = "", - truncate_long_strings: bool = False, - **kwargs: dict[str, typing.Any]): - + def __init__(self, client: MilvusClient, truncate_long_strings: bool): self._truncate_long_strings = truncate_long_strings - self._client = MilvusClient(uri=uri, user=user, password=password, db_name=db_name, token=token, **kwargs) + self._client = client + self._flush_queue = DebounceQueue(self._flush) + self._flush_runner = DebounceRunner(self._flush_queue) + self._flush_runner.start() def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> MilvusVectorDBResourceService: return MilvusVectorDBResourceService(name=name, client=self._client, + client_flush=self._flush_queue.queue, truncate_long_strings=self._truncate_long_strings, **kwargs) @@ -664,7 +622,7 @@ def _create_schema_field(self, field_conf: dict) -> "pymilvus.FieldSchema": return field_schema - @with_collection_lock + def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]): """ Create a collection in the Milvus vector database with the specified name and configuration. 
This method @@ -801,232 +759,6 @@ def create_from_dataframe(self, self.create(name=name, overwrite=overwrite, **create_kwargs) - @with_collection_lock - def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, - typing.Any]) -> dict[str, typing.Any]: - """ - Insert a collection specific data in the Milvus vector database. - - Parameters - ---------- - name : str - Name of the collection to be inserted. - data : list[list] | list[dict] - Data to be inserted in the collection. - **kwargs : dict[str, typing.Any] - Additional keyword arguments containing collection configuration. - - Returns - ------- - dict - Returns response content as a dictionary. - - Raises - ------ - RuntimeError - If the collection not exists exists. - """ - - resource = self.load_resource(name) - return resource.insert(data, **kwargs) - - @with_collection_lock - def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - """ - Converts dataframe to rows and insert to a collection in the Milvus vector database. - - Parameters - ---------- - name : str - Name of the collection to be inserted. - df : DataFrameType - Dataframe to be inserted in the collection. - **kwargs : dict[str, typing.Any] - Additional keyword arguments containing collection configuration. - - Returns - ------- - dict - Returns response content as a dictionary. - - Raises - ------ - RuntimeError - If the collection not exists exists. - """ - resource = self.load_resource(name) - - return resource.insert_dataframe(df=df, **kwargs) - - @with_collection_lock - def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any: - """ - Query data in a collection in the Milvus vector database. - - This method performs a search operation in the specified collection/partition in the Milvus vector database. - - Parameters - ---------- - name : str - Name of the collection to search within. 
- query : str - The search query, which can be a filter expression. - **kwargs : dict - Additional keyword arguments for the search operation. - - Returns - ------- - typing.Any - The search result, which can vary depending on the query and options. - """ - - resource = self.load_resource(name) - - return resource.query(query, **kwargs) - - async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[dict]: - """ - Perform a similarity search within the collection. - - Parameters - ---------- - name : str - Name of the collection. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - list[dict] - Returns a list of dictionaries representing the results of the similarity search. - """ - - resource = self.load_resource(name) - - return resource.similarity_search(**kwargs) - - @with_collection_lock - def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - """ - Update data in the vector database. - - Parameters - ---------- - name : str - Name of the collection. - data : list[typing.Any] - Data to be updated in the collection. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to upsert operation. - - Returns - ------- - dict[str, typing.Any] - Returns result of the updated operation stats. - """ - - if not isinstance(data, list): - raise RuntimeError("Data is not of type list.") - - resource = self.load_resource(name) - - return resource.update(data=data, **kwargs) - - @with_collection_lock - def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: - """ - Delete vectors by keys from the collection. - - Parameters - ---------- - name : str - Name of the collection. - keys : int | str | list - Primary keys to delete vectors. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. 
- - Returns - ------- - typing.Any - Returns result of the given keys that are delete from the collection. - """ - - resource = self.load_resource(name) - - return resource.delete_by_keys(keys=keys, **kwargs) - - @with_collection_lock - def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - """ - Delete vectors from the collection using expressions. - - Parameters - ---------- - name : str - Name of the collection. - expr : str - Delete expression. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - dict[str, typing.Any] - Returns result of the given keys that are delete from the collection. - """ - - resource = self.load_resource(name) - result = resource.delete(expr=expr, **kwargs) - - return result - - @with_collection_lock - def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: - """ - Retrieve the inserted vectors using their primary keys from the Collection. - - Parameters - ---------- - name : str - Name of the collection. - keys : int | str | list - Primary keys to get vectors for. Depending on pk_field type it can be int or str - or a list of either. - **kwargs : dict[str, typing.Any] - Additional keyword arguments for the retrieval operation. - - Returns - ------- - list[typing.Any] - Returns result rows of the given keys from the collection. - """ - - resource = self.load_resource(name) - - result = resource.retrieve_by_keys(keys=keys, **kwargs) - - return result - - def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int: - """ - Returns number of rows/entities in the given collection. - - Parameters - ---------- - name : str - Name of the collection. - **kwargs : dict[str, typing.Any] - Additional keyword arguments for the count operation. - - Returns - ------- - int - Returns number of entities in the collection. 
- """ - resource = self.load_resource(name) - - return resource.count(**kwargs) def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: """ @@ -1085,26 +817,6 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: raise ValueError( "Mandatory arguments 'field_name' and 'index_name' are required when resource='index'") - def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: - """ - Describe the collection in the vector database. - - Parameters - ---------- - name : str - Name of the collection. - **kwargs : dict[str, typing.Any] - Additional keyword arguments specific to the Milvus vector database. - - Returns - ------- - dict - Returns collection information. - """ - - resource = self.load_resource(name) - - return resource.describe(**kwargs) def release_resource(self, name: str) -> None: """ @@ -1118,6 +830,17 @@ def release_resource(self, name: str) -> None: self._client.release_collection(collection_name=name) + def flush(self): + self._flush_queue.flush() + + def _flush(self, collection_names: typing.List[str]): + for collection_name in collection_names: + try: + self._client.flush(collection_name, timeout=1) + except: + # no way to handle exceptions for records already added. + pass + def close(self) -> None: """ Close the connection to the Milvus vector database. @@ -1125,37 +848,55 @@ def close(self) -> None: This method disconnects from the Milvus vector database by removing the connection. """ + self._flush_runner.stop() self._client.close() - @classmethod - def get_collection_lock(cls, name: str) -> threading.Lock: - """ - Get a lock for a given collection name. - - Parameters - ---------- - name : str - Name of the collection for which to acquire the lock. +class MilvusVectorDBServiceProvider(VectorDbServiceProvider): - Returns - ------- - threading.Lock - A thread lock specific to the given collection name. 
- """ - - current_time = time.time() - - if name not in cls._collection_locks: - cls._collection_locks[name] = {"lock": threading.Lock(), "last_used": current_time} - else: - cls._collection_locks[name]["last_used"] = current_time + """ + Service class for Milvus Vector Database implementation. This class provides functions for interacting + with a Milvus vector database. - if (current_time - cls._last_cleanup_time) >= cls._cleanup_interval: - for lock_name, lock_info in cls._collection_locks.copy().items(): - last_used = lock_info["last_used"] - if current_time - last_used >= cls._cleanup_interval: - logger.debug("Cleaning up lock for collection: %s", lock_name) - del cls._collection_locks[lock_name] - cls._last_cleanup_time = current_time + Parameters + ---------- + host : str + The hostname or IP address of the Milvus server. + port : str + The port number for connecting to the Milvus server. + alias : str, optional + Alias for the Milvus connection, by default "default". + truncate_long_strings : bool, optional + When true, truncate strings values that are longer than the max length of the field + **kwargs : dict + Additional keyword arguments specific to the Milvus connection configuration. 
+ """ + def __init__(self, + uri: str, + user: str = "", + password: str = "", + db_name: str = "", + token: str = "", + truncate_long_strings: bool = False, + **kwargs: dict[str, typing.Any]): + + if IMPORT_EXCEPTION is not None: + raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION - return cls._collection_locks[name]["lock"] + self._uri = uri + self._user = user + self._password = password + self._db_name = db_name + self._token = token + self._truncate_long_strings = truncate_long_strings + self._kwargs = kwargs + + def create(self) -> MilvusVectorDBService: + client = self._client = MilvusClient( + uri=self._uri, + user=self._user, + password=self._password, + db_name=self._db_name, + token=self._token, + **self._kwargs) + + return MilvusVectorDBService(client, self._truncate_long_strings) diff --git a/morpheus/service/vdb/utils.py b/morpheus/service/vdb/utils.py index 44d8fae889..9792154797 100644 --- a/morpheus/service/vdb/utils.py +++ b/morpheus/service/vdb/utils.py @@ -74,13 +74,13 @@ def validate_service(service_name: str): return service_name -class VectorDBServiceFactory: +class VectorDBServiceProviderFactory: @typing.overload @classmethod def create_instance( cls, service_name: typing.Literal["milvus"], *args: typing.Any, - **kwargs: dict[str, typing.Any]) -> "morpheus.service.vdb.milvus_vector_db_service.MilvusVectorDBService": + **kwargs: dict[str, typing.Any]) -> "morpheus.service.vdb.milvus_vector_db_service.MilvusVectorDBServiceProvider": pass @classmethod @@ -111,7 +111,7 @@ def create_instance(cls, service_name: str, *args: typing.Any, **kwargs: dict[st """ module_name = f"morpheus.service.vdb.{service_name}_vector_db_service" module = importlib.import_module(module_name) - class_name = f"{service_name.capitalize()}VectorDBService" + class_name = f"{service_name.capitalize()}VectorDBServiceProvider" class_ = getattr(module, class_name) instance = class_(*args, **kwargs) return instance diff --git 
a/morpheus/service/vdb/vector_db_service.py b/morpheus/service/vdb/vector_db_service.py index 8f2d346f55..26bb6d64f5 100644 --- a/morpheus/service/vdb/vector_db_service.py +++ b/morpheus/service/vdb/vector_db_service.py @@ -218,100 +218,6 @@ class VectorDBService(ABC): def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> VectorDBResourceService: pass - @abstractmethod - def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict: - """ - Insert data into the vector database. - - Parameters - ---------- - name : str - Name of the resource. - data : list[list] | list[dict] - Data to be inserted into the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - dict - Returns response content as a dictionary. - """ - - pass - - @abstractmethod - def insert_dataframe(self, - name: str, - df: typing.Union[cudf.DataFrame, pd.DataFrame], - **kwargs: dict[str, typing.Any]) -> dict: - """ - Converts dataframe to rows and insert into the vector database resource. - - Parameters - ---------- - name : str - Name of the resource to be inserted. - df : typing.Union[cudf.DataFrame, pd.DataFrame] - Dataframe to be inserted. - **kwargs : dict[str, typing.Any] - Additional keyword arguments containing collection configuration. - - Returns - ------- - dict - Returns response content as a dictionary. - - Raises - ------ - RuntimeError - If the resource not exists exists. - """ - pass - - @abstractmethod - def query(self, name: str, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any: - """ - Query a resource in the vector database. - - Parameters - ---------- - name : str - Name of the resource. - query : str - Query to execute on the given resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - typing.Any - Returns search results. 
- """ - - pass - - @abstractmethod - async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[list[dict]]: - """ - Perform a similarity search within the vector database. - - Parameters - ---------- - name : str - Name of the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - list[list[dict]] - Returns a list of lists, where each inner list contains dictionaries representing the results of the - similarity search. - """ - - pass - @abstractmethod def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: """ @@ -327,50 +233,6 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: pass - @abstractmethod - def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - """ - Update data in the vector database. - - Parameters - ---------- - name : str - Name of the resource. - data : list[typing.Any] - Data to be updated in the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - dict[str, typing.Any] - Returns result of the updated operation stats. - """ - - pass - - @abstractmethod - def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: - """ - Delete data in the vector database. - - Parameters - ---------- - name : str - Name of the resource. - expr : typing.Any - Delete expression. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - dict[str, typing.Any] - Returns result of the delete operation stats. 
- """ - - pass - @abstractmethod def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]) -> None: """ @@ -410,26 +272,6 @@ def create_from_dataframe(self, """ pass - @abstractmethod - def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: - """ - Describe resource in the vector database. - - Parameters - ---------- - name : str - Name of the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - dict - Returns resource information. - """ - - pass - @abstractmethod def release_resource(self, name: str) -> None: """ @@ -486,82 +328,8 @@ def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]: pass - # pylint: disable=unused-argument - def transform(self, data: typing.Any, **kwargs: dict[str, typing.Any]) -> typing.Any: - """ - Transform data according to the specific vector database implementation. - - Parameters - ---------- - data : typing.Any - Data to be updated in the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - typing.Any - Returns transformed data as per the implementation. - """ - return data +class VectorDbServiceProvider(ABC): @abstractmethod - def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: - """ - Retrieve the inserted vectors using keys from the resource. - - Parameters - ---------- - name : str - Name of the resource. - keys : typing.Any - Primary keys to get vectors. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - list[typing.Any] - Returns rows of the given keys that exists in the resource. 
- """ - pass - - @abstractmethod - def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: - """ - Delete vectors by keys from the resource. - - Parameters - ---------- - name : str - Name of the resource. - keys : int | str | list - Primary keys to delete vectors. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - typing.Any - Returns vectors of the given keys that are delete from the resource. - """ - pass - - @abstractmethod - def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int: - """ - Returns number of rows/entities in the given resource. - - Parameters - ---------- - name : str - Name of the resource. - **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to the vector database implementation. - - Returns - ------- - int - Returns number of rows/entities in the given resource. - """ - pass + def create(self) -> VectorDBService: + pass \ No newline at end of file diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index 2b2fddc394..651999a5f0 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -25,7 +25,8 @@ from morpheus.modules.output.write_to_vector_db import WriteToVectorDBLoaderFactory from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage -from morpheus.service.vdb.vector_db_service import VectorDBService +from morpheus.service.vdb.vector_db_service import VectorDbServiceProvider +from morpheus.service.vdb.utils import VectorDBServiceProviderFactory from morpheus.utils.module_utils import ModuleLoader logger = logging.getLogger(__name__) @@ -55,18 +56,16 @@ class WriteToVectorDBStage(PassThruTypeMixin, SinglePortStage): write_time_interval : float Specifies the time interval (in 
seconds) for writing messages, or writing messages when the accumulated batch size is reached. - **service_kwargs : dict - Additional keyword arguments to pass when creating a VectorDBService instance. Raises ------ ValueError - If `service` is not a valid string (service name) or an instance of VectorDBService. + If `service` is not a valid string (service name) or an instance of VectorDbServiceProvider. """ def __init__(self, config: Config, - service: typing.Union[str, VectorDBService], + service: typing.Union[str, VectorDbServiceProvider], resource_name: str, embedding_column_name: str = "embedding", recreate: bool = False, @@ -80,21 +79,18 @@ def __init__(self, resource_kwargs = resource_kwargs if resource_kwargs is not None else {} resource_schemas = resource_schemas if resource_schemas is not None else {} - is_service_serialized = False - if isinstance(service, VectorDBService): - service = str(pickle.dumps(service), encoding="latin1") - is_service_serialized = True + + if isinstance(service, str): + service = VectorDBServiceProviderFactory.create_instance(service, **service_kwargs) module_config = { "batch_size": batch_size, "default_resource_name": resource_name, "embedding_column_name": embedding_column_name, - "is_service_serialized": is_service_serialized, "recreate": recreate, "resource_kwargs": resource_kwargs, "resource_schemas": resource_schemas, - "service_kwargs": service_kwargs, - "service": service, + "service_provider": str(pickle.dumps(service), encoding="latin1"), "write_time_interval": write_time_interval } diff --git a/morpheus/utils/debounce.py b/morpheus/utils/debounce.py new file mode 100644 index 0000000000..bb821b491a --- /dev/null +++ b/morpheus/utils/debounce.py @@ -0,0 +1,109 @@ +from datetime import datetime, timedelta +from threading import Thread, Lock +from typing import Callable, TypeVar, Generic + +T = TypeVar('T') + +class DebounceQueue(Generic[T]): + + _lock: Lock + _items: list[T] + _oldest_time: datetime + _latest_time: 
datetime + _callback: Callable[[list[T]], None] + _now: Callable[[],datetime] + _min_delay: timedelta + _max_delay: timedelta + + + def __init__(self, + callback: Callable[[list[T]], None], + now: Callable[[],datetime]=datetime.now, + min_delay: timedelta=timedelta(seconds=1), + max_delay: timedelta=timedelta(seconds=5) + ): + self._lock = Lock() + self._items = [] + self._oldest_time = None + self._latest_time = None + self._callback = callback + self._now = now + self._min_delay = min_delay + self._max_delay = max_delay + + + def queue(self, item: T): + with self._lock: + now = self._now() + if self._oldest_time is None: + self._oldest_time = now + self._latest_time = now + self._items.append(item) + + + def step(self): + with self._lock: + if self._oldest_time is None: + return # nothing has been queued. + + now = self._now() + + if (now - self._latest_time) > self._min_delay: + self._flush() + return + + if (now - self._oldest_time) > self._max_delay: + self._flush() + return + + + def flush(self): + with self._lock: + self._flush() + + + def _flush(self): + if self._oldest_time is None: + return # nothing has been queued. 
 + + self._oldest_time = None + self._latest_time = None + self._callback(self._items) + self._items = [] + + +class DebounceRunner(): + + _cancelled: bool + _thread: Thread + _queue: DebounceQueue + _step_delay: float + + + def __init__(self, queue: DebounceQueue, step_delay: float = 0.1): + self._thread = Thread(target=self._run) + self._queue = queue + self._step_delay = step_delay + self._cancelled = False + + + def start(self): + self._thread.start() + + + def stop(self): + if self._thread.is_alive(): + self._cancelled = True + self._thread.join() + self._queue.flush() + + + def __del__(self): + self.stop() + + + def _run(self): + import time + while not self._cancelled: + self._queue.step() + time.sleep(self._step_delay) diff --git a/tests/_utils/milvus.py b/tests/_utils/milvus.py index ac5e08b7e6..86536b2bb5 100644 --- a/tests/_utils/milvus.py +++ b/tests/_utils/milvus.py @@ -16,7 +16,7 @@ import cudf -from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider def populate_milvus(milvus_server_uri: str, @@ -24,7 +24,9 @@ def populate_milvus(milvus_server_uri: str, resource_kwargs: dict, df: cudf.DataFrame, overwrite: bool = False): - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + milvus_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri) + milvus_service = milvus_provider.create() milvus_service.create(collection_name, overwrite=overwrite, **resource_kwargs) resource_service = milvus_service.load_resource(name=collection_name) resource_service.insert_dataframe(name=collection_name, df=df, **resource_kwargs) + milvus_service.close() diff --git a/tests/benchmarks/test_bench_rag_standalone_pipeline.py b/tests/benchmarks/test_bench_rag_standalone_pipeline.py index b09d4b624d..0b361eedc9 100644 --- a/tests/benchmarks/test_bench_rag_standalone_pipeline.py +++ b/tests/benchmarks/test_bench_rag_standalone_pipeline.py @@ -38,6 +38,7
@@ from morpheus.stages.llm.llm_engine_stage import LLMEngineStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService EMBEDDING_SIZE = 384 QUESTION = "What are some new attacks discovered in the cyber security industry?" @@ -52,15 +53,14 @@ EXPECTED_RESPONSE = "Ransomware, Phishing, Malware, Denial of Service, SQL injection, and Password Attacks" -def _build_engine(llm_service_name: str, +def _build_engine(milvus_service: MilvusVectorDBService, + llm_service_name: str, model_name: str, - milvus_server_uri: str, collection_name: str, utils_mod: types.ModuleType): engine = LLMEngine() engine.add_node("extracter", node=ExtracterNode()) - vector_service = utils_mod.build_milvus_service(embedding_size=EMBEDDING_SIZE, uri=milvus_server_uri) embeddings = utils_mod.build_huggingface_embeddings("sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': 'cuda'}, encode_kwargs={'batch_size': 100}) @@ -77,7 +77,7 @@ async def calc_embeddings(texts: list[str]) -> list[list[float]]: engine.add_node("rag", inputs=["/extracter"], node=RAGNode(prompt=PROMPT, - vdb_service=vector_service.load_resource(collection_name), + vdb_service=milvus_service.load_resource(collection_name), embedding=calc_embeddings, llm_client=llm_service)) @@ -109,18 +109,23 @@ def _run_pipeline(config: Config, pipe.add_stage( DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task)) + + milvus_service = utils_mod.build_milvus_service(embedding_size=EMBEDDING_SIZE, uri=milvus_server_uri) pipe.add_stage( LLMEngineStage(config, - engine=_build_engine(llm_service_name=llm_service_name, + engine=_build_engine(milvus_service=milvus_service, + llm_service_name=llm_service_name, model_name=model_name, - milvus_server_uri=milvus_server_uri, collection_name=collection_name,
utils_mod=utils_mod))) + pipe.add_stage(InMemorySinkStage(config)) pipe.run() + milvus_service.close() + @pytest.mark.milvus @pytest.mark.use_python diff --git a/tests/benchmarks/test_bench_vdb_upload_pipeline.py b/tests/benchmarks/test_bench_vdb_upload_pipeline.py index d54c536780..a95ca9283a 100644 --- a/tests/benchmarks/test_bench_vdb_upload_pipeline.py +++ b/tests/benchmarks/test_bench_vdb_upload_pipeline.py @@ -35,6 +35,7 @@ from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider EMBEDDING_SIZE = 384 MODEL_MAX_BATCH_SIZE = 64 @@ -81,8 +82,7 @@ def _run_pipeline(config: Config, resource_name=collection_name, resource_kwargs=utils_mod.build_milvus_config(embedding_size=EMBEDDING_SIZE), recreate=True, - service="milvus", - uri=milvus_server_uri)) + service_provider=MilvusVectorDBServiceProvider(uri=milvus_server_uri))) pipe.run() diff --git a/tests/llm/nodes/test_llm_retriever_node_pipe.py b/tests/llm/nodes/test_llm_retriever_node_pipe.py index 2dd6dba8df..5f09fb43f3 100644 --- a/tests/llm/nodes/test_llm_retriever_node_pipe.py +++ b/tests/llm/nodes/test_llm_retriever_node_pipe.py @@ -27,17 +27,16 @@ from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBResourceService -from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.llm.llm_engine_stage import LLMEngineStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from 
morpheus.stages.preprocess.deserialize_stage import DeserializeStage -@pytest.fixture(scope="module", name="milvus_service") +@pytest.fixture(scope="module", name="milvus_provider") def milvus_service_fixture(milvus_server_uri: str): - service = MilvusVectorDBService(uri=milvus_server_uri) - yield service + yield MilvusVectorDBServiceProvider(uri=milvus_server_uri) def _build_engine(vdb_service, **similarity_search_kwargs) -> LLMEngine: @@ -84,12 +83,13 @@ def test_pipeline(config: Config): @pytest.mark.milvus def test_pipeline_with_milvus(config: Config, - milvus_service: MilvusVectorDBService, + milvus_provider: MilvusVectorDBServiceProvider, idx_part_collection_config: dict, milvus_data: list[dict]): collection_name = "test_retriever_node_collection" # Make sure to drop any existing collection from previous runs. + milvus_service = milvus_provider.create() milvus_service.drop(collection_name) # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) @@ -116,6 +116,8 @@ def test_pipeline_with_milvus(config: Config, pipe.run() + milvus_service.close() + message = sink.get_messages()[0] assert isinstance(message, ControlMessage) actual_df = message.payload().df diff --git a/tests/llm/test_rag_standalone_pipe.py b/tests/llm/test_rag_standalone_pipe.py index e394420845..f4bc296387 100644 --- a/tests/llm/test_rag_standalone_pipe.py +++ b/tests/llm/test_rag_standalone_pipe.py @@ -39,6 +39,7 @@ from morpheus.stages.llm.llm_engine_stage import LLMEngineStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService EMBEDDING_SIZE = 384 QUESTION = "What are some new attacks discovered in the cyber security industry?" 
@@ -53,15 +54,14 @@ EXPECTED_RESPONSE = "Ransomware, Phishing, Malware, Denial of Service, SQL injection, and Password Attacks" -def _build_engine(llm_service_name: str, +def _build_engine(milvus_service: MilvusVectorDBService, + llm_service_name: str, model_name: str, - milvus_server_uri: str, collection_name: str, utils_mod: types.ModuleType): engine = LLMEngine() engine.add_node("extracter", node=ExtracterNode()) - vector_service = utils_mod.build_milvus_service(embedding_size=EMBEDDING_SIZE, uri=milvus_server_uri) embeddings = utils_mod.build_huggingface_embeddings("sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': 'cuda'}, encode_kwargs={'batch_size': 100}) @@ -78,7 +78,7 @@ async def calc_embeddings(texts: list[str]) -> list[list[float]]: engine.add_node("rag", inputs=["/extracter"], node=RAGNode(prompt=PROMPT, - vdb_service=vector_service.load_resource(collection_name), + vdb_service=milvus_service.load_resource(collection_name), embedding=calc_embeddings, llm_client=llm_service)) @@ -110,18 +110,22 @@ def _run_pipeline(config: Config, pipe.add_stage( DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task)) + + milvus_service = utils_mod.build_milvus_service(embedding_size=EMBEDDING_SIZE, uri=milvus_server_uri) pipe.add_stage( LLMEngineStage(config, - engine=_build_engine(llm_service_name=llm_service_name, + engine=_build_engine(milvus_service=milvus_service, + llm_service_name=llm_service_name, model_name=model_name, - milvus_server_uri=milvus_server_uri, collection_name=collection_name, utils_mod=utils_mod))) sink = pipe.add_stage(CompareDataFrameStage(config, compare_df=expected_df)) pipe.run() + milvus_service.close() + return sink.get_results() diff --git a/tests/llm/test_vdb_upload_pipe.py b/tests/llm/test_vdb_upload_pipe.py index f6051ebb55..4a05328e93 100755 --- a/tests/llm/test_vdb_upload_pipe.py +++ b/tests/llm/test_vdb_upload_pipe.py @@ -27,7 +27,7 @@ from _utils import
mk_async_infer from _utils.dataset_manager import DatasetManager from morpheus.config import Config -from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider @pytest.mark.milvus @@ -133,7 +133,8 @@ def mock_get_fn(url: str): vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + milvus_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri) + milvus_service = milvus_provider.create() resource_service = milvus_service.load_resource(name=collection_name) assert resource_service.count() == len(expected_values_df) @@ -150,3 +151,5 @@ def mock_get_fn(url: str): db_emb_row = pd.DataFrame(db_emb[i], dtype=np.float32) expected_emb_row = pd.DataFrame(expected_emb[i], dtype=np.float32) dataset.assert_compare_df(db_emb_row, expected_emb_row) + + milvus_service.close() diff --git a/tests/test_debounce.py b/tests/test_debounce.py new file mode 100644 index 0000000000..65ab1455d5 --- /dev/null +++ b/tests/test_debounce.py @@ -0,0 +1,166 @@ +from datetime import datetime, timedelta +import pytest + +from morpheus.utils.debounce import DebounceQueue, DebounceRunner + + +def test_slow(): + + import time + + flushed = [] + + def flush(keys): + for key in keys: + flushed.append(key) + + queue = DebounceQueue(flush) + runner = DebounceRunner(queue) + + runner.start() + + queue.queue("a") + queue.queue("b") + + time.sleep(2) + + assert len(flushed) == 2 + + queue.queue("c") + queue.queue("d") + + runner.stop() + + assert len(flushed) == 4 + + with pytest.raises(RuntimeError): + runner.start() + + +def test_flush(): + time = datetime(2024, 1, 1) + + def get_time(): + return time + + flushed = {} + + def flush(keys): + for item in keys: + flushed[item] = time + + batcher = DebounceQueue(flush, now=get_time) + + batcher.queue("a") + batcher.queue("b") + + assert len(flushed) == 0 + + batcher.flush() + + 
assert len(flushed) == 2 + assert flushed["a"] == time + assert flushed["b"] == time + +def test_step_no_primer(): + time = datetime(2024, 1, 1) + + def get_time(): + return time + + flushed = {} + + def flush(keys): + for item in keys: + flushed[item] = time + + batcher = DebounceQueue(flush, now=get_time) + + time += timedelta(days=1) + + batcher.queue("a") + batcher.step() + + assert len(flushed) == 0 + + time += timedelta(days=1) + + batcher.step() + + assert len(flushed) == 1 + + +def test_step_min_delay(): + time = datetime(2024, 1, 1) + + def get_time(): + return time + + flushed = {} + + def flush(keys): + for item in keys: + flushed[item] = time + + batcher = DebounceQueue(flush, now=get_time) + + batcher.queue("a") + time += timedelta(seconds=1) + + batcher.step() + + assert len(flushed) == 0 + + batcher.queue("b") + time += timedelta(seconds=1) + + batcher.step() + + assert len(flushed) == 0 + + time += timedelta(seconds=1) + + batcher.step() + + assert len(flushed) == 2 + + +def test_step_max_delay(): + + time = datetime(2024, 1, 1) + time_start = time + + def get_time(): + return time + + flushed = {} + + def flush(keys): + for item in keys: + flushed[item] = time + + batcher = DebounceQueue(flush, now=get_time) + + for i in range(10): + batcher.queue(i) + time += timedelta(seconds=1) + batcher.step() + + assert len(flushed) == 6 + assert flushed[0] == time_start + timedelta(seconds=6) + assert flushed[1] == time_start + timedelta(seconds=6) + assert flushed[2] == time_start + timedelta(seconds=6) + assert flushed[3] == time_start + timedelta(seconds=6) + assert flushed[4] == time_start + timedelta(seconds=6) + assert flushed[5] == time_start + timedelta(seconds=6) + + time += timedelta(seconds=10) + + batcher.step() + + assert len(flushed) == 10 + + assert flushed[6] == time_start + timedelta(seconds=20) + assert flushed[7] == time_start + timedelta(seconds=20) + assert flushed[8] == time_start + timedelta(seconds=20) + assert flushed[9] == time_start 
+ timedelta(seconds=20) diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 3d0548176d..8338bc770b 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -30,6 +30,7 @@ from morpheus.service.vdb.milvus_vector_db_service import MAX_STRING_LENGTH_BYTES from morpheus.service.vdb.milvus_vector_db_service import FieldSchemaEncoder from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider # Milvus data type mapping dictionary MILVUS_DATA_TYPE_MAP = { @@ -48,11 +49,12 @@ } -@pytest.fixture(scope="module", name="milvus_service") +@pytest.fixture(scope="function", name="milvus_service") def milvus_service_fixture(milvus_server_uri: str): # This fixture is scoped to the function level since the WriteToVectorDBStage will close the connection on' # pipeline completion - service = MilvusVectorDBService(uri=milvus_server_uri) + service_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri) + service = service_provider.create() yield service @@ -62,6 +64,8 @@ def test_list_store_objects(milvus_service: MilvusVectorDBService): collections = milvus_service.list_store_objects() assert isinstance(collections, list) + milvus_service.close() + @pytest.mark.milvus def test_has_store_object(milvus_service: MilvusVectorDBService): @@ -69,6 +73,8 @@ def test_has_store_object(milvus_service: MilvusVectorDBService): collection_name = "non_existing_collection" assert not milvus_service.has_store_object(collection_name) + milvus_service.close() + @pytest.fixture(scope="module", name="sample_field") def sample_field_fixture(): @@ -127,8 +133,11 @@ def test_create_and_drop_collection(idx_part_collection_config: dict, milvus_ser # Drop the collection and check if it no longer exists. 
milvus_service.drop(collection_name) + assert not milvus_service.has_store_object(collection_name) + milvus_service.close() + @pytest.mark.milvus def test_insert_and_retrieve_by_keys(milvus_service: MilvusVectorDBService, @@ -141,18 +150,20 @@ def test_insert_and_retrieve_by_keys(milvus_service: MilvusVectorDBService, # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. - response = milvus_service.insert(collection_name, milvus_data) + response = milvus_collection.insert(milvus_data) assert response["insert_count"] == len(milvus_data) # Retrieve inserted data by primary keys. keys_to_retrieve = [2, 4, 6] - retrieved_data = milvus_service.retrieve_by_keys(collection_name, keys_to_retrieve) + retrieved_data = milvus_collection.retrieve_by_keys(keys_to_retrieve) assert len(retrieved_data) == len(keys_to_retrieve) # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -164,21 +175,23 @@ def test_query(milvus_service: MilvusVectorDBService, idx_part_collection_config # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. - milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) # Define a search query. query = "age==26 or age==27" # Perform a search in the collection. - search_result = milvus_service.query(collection_name, query) + search_result = milvus_collection.query(query) assert len(search_result) == 2 assert search_result[0]["age"] in [26, 27] assert search_result[1]["age"] in [26, 27] # Clean up the collection. 
milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -193,9 +206,10 @@ async def test_similarity_search_with_data(milvus_service: MilvusVectorDBService # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data to the collection. - milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) rng = np.random.default_rng(seed=100) search_vec = rng.random((1, 3)) @@ -204,10 +218,7 @@ async def test_similarity_search_with_data(milvus_service: MilvusVectorDBService expr = "age==26 or age==27" # Perform a search in the collection. - similarity_search_coroutine = await milvus_service.similarity_search(collection_name, - embeddings=search_vec, - expr=expr) - search_result = await similarity_search_coroutine + search_result = await milvus_collection.similarity_search(embeddings=search_vec, expr=expr) assert len(search_result[0]) == 2 assert search_result[0][0]["age"] in [26, 27] @@ -217,6 +228,7 @@ async def test_similarity_search_with_data(milvus_service: MilvusVectorDBService # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -228,16 +240,19 @@ def test_count(milvus_service: MilvusVectorDBService, idx_part_collection_config # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. - milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) + milvus_service.flush() # Get the count of entities in the collection. - count = milvus_service.count(collection_name) + count = milvus_collection.count() assert count == len(milvus_data) # Clean up the collection. 
milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -252,27 +267,30 @@ def test_overwrite_collection_on_create(milvus_service: MilvusVectorDBService, # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data to the collection. - response1 = milvus_service.insert(collection_name, milvus_data) + response1 = milvus_collection.insert(milvus_data) assert response1["insert_count"] == len(milvus_data) # Create the same collection again with overwrite=True. milvus_service.create(collection_name, overwrite=True, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert different data into the collection. data2 = [{"id": i, "embedding": [i / 10] * 3, "age": 26 + i} for i in range(10)] - response2 = milvus_service.insert(collection_name, data2) + response2 = milvus_collection.insert(data2) assert response2["insert_count"] == len(data2) # Retrieve the data from the collection and check if it matches the second set of data. - retrieved_data = milvus_service.retrieve_by_keys(collection_name, list(range(10))) + retrieved_data = milvus_collection.retrieve_by_keys(list(range(10))) for i in range(10): assert retrieved_data[i]["age"] == data2[i]["age"] # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -288,21 +306,20 @@ def test_insert_into_partition(milvus_service: MilvusVectorDBService, # Create a collection with a partition. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the specified partition. 
- response = milvus_service.insert(collection_name, milvus_data, partition_name=partition_name) + response = milvus_collection.insert(milvus_data, partition_name=partition_name) assert response["insert_count"] == len(milvus_data) # Retrieve inserted data by primary keys. keys_to_retrieve = [2, 4, 6] - retrieved_data = milvus_service.retrieve_by_keys(collection_name, - keys_to_retrieve, - partition_names=[partition_name]) + retrieved_data = milvus_collection.retrieve_by_keys(keys_to_retrieve, + partition_names=[partition_name]) assert len(retrieved_data) == len(keys_to_retrieve) - retrieved_data_default_part = milvus_service.retrieve_by_keys(collection_name, - keys_to_retrieve, - partition_names=["_default"]) + retrieved_data_default_part = milvus_collection.retrieve_by_keys(keys_to_retrieve, + partition_names=["_default"]) assert len(retrieved_data_default_part) == 0 assert len(retrieved_data_default_part) != len(keys_to_retrieve) @@ -320,11 +337,14 @@ def test_insert_into_partition(milvus_service: MilvusVectorDBService, milvus_service.drop(name=collection_name, resource="index", field_name="embedding", index_name="_default_idx_") - retrieved_data_after_part_drop = milvus_service.retrieve_by_keys(collection_name, keys_to_retrieve) + milvus_collection = milvus_service.load_resource(collection_name) + + retrieved_data_after_part_drop = milvus_collection.retrieve_by_keys(keys_to_retrieve) assert len(retrieved_data_after_part_drop) == 0 # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -336,9 +356,10 @@ def test_update(milvus_service: MilvusVectorDBService, simple_collection_config: # Create a collection with the specified schema configuration. milvus_service.create(collection_name, **simple_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data to the collection. 
- milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) # Use updated data to test the update/upsert functionality. updated_data = [{ @@ -353,7 +374,7 @@ def test_update(milvus_service: MilvusVectorDBService, simple_collection_config: }] # Apply update/upsert on updated_data. - result_dict = milvus_service.update(collection_name, updated_data) + result_dict = milvus_collection.update(updated_data) assert result_dict["upsert_count"] == 7 assert result_dict["insert_count"] == 7 @@ -361,6 +382,7 @@ def test_update(milvus_service: MilvusVectorDBService, simple_collection_config: # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -374,15 +396,16 @@ def test_delete_by_keys(milvus_service: MilvusVectorDBService, # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. - milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) # Delete data by keys from the collection. keys_to_delete = [5, 6] - milvus_service.delete_by_keys(collection_name, keys_to_delete) + milvus_collection.delete_by_keys(keys_to_delete) - response = milvus_service.query(collection_name, query="id >= 0") + response = milvus_collection.query(query="id >= 0") assert len(response) == len(milvus_data) - 2 for item in response: @@ -390,6 +413,7 @@ def test_delete_by_keys(milvus_service: MilvusVectorDBService, # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -402,18 +426,19 @@ def test_delete(milvus_service: MilvusVectorDBService, idx_part_collection_confi # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. 
- milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) # Delete expression. delete_expr = "id in [0,1]" # Delete data from the collection using the expression. - delete_response = milvus_service.delete(collection_name, delete_expr) + delete_response = milvus_collection.delete(delete_expr) assert delete_response["delete_count"] == 2 - response = milvus_service.query(collection_name, query="id > 0") + response = milvus_collection.query(query="id > 0") assert len(response) == len(milvus_data) - 2 for item in response: @@ -421,6 +446,7 @@ def test_delete(milvus_service: MilvusVectorDBService, idx_part_collection_confi # Clean up the collection. milvus_service.drop(collection_name) + milvus_service.close() @pytest.mark.milvus @@ -434,22 +460,14 @@ def test_release_collection(milvus_service: MilvusVectorDBService, # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) # Insert data into the collection. - milvus_service.insert(collection_name, milvus_data) + milvus_collection.insert(milvus_data) # Release resource from the memory. milvus_service.release_resource(name=collection_name) - - -def test_get_collection_lock(): - """ - This test doesn't require milvus server to be running. - """ - collection_name = "test_collection_lock" - lock = MilvusVectorDBService.get_collection_lock(collection_name) - assert "lock" == type(lock).__name__ - assert collection_name in MilvusVectorDBService._collection_locks + milvus_service.close() @pytest.mark.milvus @@ -473,6 +491,7 @@ def test_create_from_dataframe(milvus_service: MilvusVectorDBService): # Clean up the collection. 
milvus_service.drop(collection_name) + milvus_service.close() def test_fse_default(): @@ -528,13 +547,15 @@ def test_insert_dataframe(milvus_server_uri: str, num_rows = 10 collection_name = "test_insert_dataframe" - milvus_service = MilvusVectorDBService(uri=milvus_server_uri, truncate_long_strings=truncate_long_strings) + milvus_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri, truncate_long_strings=truncate_long_strings) + milvus_service = milvus_provider.create() # Make sure to drop any existing collection from previous runs. milvus_service.drop(collection_name) # Create a collection. milvus_service.create(collection_name, **string_collection_config) + milvus_collection = milvus_service.load_resource(collection_name) short_str_col_len = -1 long_str_col_len = -1 @@ -589,19 +610,20 @@ def test_insert_dataframe(milvus_server_uri: str, if (exceed_max_str_len and (not truncate_long_strings)): with pytest.raises(MilvusException, match="string exceeds max length"): - milvus_service.insert_dataframe(collection_name, df) + milvus_collection.insert_dataframe(df) + else: - return # Skip the rest of the test if the string column exceeds the maximum length. + milvus_collection.insert_dataframe(df) - milvus_service.insert_dataframe(collection_name, df) + # Retrieve inserted data by primary keys. + retrieved_data = milvus_collection.retrieve_by_keys(ids) + assert len(retrieved_data) == num_rows - # Retrieve inserted data by primary keys. - retrieved_data = milvus_service.retrieve_by_keys(collection_name, ids) - assert len(retrieved_data) == num_rows + # Clean up the collection. + milvus_service.drop(collection_name) - # Clean up the collection. 
- milvus_service.drop(collection_name) + result_df = dataset.df_class(retrieved_data) - result_df = dataset.df_class(retrieved_data) + dataset.compare_df(result_df, expected_df) - dataset.compare_df(result_df, expected_df) + milvus_service.close() diff --git a/tests/test_milvus_write_to_vector_db_stage_pipe.py b/tests/test_milvus_write_to_vector_db_stage_pipe.py index d5e8efee99..9e9cdc70cf 100755 --- a/tests/test_milvus_write_to_vector_db_stage_pipe.py +++ b/tests/test_milvus_write_to_vector_db_stage_pipe.py @@ -27,7 +27,7 @@ from morpheus.messages.multi_response_message import MultiResponseMessage from morpheus.modules import to_control_message # noqa: F401 # pylint: disable=unused-import from morpheus.pipeline import LinearPipeline -from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService +from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider from morpheus.stages.general.linear_modules_stage import LinearModulesStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage @@ -65,12 +65,14 @@ def test_write_to_vector_db_stage_from_cm_pipe(milvus_server_uri: str, df = get_test_df(num_input_rows) - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + milvus_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri) + milvus_service = milvus_provider.create() # Make sure to drop any existing collection from previous runs. milvus_service.drop(collection_name) # Create milvus collection using config file. 
milvus_service.create(name=collection_name, overwrite=True, **idx_part_collection_config) + milvus_service.close() if recreate: # Update resource kwargs with collection configuration if recreate is True @@ -88,14 +90,14 @@ def test_write_to_vector_db_stage_from_cm_pipe(milvus_server_uri: str, input_port_name="input", output_port_name="output", output_type=ControlMessage)) - + # Provide partition name in the resource_kwargs to insert data into the partition # otherwise goes to '_default' partition. if use_instance: # Instantiate stage with service instance and insert options. write_to_vdb_stage = WriteToVectorDBStage(config, resource_name=collection_name, - service=milvus_service, + service=milvus_provider, recreate=recreate, resource_kwargs=resource_kwargs) else: @@ -144,10 +146,12 @@ def test_write_to_vector_db_stage_from_mm_pipe(milvus_server_uri: str, df = get_test_df(num_input_rows=10) - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + milvus_provider = MilvusVectorDBServiceProvider(uri=milvus_server_uri) + milvus_service = milvus_provider.create() # Make sure to drop any existing collection from previous runs. milvus_service.drop(collection_name) + milvus_service.close() resource_kwargs = {"partition_name": "age_partition"} @@ -163,7 +167,7 @@ def test_write_to_vector_db_stage_from_mm_pipe(milvus_server_uri: str, pipe.add_stage( WriteToVectorDBStage(config, resource_name=collection_name, - service=milvus_service, + service=milvus_provider, recreate=True, resource_kwargs=resource_kwargs))