diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
index 47110e7e1..0af01ebc8 100644
--- a/conda/environments/all_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -142,6 +142,7 @@ dependencies:
+- gpudb>=7.2.2.3
 - milvus==2.3.5
 - nemollm==0.3.5
 - pymilvus==2.3.6
 - pytest-kafka==0.6.0
 - python-logging-loki
 - sentence-transformers==2.7
diff --git a/conda/environments/dev_cuda-125_arch-x86_64.yaml b/conda/environments/dev_cuda-125_arch-x86_64.yaml
index 135105efb..a97404276 100644
--- a/conda/environments/dev_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/dev_cuda-125_arch-x86_64.yaml
@@ -112,6 +112,7 @@ dependencies:
 - databricks-connect
+- gpudb>=7.2.2.3
 - milvus==2.3.5
 - pymilvus==2.3.6
 - pytest-kafka==0.6.0
 - torch==2.4.0+cu124
 name: dev_cuda-125_arch-x86_64
diff --git a/conda/environments/examples_cuda-125_arch-x86_64.yaml b/conda/environments/examples_cuda-125_arch-x86_64.yaml
index 8bed5bbd8..781149340 100644
--- a/conda/environments/examples_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/examples_cuda-125_arch-x86_64.yaml
@@ -79,6 +79,7 @@ dependencies:
+- gpudb>=7.2.2.3
 - milvus==2.3.5
 - nemollm==0.3.5
 - pymilvus==2.3.6
 - python-logging-loki
 - sentence-transformers==2.7
 - torch==2.4.0+cu124
diff --git a/conda/environments/runtime_cuda-125_arch-x86_64.yaml b/conda/environments/runtime_cuda-125_arch-x86_64.yaml
index d1086eb84..416fe9f4f 100644
--- a/conda/environments/runtime_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/runtime_cuda-125_arch-x86_64.yaml
@@ -52,5 +52,6 @@ dependencies:
 - databricks-connect
+- gpudb>=7.2.2.3
 - milvus==2.3.5
 - pymilvus==2.3.6
 - torch==2.4.0+cu124
 name: runtime_cuda-125_arch-x86_64
diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py
new file mode 100644
index 000000000..30498d4dc
--- /dev/null
+++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py
@@ -0,0 +1,94 @@
+import json
+import logging
+import os
+import random
+
+import cudf
+
+from morpheus.config import Config
+from morpheus.config import ExecutionMode
+from morpheus.messages import ControlMessage
+from morpheus.modules import to_control_message  # noqa: F401 # pylint: disable=unused-import
+from morpheus.pipeline.linear_pipeline import LinearPipeline
+from morpheus.stages.general.linear_modules_stage import LinearModulesStage
+from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
+from morpheus.utils.logger import configure_logging
+from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
+from morpheus.utils.module_ids import TO_CONTROL_MESSAGE
+
+# Import the Kinetica vector DB service from Morpheus
+from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService
+from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage
+
+logger = logging.getLogger(__name__)
+
+
+def get_test_df(num_input_rows):
+    """Build a small test dataframe with random 3-dimensional embeddings."""
+    df = cudf.DataFrame({
+        "id": list(range(num_input_rows)),
+        "embeddings": [[random.random() for _ in range(3)] for _ in range(num_input_rows)],
+        "metadata": [json.dumps({"metadata": f"Sample metadata for row {i}"}) for i in range(num_input_rows)],
+    })
+
+    return df
+
+
+def main():
+    configure_logging(log_level=logging.INFO)
+
+    host = os.getenv("KINETICA_HOST", "http://localhost:9191")
+    username = os.getenv("KINETICA_USER", "")
+    password = os.getenv("KINETICA_PASSWORD", "")
+    schema = os.getenv("KINETICA_SCHEMA", "")
+
+    config = Config()
+    config.execution_mode = ExecutionMode.GPU
+
+    kinetica_db_service = KineticaVectorDBService(host, user=username, password=password, kinetica_schema=schema)
+    collection_name = "test_collection"
+    collection_name = f"{schema}.{collection_name}" if schema else f"ki_home.{collection_name}"
+
+    # Table type definition: a long primary key, a 3-dimensional vector column and JSON metadata.
+    columns = [
+        ["id", "long", "primary_key"],
+        ["embeddings", "bytes", "vector(3)"],
+        ["metadata", "string", "json"],
+    ]
+    kinetica_db_service.create(collection_name, type=columns)
+
+    df = get_test_df(10)
+    to_cm_module_config = {
+        "module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE
+    }
+
+    # Build the pipeline: in-memory source -> ControlMessage conversion -> vector DB writer.
+    pipeline = LinearPipeline(config)
+    pipeline.set_source(InMemorySourceStage(config, [df]))
+    pipeline.add_stage(
+        LinearModulesStage(config,
+                           to_cm_module_config,
+                           input_port_name="input",
+                           output_port_name="output",
+                           output_type=ControlMessage))
+
+    pipeline.add_stage(WriteToVectorDBStage(config, kinetica_db_service, "test_collection"))
+
+    pipeline.run()
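+
+    # Optional sanity check (illustrative; assumes the writes above completed and the
+    # table is reachable): report the row count through the service API added by this PR.
+    logger.info("Rows in %s after insert: %s", collection_name, kinetica_db_service.count("test_collection"))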
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/sample_milvus_pipeline/pipeline.py b/examples/sample_milvus_pipeline/pipeline.py
new file mode 100644
index 000000000..367bbca53
--- /dev/null
+++ b/examples/sample_milvus_pipeline/pipeline.py
@@ -0,0 +1,159 @@
+import csv
+import json
+import logging
+import os
+import random
+
+from morpheus.config import Config
+from morpheus.config import ExecutionMode
+from morpheus.io.deserializers import read_file_to_df
+from morpheus.pipeline.linear_pipeline import LinearPipeline
+from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
+from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
+from morpheus.utils.logger import configure_logging
+from morpheus.utils.type_utils import exec_mode_to_df_type_str
+
+# Import the Milvus vector DB service from Morpheus
+from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBService
+from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage
+
+logger = logging.getLogger(__name__)
+
+
+def generate_random_vector(dim=3):
+    """Generate a random vector of the specified dimension."""
+    return [random.uniform(-1.0, 1.0) for _ in range(dim)]
+
+
+# The two helpers below implement an optional JSON-based data path; the default flow
+# in `__main__` uses `generate_csv` instead.
+def save_to_json_file(data, file_name="data.json"):
+    """Save data to a JSON file."""
+    with open(file_name, "w", encoding="utf-8") as json_file:
+        json.dump(data, json_file, indent=4)
+    logger.info("JSON data saved to %s", file_name)
+
+
+def generate_json_records(collection_name, num_records=100):
+    """Generate a list of records suitable for saving to a JSON file."""
+    data = []
+    for pk in range(1, num_records + 1):
+        record = {
+            "id": pk,
+            "vector": generate_random_vector(),
+            "metadata": json.dumps({"description": f"Record {pk}", "category": random.choice(["A", "B", "C"])})
+        }
+        data.append(record)
+
+    return {"collectionName": collection_name, "data": data}
+
+
+def generate_csv(file_name, num_rows):
+    """
+    Generate a CSV file with the fields:
+      - id: primary key (integer)
+      - vector: a random 3-dimensional vector (list of floats)
+      - metadata: sample text metadata (string)
+
+    :param file_name: Name of the output CSV file
+    :param num_rows: Number of rows to generate
+    """
+    with open(file_name, mode='w', newline='', encoding='utf-8') as csvfile:
+        fieldnames = ['id', 'vector', 'metadata']
+        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+
+        # The header row is required so that read_file_to_df picks up the column names.
+        writer.writeheader()
+
+        for pk in range(1, num_rows + 1):
+            metadata = f"Sample metadata for row {pk}"
+            vector = [round(random.uniform(-1.0, 1.0), 4) for _ in range(3)]
+
+            writer.writerow({'id': pk, 'vector': vector, 'metadata': metadata})
+
+
+def main(input_file_name: str):
+    # Step 1: Configure logging and initialize the Morpheus config
+    configure_logging(log_level=logging.INFO)
+
+    config = Config()
+    config.execution_mode = ExecutionMode.GPU
+
+    # Step 2: Set up the Milvus service. Connection details are read from the
+    # environment rather than being hard-coded in the example.
+    milvus_db_service = MilvusVectorDBService(os.getenv("MILVUS_URI", "http://localhost:19530"),
+                                              user=os.getenv("MILVUS_USER", ""),
+                                              password=os.getenv("MILVUS_PASSWORD", ""),
+                                              token=os.getenv("MILVUS_TOKEN", ""))
+    collection_name = "test_collection"
+
+    source_df = read_file_to_df(input_file_name, df_type=exec_mode_to_df_type_str(config.execution_mode))
+    logger.info("Loaded %d rows from %s", source_df.shape[0], input_file_name)
+
+    # Step 3: Create the pipeline
+    pipeline = LinearPipeline(config)
+
+    # Step 4: Define the source stage and deserialize the rows into ControlMessages
+    pipeline.set_source(InMemorySourceStage(config, dataframes=[source_df]))
+    pipeline.add_stage(DeserializeStage(config))
+
+    # Step 5: Add the WriteToVectorDBStage for Milvus
+    pipeline.add_stage(WriteToVectorDBStage(config, milvus_db_service, collection_name))
+
+    pipeline.build()
+
+    # Step 6: Execute the pipeline
+    pipeline.run()
+
+
+if __name__ == "__main__":
+    file_name = "test.csv"
+    generate_csv(file_name, 1000)
+    main(file_name)
diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py
new file mode 100644
index 000000000..78514343f
--- /dev/null
+++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py
@@ -0,0 +1,1027 @@
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import enum
+import json
+import logging
+import typing
+from collections import OrderedDict
+
+import sqlparse
+from sqlparse.sql import Identifier
+from sqlparse.sql import IdentifierList
+from sqlparse.tokens import Keyword
+
+from morpheus.utils.type_aliases import DataFrameType
+from morpheus.utils.type_utils import is_cudf_type
+from morpheus_llm.error import IMPORT_ERROR_MESSAGE
+from morpheus_llm.service.vdb.vector_db_service import VectorDBResourceService
+from morpheus_llm.service.vdb.vector_db_service import VectorDBService
+
+logger = logging.getLogger(__name__)
+
+IMPORT_EXCEPTION = None
+
+# Import gpudb inside the guard so a missing package surfaces as a clean ImportError
+# when the service is instantiated, rather than at module import time.
+try:
+    from gpudb import GPUdb
+    from gpudb import GPUdbException
+    from gpudb import GPUdbRecordColumn
+    from gpudb import GPUdbRecordType
+    from gpudb import GPUdbTable
+except ImportError as import_exc:
+    IMPORT_EXCEPTION = import_exc
+
+
+class DistanceStrategy(str, enum.Enum):
+    """Enumerator of the distance strategies."""
+
+    EUCLIDEAN = "l2"
+    COSINE = "cosine"
+    MAX_INNER_PRODUCT = "inner"
+
+
+class Dimension(int, enum.Enum):
+    """Some default dimensions for known embeddings."""
+
+    OPENAI = 1536
+
+
+DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN
+
+
+class KineticaVectorDBResourceService(VectorDBResourceService):
+    """
+    Represents a service for managing resources in a Kinetica vector database.
+
+    Parameters
+    ----------
+    name : str
+        Name of the Kinetica table. Must be an existing table.
+    client : GPUdb
+        An instance of the GPUdb class used to interact with the Kinetica database.
+    """
+
+    def __init__(self, name: str, client: "GPUdb") -> None:
+        if IMPORT_EXCEPTION is not None:
+            raise ImportError(IMPORT_ERROR_MESSAGE.format(package='gpudb')) from IMPORT_EXCEPTION
+
+        super().__init__()
+
+        self._name = name
+        self._client = client
+
+        self._collection = GPUdbTable(name=self._name, db=client)
+        self._record_type = self._collection.get_table_type()
+        self._fields: list[GPUdbRecordColumn] = self._record_type.columns
+        self._description = self.describe()
+
+        self._vector_field = None
+        self._fillna_fields_dict = {}
+
+        # Mapping of field name to max length for string fields
+        self._fields_max_length: dict[str, int] = {}
+
+        for field in self._fields:
+            if field.is_vector():
+                self._vector_field = field.name
+            elif "primary_key" not in field.column_properties:
+                self._fillna_fields_dict[field.name] = field.column_type
+
+    def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Insert data into the Kinetica table.
+
+        Parameters
+        ----------
+        data : list[list] | list[dict]
+            Data to be inserted into the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        # If insert options were given, remove them from kwargs; otherwise use an empty dict.
+        options = kwargs.pop("options", {})
+
+        result = self._collection.insert_records(data, options=options)
+
+        return self._insert_result_to_dict(result=result.count)
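+
+    # Illustrative call (a sketch; assumes the dict keys match the table's columns and that
+    # "return_record_ids" is a supported /insert/records option on the target server):
+    #     resource.insert([{"id": 1, "embeddings": [0.1, 0.2, 0.3], "metadata": "{}"}],
+    #                     options={"return_record_ids": "true"})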
+
+    def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Insert dataframe entries into the Kinetica table.
+
+        Parameters
+        ----------
+        df : DataFrameType
+            Dataframe to be inserted into the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        # From the schema, this is the list of columns we need.
+        column_names = [field.name for field in self._fields]
+
+        collection_df = df[column_names]
+        if is_cudf_type(collection_df):
+            collection_df = collection_df.to_pandas()
+
+        # Note: the dataframe columns have to be in the order of the Kinetica table schema fields.
+        result = self._collection.insert_df(collection_df)
+
+        return self._insert_result_to_dict(result=result)
+
+    def describe(self, **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Provides a description of the Kinetica table.
+
+        Parameters
+        ----------
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        record_type: GPUdbRecordType = GPUdbTable(db=self._client, name=self._name).get_table_type()
+
+        description: dict[str, object] = {}
+
+        for col in record_type.columns:
+            description[col.name] = (
+                col.column_type,
+                col.is_nullable,
+                col.is_vector(),
+                col.column_properties,
+                "primary_key" if "primary_key" in col.column_properties else ""
+            )
+
+        return description
+
+    def _get_pk_field_name(self) -> str:
+        for field_name, properties in self._description.items():
+            if properties[4] == "primary_key":
+                return field_name
+
+        return ""
+
+    def query(self, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Query data in the Kinetica table.
+
+        Parameters
+        ----------
+        query : str
+            The search query: a SQL SELECT statement.
+        **kwargs : dict
+            Additional keyword arguments for the search operation.
+
+        Returns
+        -------
+        typing.Any
+            The search result (a GPUdbSqlIterator object).
+
+        Raises
+        ------
+        GPUdbException
+            If an error occurs during the search operation.
+        """
+
+        if query is None:
+            raise GPUdbException("'query' - a valid SQL query statement must be given ...")
+
+        logger.debug("Searching in Kinetica table: %s, query=%s, kwargs=%s", self._name, query, kwargs)
+        batch_size = kwargs.get("batch_size", 5000)
+        sql_params = kwargs.get("sql_params", [])
+        sql_opts = kwargs.get("sql_opts", {})
+        return self._client.query(query, batch_size, sql_params, sql_opts)
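+
+    # Illustrative use (a sketch; relies on the returned gpudb SQL iterator being iterable):
+    #     for record in resource.query("SELECT id, metadata FROM ki_home.test_collection LIMIT 10"):
+    #         print(record)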
+ """ + fields = [] + for token in parsed_tokens: + if isinstance(token, IdentifierList): + # Multiple fields in the projection list + for identifier in token.get_identifiers(): + fields.append(identifier.get_real_name()) + elif isinstance(token, Identifier): + # Single field + fields.append(token.get_real_name()) + elif token.ttype is Keyword and token.value.upper() in ["SELECT"]: + # Skip the SELECT keyword + continue + elif token.is_group: + # Handle nested subqueries + fields.extend(self._extract_projection_fields(token.tokens)) + return fields + + def _is_field_in_projection(self, sql_statement, field_name): + """ + Check whether a given field name is present in the projection list of an SQL statement. + + Parameters: + sql_statement (str): The SQL query to parse. + field_name (str): The field name to search for. + + Returns: + bool: True if the field name is present in the projection list, False otherwise. + """ + parsed = sqlparse.parse(sql_statement) + for stmt in parsed: + if stmt.get_type() == "SELECT": + # Extract projection fields + projection_fields = self._extract_projection_fields(stmt.tokens) + # Check if the field is in the projection list + return field_name in projection_fields + return False + + def __query_collection( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict[str, str] = None, + ) -> dict: + """Query the Kinetica table.""" + # if filter is not None: + # filter_clauses = [] + # for key, value in filter.items(): + # IN = "in" + # if isinstance(value, dict) and IN in map(str.lower, value): + # value_case_insensitive = { + # k.lower(): v for k, v in value.items() + # } + # filter_by_metadata = self.EmbeddingStore.cmetadata[ + # key + # ].astext.in_(value_case_insensitive[IN]) + # filter_clauses.append(filter_by_metadata) + # else: + # filter_by_metadata = self.EmbeddingStore.cmetadata[ + # key + # ].astext == str(value) + # filter_clauses.append(filter_by_metadata) + + json_filter = json.dumps(filter) if filter is not None else None + where_clause = ( + f" where '{json_filter}' = JSON(metadata) " + if json_filter is not None + else "" + ) + + embedding_str = "[" + ",".join([str(x) for x in embedding]) + "]" + + dist_strategy = DEFAULT_DISTANCE_STRATEGY + + query_string = f""" + SELECT {', '.join(output_fields)}, {dist_strategy}(embedding, '{embedding_str}') + as distance, {self._vector_field} + FROM {self._collection_name} + {where_clause} + ORDER BY distance asc NULLS LAST + LIMIT {k} + """ + + self.logger.debug(query_string) + resp = self._client.execute_sql_and_decode(query_string) + self.logger.debug(resp) + return resp + + def similarity_search_by_vector( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict = None, + **kwargs: typing.Any, + ) -> list[dict]: + """Return docs most similar to embedding vector. + + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. + + Returns: + List of records most similar to the query vector. 
+ """ + docs = self.similarity_search_with_score_by_vector( + embedding=embedding, output_fields=output_fields, k=k, filter=filter + ) + return docs + + def similarity_search_with_score_by_vector( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict = None, + ) -> list[dict]: + + resp: dict = self.__query_collection(embedding, k, filter) + if resp and resp["status_info"]["status"] == "OK" and "records" in resp: + records: OrderedDict = resp["records"] + return [records] + + self.logger.error(resp["status_info"]["message"]) + return [] + + + async def similarity_search(self, + embeddings: list[list[float]], + k: int = 4, + **kwargs: dict[str, typing.Any]) -> list[list[dict]]: + """ + Perform a similarity search within the Kinetica table. + + Parameters + ---------- + embeddings : list[list[float]] + Embeddings for which to perform the similarity search. + k : int, optional + The number of nearest neighbors to return, by default 4. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[list[dict]] + Returns a list of 'list of dictionaries' representing the results of the similarity search. + """ + + assert self._vector_field is not None, "Cannot perform similarity search on a table without a vector field" + + # Determine result metadata fields. + output_fields = [x.name for x in self._fields if x.name != self._vector_field] + search_filter = kwargs.get("filter", "") + + results: list[list[dict]] = [self.similarity_search_by_vector( + embedding=embedding, + output_fields=output_fields, + k=k, + filter=search_filter, + ) for embedding in embeddings] + + return results + + def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Update data in the Kinetica table. + + Parameters + ---------- + data : list[typing.Any] + Data to be updated in the Kinetica table. This is npt used by Kinetica. The required parameters + are all passed in a keyword arguments. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to Kinetica. The full explanation of each of these + keyword arguments is available in the documentation of the API `/update/records`. + + Allowed keyword arguments: + options: dict[str, str] - options + + expressions: [list] - expression used to filter records for update + + new_value_maps: [list[dict]] | [dict] - + + records_to_insert: list[] - + + records_to_insert_str: list[] - + + + + Returns + ------- + dict[str, typing.Any] + Returns result of the updated operation stats. 
+ """ + + options = kwargs.get( "options", None ) + if options is not None: # if given, remove from kwargs + kwargs.pop( "options" ) + else: # no option given; use an empty dict + options = {} + + expressions = kwargs.get( "expressions", [] ) + if expressions is not None: # if given, remove from kwargs + if not isinstance(expressions, list): + raise GPUdbException("'expressions' must be of type 'list' ...") + kwargs.pop( "expressions" ) + else: # no option given; use an empty dict + raise GPUdbException("Update 'expressions' must be given ...") + + new_values_maps = kwargs.get( "new_values_maps", None ) + if new_values_maps is not None: # if given, remove from kwargs + if not isinstance(new_values_maps, (list, dict)): + raise GPUdbException("'new_value_maps' should either be a 'list of dicts' or a dict ...") + kwargs.pop( "new_values_maps" ) + else: # no option given; use an empty dict + raise GPUdbException("'new_values_maps' must be given ...") + + if len(expressions) != len(new_values_maps): + raise GPUdbException("'expression' and 'new_value_maps' must have the same number of elements") + + records_to_insert = kwargs.get("records_to_insert", []) + records_to_insert_str = kwargs.get("records_to_insert_str", []) + + result = self._collection.update_records(expressions, new_values_maps, records_to_insert, records_to_insert_str, options=options) + + return self._update_delete_result_to_dict(result=result) + + def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Delete vectors from the Kinetica table using expressions. + + Parameters + ---------- + expr : str + Delete expression. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the given keys that are deleted from the Kinetica table. + """ + + result = self._collection.delete_records(expressions=[expr], **kwargs) + + return self._update_delete_result_to_dict(result=result) + + def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Delete vectors by keys from the resource. + + Parameters + ---------- + keys : int | str | list + Primary keys to delete vectors. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + typing.Any + Returns vectors of the given keys that are delete from the resource. + """ + pass + + def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using their primary keys. + + Parameters + ---------- + keys : int | str | list + Primary keys to get vectors for. Depending on pk_field type it can be int or str + or a list of either. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the retrieval operation. + + Only valid keyword arguments are: + + expression: [str] - The Kinetica expression to pass on to `get/records/by/key` + + options: dict - The `options` dict accepted by `get/records/by/key` + + Returns + ------- + list[typing.Any] + Returns result rows of the given keys from the Kinetica table. 
+ """ + + result = None + expression = kwargs.get("expression", "") + options = kwargs.get("options", {}) + if keys is None or keys <= 0 or (isinstance(keys, (str, list)) and len(keys) <= 0): + raise GPUdbException("'keys' must be specified as either an 'int' or 'str' or 'list' ...") + + if expression == "": + # expression not specified; keys must be values of PK in the Kinetica table. + pk_field_name = self._get_pk_field_name() + if pk_field_name == "": + raise GPUdbException("No 'expression' given and no 'PK field' found cannot retrieve records ...") + + if isinstance(keys, str): + expression = [f"{pk_field_name} = '{keys}'"] + elif isinstance(keys, int): + expression = [f"{pk_field_name} = {keys}"] + elif isinstance(keys, list): + # keys is a list + expression = [f"{pk_field_name} in ({','.join(keys)})"] + else: + raise GPUdbException("'keys' must be of type (int or str or list) ...") + try: + result = self._collection.get_records_by_key(keys, expression, options) + except GPUdbException as exec_info: + raise RuntimeError(f"Unable to perform search: {exec_info}") from exec_info + + return result["records"] if result is not None and 'records' in result else [] + + def count(self, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Not used. + + Returns + ------- + int + Returns number of records in the Kinetica table. + """ + return self._collection.count + + def drop(self, **kwargs: dict[str, typing.Any]) -> None: + """ + Drop a Kinetica table. + + This function allows you to delete/drop a Kinetica table. + + Parameters + ---------- + **kwargs : dict + Not used. + """ + + self._client.clear_table(self._collection_name) + + def _insert_result_to_dict(self, result: int) -> dict[str, typing.Any]: + result_dict = { + "count": result, + } + return result_dict + + def _update_delete_result_to_dict(self, result) -> dict[str, typing.Any]: + result_dict = { + "count_deleted": result["count_deleted"], + "counts_updated": result["counts_deleted"], + "info": result["info"], + } + return result_dict + + +class KineticaVectorDBService(VectorDBService): + """ + Service class for Kinetica Database implementation. This class provides functions for interacting + with a Kinetica database. + + Parameters + ---------- + host : str + The hostname or IP address of the Kinetica server. + port : str + The port number for connecting to the Kinetica server. + alias : str, optional + Alias for the Kinetica connection, by default "default". 
+ """ + + def __init__(self, + uri: str, + user: str = "", + password: str = "", + kinetica_schema = "", + ): + options = GPUdb.Options() + options.skip_ssl_cert_verification = True + options.username = user + options.password = password + + self._collection_name = None + self.schema = kinetica_schema if kinetica_schema is not None and len(kinetica_schema) > 0 else None + self._client = GPUdb(host=uri, options=options) + + def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> KineticaVectorDBResourceService: + """ + + @param name: + @param kwargs: + @return: + """ + self._collection_name = f"{self.schema}.{name}" if self.schema is not None and len(self.schema) > 0 else f"ki_home.{name}" + return KineticaVectorDBResourceService(name=self._collection_name, + client=self._client) + + @property + def collection_name(self): + return self._collection_name if self._collection_name is not None else None + + def has_store_object(self, name: str) -> bool: + """ + Check if a table exists in the Kinetica database. + + Parameters + ---------- + name : str + Name of the table to check. + + Returns + ------- + bool + True if the table exists, False otherwise. + """ + return self._client.has_table(name)["table_exists"] + + def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]): + """ + Create a table in the Kinetica database with the specified name and configuration. + If the table already exists, it can be overwritten if the `overwrite` parameter is set to True. + + Parameters + ---------- + name : str + Name of the table to be created. The name must be in the form 'schema_name.table_name'. + overwrite : bool, optional + If True, the Kinetica table will be overwritten if it already exists, by default False. + **kwargs : dict + Additional keyword arguments containing Kinetica `/create/table` options. + + Raises + ------ + GPUdbException + If the provided type schema configuration is empty. + """ + logger.debug("Creating Kinetica table: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) + + table_type: list[list[str]] = kwargs.get("type", []) + if not self.has_store_object(name) and (table_type is None or len(table_type) == 0): + raise GPUdbException("Table must either be existing or a type must be given to create the table ...") + + options = kwargs.get("options", {}) + if len(options) == 0: + options['no_error_if_exists'] = 'true' + + if not self.has_store_object(name) or overwrite: + if overwrite and self.has_store_object(name): + self.drop(name) + + GPUdbTable(table_type, name, options=options, db=self._client) + + + def create_from_dataframe(self, + name: str, + df: DataFrameType, + overwrite: bool = False, + **kwargs: dict[str, typing.Any]) -> None: + """ + Create collections in the vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table. + df : DataFrameType + The dataframe to create the Kinetica table from. + overwrite : bool, optional + Whether to overwrite the Kinetica table if it already exists. Default is False. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + """ + + GPUdbTable.from_df(df, self._client, name, clear_table=overwrite) + + def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, + typing.Any]) -> dict[str, typing.Any]: + """ + Insert a collection specific data in the Kinetica vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table to be inserted. 
+
+    def insert(self, name: str, data: list[list] | list[dict],
+               **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Insert data into a Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table to insert into.
+        data : list[list] | list[dict]
+            Data to be inserted into the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments containing Kinetica table configuration.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+
+        Raises
+        ------
+        RuntimeError
+            If the table does not exist.
+        """
+
+        resource = self.load_resource(name)
+        return resource.insert(data, **kwargs)
+
+    def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Convert a dataframe to rows and insert them into a Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table to insert into.
+        df : DataFrameType
+            Dataframe to be inserted into the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments containing Kinetica table configuration.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+
+        Raises
+        ------
+        RuntimeError
+            If the Kinetica table does not exist.
+        """
+        resource = self.load_resource(name)
+
+        return resource.insert_dataframe(df=df, **kwargs)
+
+    def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Query data in a Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table to search within.
+        query : str
+            The search query: a SQL SELECT statement.
+        **kwargs : dict
+            Additional keyword arguments for the search operation.
+
+        Returns
+        -------
+        typing.Any
+            The search result, which can vary depending on the query and options.
+        """
+
+        resource = self.load_resource(name)
+
+        return resource.query(query, **kwargs)
+
+    async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[list[dict]]:
+        """
+        Perform a similarity search within the Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        list[list[dict]]
+            Returns a list of 'list of dictionaries' representing the results of the similarity search.
+        """
+
+        resource = self.load_resource(name)
+
+        return await resource.similarity_search(**kwargs)
+
+    def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Update data in the vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table.
+        data : list[typing.Any]
+            Data to be updated in the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the update operation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns the stats of the update operation.
+        """
+
+        if not isinstance(data, list):
+            raise RuntimeError("Data is not of type list.")
+
+        resource = self.load_resource(name)
+
+        return resource.update(data=data, **kwargs)
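+
+    # Illustrative use (similarity_search is a coroutine and must be awaited):
+    #     hits = await service.similarity_search("test_collection", embeddings=[[0.1, 0.2, 0.3]], k=4)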
+
+    def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Delete records from a Kinetica table using an expression.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table.
+        expr : str
+            Delete expression.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns the stats of the delete operation.
+        """
+
+        resource = self.load_resource(name)
+        result = resource.delete(expr=expr, **kwargs)
+
+        return result
+
+    def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]:
+        """
+        Retrieve the inserted rows using their primary keys from the Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table.
+        keys : int | str | list
+            Primary keys to get rows for. Depending on the pk_field type it can be int or str
+            or a list of either.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments for the retrieval operation.
+
+        Returns
+        -------
+        list[typing.Any]
+            Returns the result rows for the given keys from the Kinetica table.
+        """
+
+        resource = self.load_resource(name)
+
+        result = resource.retrieve_by_keys(keys=keys, **kwargs)
+
+        return result
+
+    def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int:
+        """
+        Returns the number of rows/entities in the given Kinetica table.
+
+        Parameters
+        ----------
+        name : str
+            Name of the Kinetica table.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments for the count operation.
+
+        Returns
+        -------
+        int
+            Returns the number of records in the Kinetica table.
+        """
+        resource = self.load_resource(name)
+
+        return resource.count(**kwargs)
+
+    def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None:
+        """
+        Drop a table in the Kinetica database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the table.
+        **kwargs : dict
+            Not used.
+
+        Raises
+        ------
+        ValueError
+            If the table cannot be dropped.
+        """
+
+        logger.debug("Dropping Kinetica table: %s, kwargs=%s", name, kwargs)
+
+        if self.has_store_object(name):
+            try:
+                self._client.clear_table(name)
+            except GPUdbException as e:
+                raise ValueError(e.message) from e
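+
+    # Illustrative cleanup (a sketch; drop() expects the fully-qualified table name):
+    #     service.delete("test_collection", "id < 100")
+    #     service.drop("ki_home.test_collection")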
+ """ + + pass + + def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Delete vectors by keys from the resource. + + Parameters + ---------- + keys : int | str | list + Primary keys to delete vectors. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + typing.Any + Returns vectors of the given keys that are delete from the resource. + """ + pass diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py index b9478eca0..289128ffc 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py @@ -265,6 +265,7 @@ def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: boo self._truncate_long_strings = truncate_long_strings self._collection.load() + print("Resource created ...") def _set_up_collection(self): """ @@ -310,21 +311,30 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - Returns response content as a dictionary. """ # Ensure that there are no None values in the DataFrame entries. + print("##### 1") + print(self._fillna_fields_dict.items()) for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): + print("##### 1 1") df[field_name] = df[field_name].fillna("") elif dtype in (pymilvus.DataType.INT8, pymilvus.DataType.INT16, pymilvus.DataType.INT32, pymilvus.DataType.INT64): + print("##### 1 2") df[field_name] = df[field_name].fillna(0) elif dtype in (pymilvus.DataType.FLOAT, pymilvus.DataType.DOUBLE): + print("##### 1 3") df[field_name] = df[field_name].fillna(0.0) elif dtype == pymilvus.DataType.BOOL: + print("##### 1 4") df[field_name] = df[field_name].fillna(False) else: + print("##### 1 5") logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) + print("##### 2") + needs_truncate = self._truncate_long_strings if needs_truncate and is_cudf_type(df): # Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the @@ -333,6 +343,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] + print(column_names) collection_df = df[column_names] if is_cudf_type(collection_df): @@ -342,6 +353,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - truncate_string_cols_by_bytes(collection_df, self._fields_max_length, warn_on_truncate=True) # Note: dataframe columns has to be in the order of collection schema fields.s + print(collection_df.to_string(max_rows=3)) result = self._collection.insert(data=collection_df, **kwargs) self._collection.flush() @@ -854,6 +866,8 @@ def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typ If the collection not exists exists. """ resource = self.load_resource(name) + print(name) + print(resource) return resource.insert_dataframe(df=df, **kwargs)