From 91d09daf592e516cbf9ebf46cf49f067b48a79bb Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 21 Nov 2024 19:16:02 +0530 Subject: [PATCH 01/29] WIP - test sample Milvus pipeline locally --- examples/llm/completion/pipeline.py | 4 + examples/sample_milvus_pipeline/pipeline.py | 86 +++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 examples/sample_milvus_pipeline/pipeline.py diff --git a/examples/llm/completion/pipeline.py b/examples/llm/completion/pipeline.py index 4087ce9ca6..c057dba63b 100644 --- a/examples/llm/completion/pipeline.py +++ b/examples/llm/completion/pipeline.py @@ -37,6 +37,8 @@ from morpheus_llm.llm.task_handlers.simple_task_handler import SimpleTaskHandler from morpheus_llm.stages.llm.llm_engine_stage import LLMEngineStage +from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage + logger = logging.getLogger(__name__) @@ -127,6 +129,8 @@ def pipeline(use_cpu_only: bool, pipe.add_stage(MonitorStage(config, description="Inference rate", unit="req", delayed_start=True)) + pipe.add_stage(WriteToVectorDBStage()) + sink = pipe.add_stage(InMemorySinkStage(config)) start_time = time.time() diff --git a/examples/sample_milvus_pipeline/pipeline.py b/examples/sample_milvus_pipeline/pipeline.py new file mode 100644 index 0000000000..ca57adeb73 --- /dev/null +++ b/examples/sample_milvus_pipeline/pipeline.py @@ -0,0 +1,86 @@ +from morpheus.pipeline.pipeline import LinearPipeline +from morpheus.config import Config +from morpheus.stages.general.source_stage import SourceStage +from morpheus.stages.general.filter_stage import FilterStage +from morpheus.stages.general.serialize_stage import SerializeStage +from python.morpheus_llm.morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage +from morpheus.utils.logging import configure_logging +from morpheus.utils.producer import IterableProducer +from morpheus.utils.type_support import numpy_to_cudf + +# Import Milvus services from 
Morpheus +from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBResourceService, MilvusVectorDBService + +import numpy as np +import json + +def main(): + # Step 1: Configure logging + configure_logging(log_level="INFO") + + # Step 2: Initialize Morpheus Config + config = Config() + + # Step 3: Setup Milvus services + milvus_resource_service = MilvusVectorDBResourceService(host="127.0.0.1", port="19530") + collection_name = "morpheus_vectors" + vector_dim = 3 # Example: 3-dimensional vector embeddings + + # Step 4: Initialize the Milvus VectorDBService + milvus_service = MilvusVectorDBService( + collection_name=collection_name, + dim=vector_dim, + resource_service=milvus_resource_service, + ) + + # Step 5: Create a pipeline + pipeline = LinearPipeline(config) + + # Step 6: Define source stage + def data_generator(): + for i in range(5): + embedding = np.random.random(vector_dim).tolist() + metadata = {"id": i, "label": f"example_{i}"} + yield {"embedding": embedding, "metadata": metadata} + + pipeline.add_stage( + SourceStage( + config=config, + source=IterableProducer(data_generator()) + ) + ) + + # Step 7: Add filter stage + pipeline.add_stage( + FilterStage( + config=config, + filter_func=lambda msg: msg["embedding"] is not None # Only process messages with valid embeddings + ) + ) + + # Step 8: Serialize stage + pipeline.add_stage( + SerializeStage( + config=config, + to_dict_func=lambda msg: { + "embedding": msg["embedding"], + "metadata": msg["metadata"] + } + ) + ) + + # Step 9: Add WriteToVectorDBStage for Milvus + pipeline.add_stage( + WriteToVectorDBStage( + config=config, + vdb_service=milvus_service, + embedding_field="embedding", + metadata_field="metadata" + ) + ) + + # Step 10: Execute the pipeline + pipeline.run() + +if __name__ == "__main__": + main() From 3ef2e0723f9c7396464c26f62d74ddc05aee1ea0 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 25 Nov 2024 12:50:08 +0530 Subject: [PATCH 02/29] WIP - test sample Milvus 
pipeline locally --- examples/sample_milvus_pipeline/pipeline.py | 119 ++++++++++++-------- 1 file changed, 74 insertions(+), 45 deletions(-) diff --git a/examples/sample_milvus_pipeline/pipeline.py b/examples/sample_milvus_pipeline/pipeline.py index ca57adeb73..32d88b4443 100644 --- a/examples/sample_milvus_pipeline/pipeline.py +++ b/examples/sample_milvus_pipeline/pipeline.py @@ -1,12 +1,17 @@ -from morpheus.pipeline.pipeline import LinearPipeline +import csv +import logging +import random + +from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.config import Config -from morpheus.stages.general.source_stage import SourceStage -from morpheus.stages.general.filter_stage import FilterStage -from morpheus.stages.general.serialize_stage import SerializeStage -from python.morpheus_llm.morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage -from morpheus.utils.logging import configure_logging -from morpheus.utils.producer import IterableProducer -from morpheus.utils.type_support import numpy_to_cudf +from morpheus.stages.postprocess.serialize_stage import SerializeStage + +from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage + +from morpheus.stages.preprocess.drop_null_stage import DropNullStage +from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage +# from morpheus.utils.logging import configure_logging +# from morpheus.utils.type_support import numpy_to_cudf # Import Milvus services from Morpheus from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBResourceService, MilvusVectorDBService @@ -14,25 +19,63 @@ import numpy as np import json -def main(): +from morpheus.utils.logger import configure_logging + +from morpheus.stages.input.file_source_stage import FileSourceStage + +from morpheus.stages.output.write_to_file_stage import WriteToFileStage + +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + +from 
morpheus.config import ExecutionMode + +logger = logging.getLogger(__name__) + +def generate_csv(file_name, num_rows): + """ + Generates a CSV file with fields: + - PK: Primary Key (integer) + - metadata: Sample text metadata (string) + - vector: A random vector of 3 dimensions (array of floats) + + :param file_name: Name of the output CSV file + :param num_rows: Number of rows to generate + """ + with open(file_name, mode='w', newline='', encoding='utf-8') as csvfile: + fieldnames = ['id', 'metadata', 'embedding'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + # Write the header + writer.writeheader() + + for pk in range(1, num_rows + 1): + # Generate random metadata + metadata = f"Sample metadata for row {pk}" + + # Generate a random vector of 3 dimensions + vector = [round(random.uniform(-1.0, 1.0), 4) for _ in range(3)] + + # Write the row + writer.writerow({ + 'id': pk, + 'metadata': metadata, + 'embedding': vector + }) + + +def main(input_file_name: str): # Step 1: Configure logging - configure_logging(log_level="INFO") # Step 2: Initialize Morpheus Config config = Config() + config.execution_mode = ExecutionMode.GPU # Step 3: Setup Milvus services - milvus_resource_service = MilvusVectorDBResourceService(host="127.0.0.1", port="19530") - collection_name = "morpheus_vectors" + milvus_db_service = MilvusVectorDBService("http://127.0.0.1:19530") + milvus_resource_service = milvus_db_service.load_resource("test_collection") + collection_name = "test_collection" vector_dim = 3 # Example: 3-dimensional vector embeddings - # Step 4: Initialize the Milvus VectorDBService - milvus_service = MilvusVectorDBService( - collection_name=collection_name, - dim=vector_dim, - resource_service=milvus_resource_service, - ) - # Step 5: Create a pipeline pipeline = LinearPipeline(config) @@ -43,44 +86,30 @@ def data_generator(): metadata = {"id": i, "label": f"example_{i}"} yield {"embedding": embedding, "metadata": metadata} - pipeline.add_stage( - SourceStage( - 
config=config, - source=IterableProducer(data_generator()) - ) - ) + pipeline.set_source(FileSourceStage(config, filename=input_file_name)) - # Step 7: Add filter stage pipeline.add_stage( - FilterStage( - config=config, - filter_func=lambda msg: msg["embedding"] is not None # Only process messages with valid embeddings - ) + DropNullStage(config, "embedding") ) - # Step 8: Serialize stage - pipeline.add_stage( - SerializeStage( - config=config, - to_dict_func=lambda msg: { - "embedding": msg["embedding"], - "metadata": msg["metadata"] - } - ) - ) + # pipeline.add_stage(WriteToFileStage(config, filename="output_file.csv", overwrite=True)) + pipeline.add_stage(DeserializeStage(config)) # Step 9: Add WriteToVectorDBStage for Milvus pipeline.add_stage( WriteToVectorDBStage( - config=config, - vdb_service=milvus_service, - embedding_field="embedding", - metadata_field="metadata" + config, + milvus_db_service, + "test_collection" ) ) + pipeline.build() + # Step 10: Execute the pipeline pipeline.run() if __name__ == "__main__": - main() + file_name = "test.csv" + generate_csv(file_name, 1000) + main(file_name) From 20fd509ce23f9e4603eb6ddb807391bfbcaa6ab2 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 29 Nov 2024 13:53:10 +0530 Subject: [PATCH 03/29] WIP - Debug statements --- examples/sample_milvus_pipeline/pipeline.py | 66 +++++++++++++++---- .../modules/output/write_to_vector_db.py | 3 + 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/examples/sample_milvus_pipeline/pipeline.py b/examples/sample_milvus_pipeline/pipeline.py index 32d88b4443..367bbca536 100644 --- a/examples/sample_milvus_pipeline/pipeline.py +++ b/examples/sample_milvus_pipeline/pipeline.py @@ -5,11 +5,15 @@ from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.config import Config from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.io.deserializers import read_file_to_df +from morpheus.utils.type_utils import 
exec_mode_to_df_type_str from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage from morpheus.stages.preprocess.drop_null_stage import DropNullStage from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage + # from morpheus.utils.logging import configure_logging # from morpheus.utils.type_support import numpy_to_cudf @@ -31,6 +35,37 @@ logger = logging.getLogger(__name__) +def generate_random_vector(dim=3): + """Generate a random vector of specified dimensions.""" + return [random.uniform(-1.0, 1.0) for _ in range(dim)] + + +def save_to_json_file(data, file_name="data.json"): + """Save data to a JSON file.""" + with open(file_name, "w") as json_file: + json.dump(data, json_file, indent=4) + print(f"JSON data saved to {file_name}") + + +def generate_json_records(collection_name, output_file, num_records=100): + """Generate a list of records to be saved in a JSON file.""" + data = [] + for pk in range(1, num_records + 1): + record = { + "id": pk, + "vector": generate_random_vector(), + "metadata": json.dumps({"description": f"Record {pk}", "category": random.choice(["A", "B", "C"])}) + } + data.append(record) + + json_data = { + "collectionName": collection_name, + "data": data + } + + return json_data + + def generate_csv(file_name, num_rows): """ Generates a CSV file with fields: @@ -42,11 +77,12 @@ def generate_csv(file_name, num_rows): :param num_rows: Number of rows to generate """ with open(file_name, mode='w', newline='', encoding='utf-8') as csvfile: - fieldnames = ['id', 'metadata', 'embedding'] + fieldnames = ['id', 'vector', 'metadata'] +# fieldnames = ['vector', 'metadata'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) # Write the header - writer.writeheader() +# writer.writeheader() for pk in range(1, num_rows + 1): # Generate random metadata @@ -58,8 +94,8 @@ def generate_csv(file_name, num_rows): # Write 
the row writer.writerow({ 'id': pk, - 'metadata': metadata, - 'embedding': vector + 'vector': vector, + 'metadata': metadata }) def main(input_file_name: str): @@ -71,12 +107,14 @@ def main(input_file_name: str): config.execution_mode = ExecutionMode.GPU # Step 3: Setup Milvus services - milvus_db_service = MilvusVectorDBService("http://127.0.0.1:19530") + milvus_db_service = MilvusVectorDBService("https://in03-c87c25d216da0ac.serverless.gcp-us-west1.cloud.zilliz.com", user="<REDACTED-USER>", password="<REDACTED-PASSWORD>", token="<REDACTED-TOKEN>") milvus_resource_service = milvus_db_service.load_resource("test_collection") collection_name = "test_collection" vector_dim = 3 # Example: 3-dimensional vector embeddings - # Step 5: Create a pipeline + source_df = read_file_to_df(input_file_name, df_type=exec_mode_to_df_type_str(config.execution_mode)) + print(source_df.shape[0]) +# Step 5: Create a pipeline pipeline = LinearPipeline(config) # Step 6: Define source stage @@ -86,15 +124,17 @@ def data_generator(): metadata = {"id": i, "label": f"example_{i}"} yield {"embedding": embedding, "metadata": metadata} - pipeline.set_source(FileSourceStage(config, filename=input_file_name)) +# pipeline.set_source(FileSourceStage(config, filename=input_file_name)) + pipeline.set_source(InMemorySourceStage(config, dataframes=[source_df])) - pipeline.add_stage( - DropNullStage(config, "embedding") - ) +# pipeline.add_stage( +# DropNullStage(config, "vector") +# ) # pipeline.add_stage(WriteToFileStage(config, filename="output_file.csv", overwrite=True)) - +# pipeline.add_stage(SerializeStage(config)) pipeline.add_stage(DeserializeStage(config)) + # Step 9: Add WriteToVectorDBStage for Milvus pipeline.add_stage( WriteToVectorDBStage( @@ -110,6 +150,10 @@ def data_generator(): pipeline.run() if __name__ == "__main__": +# file_name = "test.json" file_name = "test.csv" + collection_name = "test_collection" generate_csv(file_name, 
1000) +# data = generate_json_records(collection_name, file_name) +# save_to_json_file(data, file_name) main(file_name) diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index c9528f3c78..e01bde596c 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -146,7 +146,10 @@ def on_completed(): if accum_stats.data: df_pkg = get_df_pkg_from_obj(accum_stats.data[0]) merged_df = df_pkg.concat(accum_stats.data) + print("Before Inerting data ...") + merged_df.info() service.insert_dataframe(name=key, df=merged_df) + print("After Inserting data") final_df_references.append(accum_stats.data) except Exception as e: logger.error("Unable to upload dataframe entries to vector database: %s", e) From a293fbd9103ede98f1ae891785b209427bba1026 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 29 Nov 2024 14:06:19 +0530 Subject: [PATCH 04/29] WIP - Debug statements --- .../morpheus_llm/modules/output/write_to_vector_db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index e01bde596c..926516cdf7 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -165,6 +165,8 @@ def extract_df(msg: ControlMessage): df = msg.payload().df if (msg.has_metadata("vdb_resource")): resource_name = msg.get_metadata("vdb_resource") + df.info() + print("Resource name = ", resource_name) else: resource_name = None else: From 43d98e5983f5670898e127c9de5c60cc173a41d9 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 29 Nov 2024 15:06:34 +0530 Subject: [PATCH 05/29] WIP - Debug statements --- 
.../morpheus_llm/modules/output/write_to_vector_db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index 926516cdf7..1e6e715e9a 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -165,8 +165,8 @@ def extract_df(msg: ControlMessage): df = msg.payload().df if (msg.has_metadata("vdb_resource")): resource_name = msg.get_metadata("vdb_resource") - df.info() - print("Resource name = ", resource_name) + df.info() + print("Resource name = ", resource_name) else: resource_name = None else: From 291f4bcedfbc5450020b97d635fd8a44d05ad56d Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Sat, 30 Nov 2024 07:41:15 +0530 Subject: [PATCH 06/29] WIP - Debug statements --- .../morpheus_llm/service/vdb/milvus_vector_db_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py index 5c3f020aea..1270417ce9 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py @@ -333,6 +333,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] + print(column_names) collection_df = df[column_names] if is_cudf_type(collection_df): @@ -342,6 +343,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - truncate_string_cols_by_bytes(collection_df, self._fields_max_length, warn_on_truncate=True) # Note: dataframe columns has to be in the order of collection 
schema fields.s + print(collection_df.to_string(max_rows=3)) result = self._collection.insert(data=collection_df, **kwargs) self._collection.flush() From 812e230ab9f58f4e4c338f5c4ddaeb1523b9fe52 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Sat, 30 Nov 2024 13:46:20 +0530 Subject: [PATCH 07/29] WIP - Debug statements --- .../morpheus_llm/service/vdb/milvus_vector_db_service.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py index 1270417ce9..0744ced29a 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py @@ -310,6 +310,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - Returns response content as a dictionary. """ # Ensure that there are no None values in the DataFrame entries. + print("##### 1") for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): df[field_name] = df[field_name].fillna("") @@ -325,6 +326,8 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - else: logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) + print("##### 2") + needs_truncate = self._truncate_long_strings if needs_truncate and is_cudf_type(df): # Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the From edd3641c44f0f5e6a4f2ed69b9bc67a57899d933 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Sat, 30 Nov 2024 14:26:21 +0530 Subject: [PATCH 08/29] WIP - Debug statements --- .../morpheus_llm/service/vdb/milvus_vector_db_service.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py 
b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py index 0744ced29a..0a5a7736a1 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py @@ -265,6 +265,7 @@ def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: boo self._truncate_long_strings = truncate_long_strings self._collection.load() + print("Resource created ...") def _set_up_collection(self): """ @@ -859,6 +860,8 @@ def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typ If the collection not exists exists. """ resource = self.load_resource(name) + print(name) + print(resource) return resource.insert_dataframe(df=df, **kwargs) From bb1074db3f63b74010fb891652ec592698785e0f Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Sun, 1 Dec 2024 11:01:04 +0530 Subject: [PATCH 09/29] WIP - Debug statements --- .../morpheus_llm/service/vdb/milvus_vector_db_service.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py index 0a5a7736a1..352656a1e8 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/milvus_vector_db_service.py @@ -312,19 +312,25 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - """ # Ensure that there are no None values in the DataFrame entries. 
print("##### 1") + print(self._fillna_fields_dict.items()) for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): + print("##### 1 1") df[field_name] = df[field_name].fillna("") elif dtype in (pymilvus.DataType.INT8, pymilvus.DataType.INT16, pymilvus.DataType.INT32, pymilvus.DataType.INT64): + print("##### 1 2") df[field_name] = df[field_name].fillna(0) elif dtype in (pymilvus.DataType.FLOAT, pymilvus.DataType.DOUBLE): + print("##### 1 3") df[field_name] = df[field_name].fillna(0.0) elif dtype == pymilvus.DataType.BOOL: + print("##### 1 4") df[field_name] = df[field_name].fillna(False) else: + print("##### 1 5") logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) print("##### 2") From d7f66a7baf6506ac2228de2767655aeb4a267a82 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 5 Dec 2024 10:58:04 +0530 Subject: [PATCH 10/29] WIP - Implementation of Kinetica vector DB service. 
--- external/morpheus-visualizations | 2 +- external/utilities | 2 +- .../service/vdb/kinetica_vector_db_service.py | 887 ++++++++++++++++++ 3 files changed, 889 insertions(+), 2 deletions(-) create mode 100644 python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py diff --git a/external/morpheus-visualizations b/external/morpheus-visualizations index f69a1fa8f5..c83e22fc0b 160000 --- a/external/morpheus-visualizations +++ b/external/morpheus-visualizations @@ -1 +1 @@ -Subproject commit f69a1fa8f5977b02a70436d92febfd4db1e0ad4d +Subproject commit c83e22fc0be11a522d51ee79eb64b2d94d55ae2c diff --git a/external/utilities b/external/utilities index 87b33dd0b7..d0bf0272d0 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 87b33dd0b7fd3d7460742bc5ad13d77e0d722c3c +Subproject commit d0bf0272d0ba8e1ebc182bba3cdbc1d6798db97d diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py new file mode 100644 index 0000000000..824c71060b --- /dev/null +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -0,0 +1,887 @@ +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy +import enum +import json +import logging +import threading +import time +import typing +from collections import OrderedDict +from functools import wraps + +from gpudb import GPUdb, GPUdbTable, GPUdbRecordColumn, GPUdbRecordType, GPUdbException +import sqlparse +from sqlparse.sql import IdentifierList, Identifier, Token +from sqlparse.tokens import Keyword + +from morpheus.io.utils import cudf_string_cols_exceed_max_bytes +from morpheus.io.utils import truncate_string_cols_by_bytes +from morpheus.utils.type_aliases import DataFrameType +from morpheus.utils.type_utils import is_cudf_type +from morpheus_llm.error import IMPORT_ERROR_MESSAGE +from morpheus_llm.service.vdb.vector_db_service import VectorDBResourceService +from morpheus_llm.service.vdb.vector_db_service import VectorDBService + +logger = logging.getLogger(__name__) + +IMPORT_EXCEPTION = None + +try: + import gpudb + +except ImportError as import_exc: + IMPORT_EXCEPTION = import_exc + + +class DistanceStrategy(str, enum.Enum): + """Enumerator of the Distance strategies.""" + + EUCLIDEAN = "l2" + COSINE = "cosine" + MAX_INNER_PRODUCT = "inner" + +class Dimension(int, enum.Enum): + """Some default dimensions for known embeddings.""" + + OPENAI = 1536 + + +DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN + +class KineticaVectorDBResourceService(VectorDBResourceService): + """ + Represents a service for managing resources in a Kinetica Vector Database. + + Parameters + ---------- + name : str + Name of the Kinetica table. Must be an existing table + schema : str + Name of the Kinetica schema. Must be an existing schema + client : GPUdb instance + An instance of the GPUdb class for interaction with the Kinetica Vector Database. 
+ """ + + def __init__(self, name: str, schema: str, client: "GPUdb") -> None: + if IMPORT_EXCEPTION is not None: + raise ImportError(IMPORT_ERROR_MESSAGE.format(package='pyKinetica')) from IMPORT_EXCEPTION + + super().__init__() + + self._name = name + self._schema = schema + self._client = client + + self._collection_name = f"{self._schema}.{self._name}" + self._collection = GPUdbTable(name=self._collection_name, db=client) + self._record_type = self._collection.get_table_type() + self._fields: list[GPUdbRecordColumn] = self._record_type.columns + + self._vector_field = None + self._fillna_fields_dict = {} + + # Mapping of field name to max length for string fields + self._fields_max_length: dict[str, int] = {} + + for field in self._fields: + if field.is_vector(): + self._vector_field = field.name + else: + if not field.column_properties[1] == "primary_key": + self._fillna_fields_dict[field.name] = field.column_type + + + def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict: + """ + Insert data into the vector database. + + Parameters + ---------- + data : list[list] | list[dict] + Data to be inserted into the Kinetica table. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + options = kwargs.get( "options", None ) + if options is not None: # if given, remove from kwargs + kwargs.pop( "options" ) + else: # no option given; use an empty dict + options = {} + + result = self._collection.insert_records(data, options=options) + + return self._insert_result_to_dict(result=result) + + def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict: + """ + Insert a dataframe entires into the vector database. + + Parameters + ---------- + df : DataFrameType + Dataframe to be inserted into the Kinetica table. 
+ **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + # From the schema, this is the list of columns we need + column_names = [field.name for field in self._fields] + + collection_df = df[column_names] + if is_cudf_type(collection_df): + collection_df = collection_df.to_pandas() + + # Note: dataframe columns has to be in the order of Kinetica table schema fields.s + result = self._collection.insert_df(collection_df) + + return self._insert_result_to_dict(result=result) + + def describe(self, **kwargs: dict[str, typing.Any]) -> dict: + """ + Provides a description of the Kinetica table. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict + Returns response content as a dictionary. + """ + table1 = GPUdbTable(db=self._client, name=self._name) + record_type: GPUdbRecordType = table1.get_table_type() + print(type(record_type)) + + col: GPUdbRecordColumn = None + description: dict[str, object] = {} + + for col in record_type.columns: + description[col.name] = (col.column_type, col.is_nullable, col.is_vector(), col.column_properties) + + return description + + def query(self, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Query data in a table in the Kinetica database. + + This method performs a search operation in the specified table in the Kinetica database. + + Parameters + ---------- + query : str, optional + The search query, which is an SQL query. + **kwargs : dict + Additional keyword arguments for the search operation. + + Returns + ------- + typing.Any + The search result (a GPUdbSqlIterator object) + + Raises + ------ + GPUdbException + If an error occurs during the search operation. 
+ """ + + if query is None: + raise GPUdbException("'query' - a valid SQL query statement must be given ...") + + logger.debug("Searching in Kinetica table: %s, query=%s, kwargs=%s", self._name, query, kwargs) + batch_size = kwargs.get("batch_size", 5000) + sql_params = kwargs.get("sql_params", []) + sql_opts = kwargs.get("sql_opts", {}) + return self._client.query(query, batch_size, sql_params, sql_opts) + + def _extract_projection_fields(self, parsed_tokens): + """ + Recursively extracts projection fields from the SQL tokens. + + Parameters: + parsed_tokens (list): List of tokens from a parsed SQL query. + + Returns: + list: A list of field names in the projection list. + """ + fields = [] + for token in parsed_tokens: + if isinstance(token, IdentifierList): + # Multiple fields in the projection list + for identifier in token.get_identifiers(): + fields.append(identifier.get_real_name()) + elif isinstance(token, Identifier): + # Single field + fields.append(token.get_real_name()) + elif token.ttype is Keyword and token.value.upper() in ["SELECT"]: + # Skip the SELECT keyword + continue + elif token.is_group: + # Handle nested subqueries + fields.extend(self._extract_projection_fields(token.tokens)) + return fields + + def _is_field_in_projection(self, sql_statement, field_name): + """ + Check whether a given field name is present in the projection list of an SQL statement. + + Parameters: + sql_statement (str): The SQL query to parse. + field_name (str): The field name to search for. + + Returns: + bool: True if the field name is present in the projection list, False otherwise. 
+ """ + parsed = sqlparse.parse(sql_statement) + for stmt in parsed: + if stmt.get_type() == "SELECT": + # Extract projection fields + projection_fields = self._extract_projection_fields(stmt.tokens) + # Check if the field is in the projection list + return field_name in projection_fields + return False + + def __query_collection( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict[str, str] = None, + ) -> dict: + """Query the Kinetica table.""" + # if filter is not None: + # filter_clauses = [] + # for key, value in filter.items(): + # IN = "in" + # if isinstance(value, dict) and IN in map(str.lower, value): + # value_case_insensitive = { + # k.lower(): v for k, v in value.items() + # } + # filter_by_metadata = self.EmbeddingStore.cmetadata[ + # key + # ].astext.in_(value_case_insensitive[IN]) + # filter_clauses.append(filter_by_metadata) + # else: + # filter_by_metadata = self.EmbeddingStore.cmetadata[ + # key + # ].astext == str(value) + # filter_clauses.append(filter_by_metadata) + + json_filter = json.dumps(filter) if filter is not None else None + where_clause = ( + f" where '{json_filter}' = JSON(metadata) " + if json_filter is not None + else "" + ) + + embedding_str = "[" + ",".join([str(x) for x in embedding]) + "]" + + dist_strategy = DEFAULT_DISTANCE_STRATEGY + + query_string = f""" + SELECT {', '.join(output_fields)}, {dist_strategy}(embedding, '{embedding_str}') + as distance, {self._vector_field} + FROM {self._collection_name} + {where_clause} + ORDER BY distance asc NULLS LAST + LIMIT {k} + """ + + self.logger.debug(query_string) + resp = self._client.execute_sql_and_decode(query_string) + self.logger.debug(resp) + return resp + + def similarity_search_by_vector( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict = None, + **kwargs: typing.Any, + ) -> list[dict]: + """Return docs most similar to embedding vector. 
+ + Args: + embedding: Embedding to look up documents similar to. + k: Number of Documents to return. Defaults to 4. + filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. + + Returns: + List of records most similar to the query vector. + """ + docs = self.similarity_search_with_score_by_vector( + embedding=embedding, output_fields=output_fields, k=k, filter=filter + ) + return docs + + def similarity_search_with_score_by_vector( + self, + embedding: list[float], + output_fields: list[str], + k: int = 4, + filter: dict = None, + ) -> list[dict]: + + resp: dict = self.__query_collection(embedding, k, filter) + if resp and resp["status_info"]["status"] == "OK" and "records" in resp: + records: OrderedDict = resp["records"] + return [records] + + self.logger.error(resp["status_info"]["message"]) + return [] + + + async def similarity_search(self, + embeddings: list[list[float]], + k: int = 4, + **kwargs: dict[str, typing.Any]) -> list[list[dict]]: + """ + Perform a similarity search within the Kinetica table. + + Parameters + ---------- + embeddings : list[list[float]] + Embeddings for which to perform the similarity search. + k : int, optional + The number of nearest neighbors to return, by default 4. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[list[dict]] + Returns a list of 'list of dictionaries' representing the results of the similarity search. + """ + + assert self._vector_field is not None, "Cannot perform similarity search on a table without a vector field" + + # Determine result metadata fields. 
+ output_fields = [x.name for x in self._fields if x.name != self._vector_field] + search_filter = kwargs.get("filter", "") + + results: list[list[dict]] = [self.similarity_search_by_vector( + embedding=embedding, + output_fields=output_fields, + k=k, + filter=search_filter, + ) for embedding in embeddings] + + return results + + def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Update data in the Kinetica table. + + Parameters + ---------- + data : list[typing.Any] + Data to be updated in the Kinetica table. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to upsert operation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the updated operation stats. + """ + + if not isinstance(data, list): + raise RuntimeError("Data is not of type list.") + + options = kwargs.get( "options", None ) + if options is not None: # if given, remove from kwargs + kwargs.pop( "options" ) + else: # no option given; use an empty dict + options = {} + + expressions = kwargs.get( "expressions", [] ) + if expressions is not None: # if given, remove from kwargs + kwargs.pop( "expressions" ) + else: # no option given; use an empty dict + raise GPUdbException("Update expression must be given ...") + + new_values_maps = kwargs.get( "new_values_maps", None ) + if new_values_maps is not None: # if given, remove from kwargs + kwargs.pop( "new_values_maps" ) + else: # no option given; use an empty dict + raise GPUdbException("'new_values_maps' must be given ...") + + if len(expressions) != len(new_values_maps): + raise GPUdbException("'expression' and 'new_value_maps' must have the same number of elements") + + records_to_insert = kwargs.get("records_to_insert", []) + records_to_insert_str = kwargs.get("records_to_insert_str", []) + + result = self._collection.update_records(expressions, new_values_maps, records_to_insert, records_to_insert_str, options=options) + + return 
self._update_delete_result_to_dict(result=result) + + def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Delete vectors from the Kinetica table using expressions. + + Parameters + ---------- + expr : str + Delete expression. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the given keys that are deleted from the Kinetica table. + """ + + result = self._collection.delete_records(expressions=[expr], **kwargs) + + return self._update_delete_result_to_dict(result=result) + + def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using their primary keys. + + Parameters + ---------- + keys : int | str | list + Primary keys to get vectors for. Depending on pk_field type it can be int or str + or a list of either. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the retrieval operation. + + Returns + ------- + list[typing.Any] + Returns result rows of the given keys from the Kinetica table. + """ + + result = None + expression = kwargs.get("expression", "") + options = kwargs.get("options", {}) + try: + result = self._collection.get_records_by_key(keys, expression, options) + except GPUdbException as exec_info: + raise RuntimeError(f"Unable to perform search: {exec_info}") from exec_info + + return result["records"] + + def count(self, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities. + + Parameters + ---------- + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the count operation. + + Returns + ------- + int + Returns number of entities in the Kinetica table. + """ + return self._collection.count + + def drop(self, **kwargs: dict[str, typing.Any]) -> None: + """ + Drop a Kinetica table, index, or partition in the Kinetica vector database. 
+
+        This function allows you to drop a Kinetica table.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments for specifying the type and partition name (if applicable).
+        """
+
+        self._client.clear_table(self._collection_name)
+
+    def _insert_result_to_dict(self, result: "GPUdbTable") -> dict[str, typing.Any]:
+        result_dict = {
+            "count": result.count,
+        }
+        return result_dict
+
+    def _update_delete_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]:
+        result_dict = {
+            "count_deleted": result["count_deleted"],
+            "counts_updated": result["counts_deleted"],
+            "info": result["info"],
+        }
+        return result_dict
+
+
+class KineticaVectorDBService(VectorDBService):
+    """
+    Service class for Kinetica Database implementation. This class provides functions for interacting
+    with a Kinetica database.
+
+    Parameters
+    ----------
+    uri : str
+        The URI of the Kinetica server.
+    user : str, optional
+        Username for connecting to the Kinetica server.
+    password : str, optional
+        Password for connecting to the Kinetica server.
+    """
+
+    def __init__(self,
+                 uri: str,
+                 user: str = "",
+                 password: str = "",
+                 db_name: str = "",
+                 token: str = ""):
+
+        self._client = GPUdb(host=uri, username=user, password=password, db_name=db_name, token=token)
+
+    def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> KineticaVectorDBResourceService:
+        return KineticaVectorDBResourceService(name=name,
+                                               client=self._client)
+
+    def has_store_object(self, name: str) -> bool:
+        """
+        Check if a table exists in the Kinetica database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the table to check.
+
+        Returns
+        -------
+        bool
+            True if the table exists, False otherwise.
+        """
+        return self._client.has_table(name)
+
+    def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]):
+        """
+        Create a table in the Kinetica database with the specified name and configuration.
+ If the table already exists, it can be overwritten if the `overwrite` parameter is set to True. + + Parameters + ---------- + name : str + Name of the table to be created. The name must be in the form 'schema_name.table_name'. + overwrite : bool, optional + If True, the Kinetica table will be overwritten if it already exists, by default False. + **kwargs : dict + Additional keyword arguments containing Kinetica table configuration. + + Raises + ------ + ValueError + If the provided schema fields configuration is empty. + """ + logger.debug("Creating Kinetica table: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) + + table_type: list[list[str]] = kwargs.get("type", []) + if not self.has_store_object(name) and (table_type is None or len(table_type) == 0): + raise GPUdbException("Table must either be existing or a type ust be given to create the table ...") + + options = kwargs.get("options", {}) + + if not self.has_store_object(name) or overwrite: + if overwrite and self.has_store_object(name): + self.drop(name) + + GPUdbTable(table_type, name, options=options) + + + def create_from_dataframe(self, + name: str, + df: DataFrameType, + overwrite: bool = False, + **kwargs: dict[str, typing.Any]) -> None: + """ + Create collections in the vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table. + df : DataFrameType + The dataframe to create the Kinetica table from. + overwrite : bool, optional + Whether to overwrite the Kinetica table if it already exists. Default is False. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + """ + + GPUdbTable.from_df(df, self._client, name, clear_table=overwrite) + + def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, + typing.Any]) -> dict[str, typing.Any]: + """ + Insert a collection specific data in the Kinetica vector database. 
+ + Parameters + ---------- + name : str + Name of the Kinetica table to be inserted. + data : list[list] | list[dict] + Data to be inserted in the Kinetica table. + **kwargs : dict[str, typing.Any] + Additional keyword arguments containing Kinetica table configuration. + + Returns + ------- + dict + Returns response content as a dictionary. + + Raises + ------ + RuntimeError + If the table not exists. + """ + + resource = self.load_resource(name) + return resource.insert(data, **kwargs) + + def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Converts dataframe to rows and insert to a Kinetica table in the Kinetica vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table to be inserted. + df : DataFrameType + Dataframe to be inserted in the Kinetica table. + **kwargs : dict[str, typing.Any] + Additional keyword arguments containing Kinetica table configuration. + + Returns + ------- + dict + Returns response content as a dictionary. + + Raises + ------ + RuntimeError + If the Kinetica table not exists. + """ + resource = self.load_resource(name) + + return resource.insert_dataframe(df=df, **kwargs) + + def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Query data in a Kinetica table in the Kinetica vector database. + + This method performs a search operation in the specified Kinetica table/partition in the Kinetica vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table to search within. + query : str + The search query, which can be a filter expression. + **kwargs : dict + Additional keyword arguments for the search operation. + + Returns + ------- + typing.Any + The search result, which can vary depending on the query and options. 
+ """ + + resource = self.load_resource(name) + + return resource.query(query, **kwargs) + + async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[list[dict]]: + """ + Perform a similarity search within the Kinetica table. + + Parameters + ---------- + name : str + Name of the Kinetica table. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + list[dict] + Returns a list of dictionaries representing the results of the similarity search. + """ + + resource = self.load_resource(name) + + return resource.similarity_search(**kwargs) + + def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Update data in the vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table. + data : list[typing.Any] + Data to be updated in the Kinetica table. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to upsert operation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the updated operation stats. + """ + + if not isinstance(data, list): + raise RuntimeError("Data is not of type list.") + + resource = self.load_resource(name) + + return resource.update(data=data, **kwargs) + + def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + """ + Delete vectors from the Kinetica table using expressions. + + Parameters + ---------- + name : str + Name of the Kinetica table. + expr : str + Delete expression. + **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + dict[str, typing.Any] + Returns result of the given keys that are delete from the Kinetica table. 
+ """ + + resource = self.load_resource(name) + result = resource.delete(expr=expr, **kwargs) + + return result + + def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using their primary keys from the Kinetica table. + + Parameters + ---------- + name : str + Name of the Kinetica table. + keys : int | str | list + Primary keys to get vectors for. Depending on pk_field type it can be int or str + or a list of either. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the retrieval operation. + + Returns + ------- + list[typing.Any] + Returns result rows of the given keys from the Kinetica table. + """ + + resource = self.load_resource(name) + + result = resource.retrieve_by_keys(keys=keys, **kwargs) + + return result + + def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities in the given Kinetica table. + + Parameters + ---------- + name : str + Name of the Kinetica table. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the count operation. + + Returns + ------- + int + Returns number of entities in the Kinetica table. + """ + resource = self.load_resource(name) + + return resource.count(**kwargs) + + def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: + """ + Drop a table in the Kinetica database. + + This method allows you to drop a table in the Kinetica database. + + Parameters + ---------- + name : str + Name of the table + **kwargs : dict + Additional keyword arguments for specifying the type and partition name (if applicable). + + Notes on Expected Keyword Arguments: + ------------------------------------ + - 'schema' (str, optional): + Specifies the schema of the table to drop. Default 'ki_home' + + Raises + ------ + ValueError + If mandatory arguments are missing or if the provided 'Kinetica table' value is invalid. 
+ """ + + logger.debug("Dropping Kinetica table: %s, kwargs=%s", name, kwargs) + + if self.has_store_object(name): + schema = kwargs.get("schema", "ki_home") + try: + self._client.clear_table(name) + except GPUdbException as e: + raise ValueError(e.message) + + def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: + """ + Describe the Kinetica table in the vector database. + + Parameters + ---------- + name : str + Name of the Kinetica table. + **kwargs : dict[str, typing.Any] + Additional keyword arguments specific to the Kinetica vector database. + + Returns + ------- + dict + Returns Kinetica table information. + """ + + resource = self.load_resource(name) + + return resource.describe(**kwargs) + From fdd2347330bbe1b13199765c1caa56b086e5e8b0 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 6 Dec 2024 17:30:05 +0530 Subject: [PATCH 11/29] WIP - Implementation of Kinetica vector DB service. --- dependencies.yaml | 1 + .../morpheus_llm/service/vdb/kinetica_vector_db_service.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dependencies.yaml b/dependencies.yaml index 43484fde9a..40b84a77a3 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -401,6 +401,7 @@ dependencies: - databricks-connect - milvus==2.3.5 # update to match pymilvus when available - pymilvus==2.3.6 + - gpudb>=7.2.2.3 - torch==2.4.0+cu124 test_python_morpheus: diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 824c71060b..04584e480e 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -77,7 +77,7 @@ class KineticaVectorDBResourceService(VectorDBResourceService): def __init__(self, name: str, schema: str, client: "GPUdb") -> None: if IMPORT_EXCEPTION is not None: - raise 
ImportError(IMPORT_ERROR_MESSAGE.format(package='pyKinetica')) from IMPORT_EXCEPTION + raise ImportError(IMPORT_ERROR_MESSAGE.format(package='gpudb')) from IMPORT_EXCEPTION super().__init__() @@ -525,7 +525,7 @@ def _insert_result_to_dict(self, result: "GPUdbTable") -> dict[str, typing.Any]: } return result_dict - def _update_delete_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]: + def _update_delete_result_to_dict(self, result) -> dict[str, typing.Any]: result_dict = { "count_deleted": result["count_deleted"], "counts_updated": result["counts_deleted"], From c9ede22f7fb71d48601606f0ce957aa2a69ca30d Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 6 Dec 2024 18:38:32 +0530 Subject: [PATCH 12/29] WIP - Implementation of Kinetica vector DB service. --- python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt b/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt index 8f9a9620b9..d2367685ba 100644 --- a/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt +++ b/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt @@ -8,4 +8,5 @@ langchain==0.1.16 milvus==2.3.5 nemollm==0.3.5 pymilvus==2.3.6 +gpudb>=7.2.2.3 torch==2.4.0+cu124 From 2b3d8c054afd2c3427b6dc13e83a5aaac0b48e0b Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 6 Dec 2024 19:40:52 +0530 Subject: [PATCH 13/29] WIP - Implementation of Kinetica vector DB service. 
--- conda/environments/all_cuda-125_arch-x86_64.yaml | 1 + conda/environments/dev_cuda-125_arch-x86_64.yaml | 1 + conda/environments/examples_cuda-125_arch-x86_64.yaml | 1 + conda/environments/runtime_cuda-125_arch-x86_64.yaml | 1 + 4 files changed, 4 insertions(+) diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 6f4c5c12e9..f40be3368e 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -139,6 +139,7 @@ dependencies: - milvus==2.3.5 - nemollm==0.3.5 - pymilvus==2.3.6 + - gpudb>=7.2.2.3 - pytest-kafka==0.6.0 - python-logging-loki - sentence-transformers==2.7 diff --git a/conda/environments/dev_cuda-125_arch-x86_64.yaml b/conda/environments/dev_cuda-125_arch-x86_64.yaml index a4ff228a9c..5f5047ba12 100644 --- a/conda/environments/dev_cuda-125_arch-x86_64.yaml +++ b/conda/environments/dev_cuda-125_arch-x86_64.yaml @@ -111,6 +111,7 @@ dependencies: - databricks-connect - milvus==2.3.5 - pymilvus==2.3.6 + - gpudb>=7.2.2.3 - pytest-kafka==0.6.0 - torch==2.4.0+cu124 name: dev_cuda-125_arch-x86_64 diff --git a/conda/environments/examples_cuda-125_arch-x86_64.yaml b/conda/environments/examples_cuda-125_arch-x86_64.yaml index 0499d1690f..a3da1bd23f 100644 --- a/conda/environments/examples_cuda-125_arch-x86_64.yaml +++ b/conda/environments/examples_cuda-125_arch-x86_64.yaml @@ -77,6 +77,7 @@ dependencies: - milvus==2.3.5 - nemollm==0.3.5 - pymilvus==2.3.6 + - gpudb>=7.2.2.3 - python-logging-loki - sentence-transformers==2.7 - torch==2.4.0+cu124 diff --git a/conda/environments/runtime_cuda-125_arch-x86_64.yaml b/conda/environments/runtime_cuda-125_arch-x86_64.yaml index edfa5b103e..b908313ec6 100644 --- a/conda/environments/runtime_cuda-125_arch-x86_64.yaml +++ b/conda/environments/runtime_cuda-125_arch-x86_64.yaml @@ -52,5 +52,6 @@ dependencies: - databricks-connect - milvus==2.3.5 - pymilvus==2.3.6 + - gpudb>=7.2.2.3 - torch==2.4.0+cu124 
name: runtime_cuda-125_arch-x86_64 From 09f5d8517f100057ddae006fe4c2183bd8855d82 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 16 Dec 2024 19:45:46 +0530 Subject: [PATCH 14/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. --- .../kinetica_pipeline_example.py | 127 ++++++++++++++++++ .../service/vdb/kinetica_vector_db_service.py | 12 +- 2 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 examples/sample_kinetica_pipeline/kinetica_pipeline_example.py diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py new file mode 100644 index 0000000000..a2815ecddf --- /dev/null +++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py @@ -0,0 +1,127 @@ +import csv +import logging +import random +import cudf + +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.config import Config +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.io.deserializers import read_file_to_df +from morpheus.utils.type_utils import exec_mode_to_df_type_str + +from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage + +from morpheus.stages.preprocess.drop_null_stage import DropNullStage +from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage + +# from morpheus.utils.logging import configure_logging +# from morpheus.utils.type_support import numpy_to_cudf + +# Import Milvus services from Morpheus +from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBResourceService, MilvusVectorDBService +from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBResourceService, KineticaVectorDBService + +import numpy as np +import json + +from morpheus.utils.logger import configure_logging + +from 
morpheus.stages.input.file_source_stage import FileSourceStage + +from morpheus.stages.output.write_to_file_stage import WriteToFileStage + +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + +from morpheus.config import ExecutionMode + +logger = logging.getLogger(__name__) + +def generate_random_vector(dim=3): + """Generate a random vector of specified dimensions.""" + return [random.uniform(-1.0, 1.0) for _ in range(dim)] + + +def generate_csv(file_path, num_records=10): + """ + Generate a CSV file with the specified format. + + Parameters: + file_path (str): Path to the output CSV file. + num_records (int): Number of records to generate. + """ + records = [] + + for i in range(1, num_records + 1): + vector = generate_random_vector() + metadata = json.dumps({"metadata": f"Sample metadata for row {i}"}) + records.append([i, str(vector), metadata]) + + # Write records to CSV file + with open(file_path, mode='w', newline='') as csv_file: + writer = csv.writer(csv_file) + # Write header + writer.writerow(["id", "embeddings", "metadata"]) + # Write data + writer.writerows(records) + + print(f"CSV file '{file_path}' with {num_records} records generated successfully.") + + +def get_test_df(num_input_rows): + df = cudf.DataFrame({ + "id": list(range(num_input_rows)), + "age": [random.randint(20, 40) for i in range(num_input_rows)], + "embedding": [[random.random() for _ in range(3)] for _ in range(num_input_rows)] + }) + + return df + + +def main(input_file_name: str): + # Step 1: Configure logging + + # Step 2: Initialize Morpheus Config + config = Config() + config.execution_mode = ExecutionMode.GPU + + # milvus_db_service = MilvusVectorDBService("https://in03-c87c25d216da0ac.serverless.gcp-us-west1.cloud.zilliz.com", user="db_c87c25d216da0ac", password="Cv3;^~HaY.>~>!)H", token="1c80242758bbfc207773c9a731421d9d96e269ac3ef41d87b40725f53795e1305489827dd310f0e55fb886ba0ea15898244de182") + kinetica_db_service = 
KineticaVectorDBService("http://demo72.kinetica.com/_gpudb", user="amukherjee", password="Kinetica1!") + # milvus_resource_service = milvus_db_service.load_resource("test_collection") + kinetica_resource_service = kinetica_db_service.load_resource("test_collection") + collection_name = "test_collection" + vector_dim = 3 # Example: 3-dimensional vector embeddings + + source_df = read_file_to_df(input_file_name, df_type=exec_mode_to_df_type_str(config.execution_mode)) + print(source_df.shape[0]) + + # Step 1: Create a pipeline + pipeline = LinearPipeline(config) + + # # Step 6: Define source stage + # def data_generator(): + # for i in range(5): + # embedding = np.random.random(vector_dim).tolist() + # metadata = {"id": i, "label": f"example_{i}"} + # yield {"embedding": embedding, "metadata": metadata} + + pipeline.set_source(InMemorySourceStage(config, dataframes=[source_df])) + + pipeline.add_stage(DeserializeStage(config)) + + pipeline.add_stage( + WriteToVectorDBStage( + config, + kinetica_db_service, + "test_collection" + ) + ) + + pipeline.build() + + pipeline.run() + +if __name__ == "__main__": + file_name = "test.csv" + generate_csv(file_name) + main(file_name) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 04584e480e..1ebe25fdea 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -576,7 +576,7 @@ def has_store_object(self, name: str) -> bool: bool True if the table exists, False otherwise. """ - return self._client.has_table(name) + return self._client.has_table(name)["table_exists"] def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]): """ @@ -590,20 +590,22 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. 
overwrite : bool, optional If True, the Kinetica table will be overwritten if it already exists, by default False. **kwargs : dict - Additional keyword arguments containing Kinetica table configuration. + Additional keyword arguments containing Kinetica `/create/table` options. Raises ------ - ValueError - If the provided schema fields configuration is empty. + GPUdbException + If the provided type schema configuration is empty. """ logger.debug("Creating Kinetica table: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) table_type: list[list[str]] = kwargs.get("type", []) if not self.has_store_object(name) and (table_type is None or len(table_type) == 0): - raise GPUdbException("Table must either be existing or a type ust be given to create the table ...") + raise GPUdbException("Table must either be existing or a type must be given to create the table ...") options = kwargs.get("options", {}) + if len(options) == 0: + options['no_error_if_exists'] = 'true' if not self.has_store_object(name) or overwrite: if overwrite and self.has_store_object(name): From 920bb679358e8bf89f4d459c9b010fec1d7d0f72 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 16 Dec 2024 22:18:13 +0530 Subject: [PATCH 15/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. --- .../service/vdb/kinetica_vector_db_service.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 1ebe25fdea..38a2f6e24c 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -887,3 +887,55 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: return resource.describe(**kwargs) + def release_resource(self, name: str) -> None: + """ + Release a loaded resource from the memory. 
+
+        Parameters
+        ----------
+        name : str
+            Name of the resource to release.
+        """
+        pass
+
+    def close(self) -> None:
+        """
+        Close connection to the vector database.
+        """
+
+        pass
+
+    def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]:
+        """
+        List existing resources in the vector database.
+
+        Parameters
+        ----------
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        list[str]
+            Returns available resource names in the vector database.
+        """
+
+        pass
+
+    def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Delete vectors by keys from the resource.
+
+        Parameters
+        ----------
+        keys : int | str | list
+            Primary keys to delete vectors.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        typing.Any
+            Returns vectors of the given keys that are deleted from the resource.
+        """
+        pass

From 096c7bd1a0b2109999feb087c2e12764fdce879d Mon Sep 17 00:00:00 2001
From: anindyam1969
Date: Mon, 16 Dec 2024 23:41:21 +0530
Subject: [PATCH 16/29] WIP - Implementation of Kinetica vector DB service.
 Added sample usage. 
--- .../morpheus_llm/service/vdb/kinetica_vector_db_service.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 38a2f6e24c..923fd70801 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -553,10 +553,9 @@ def __init__(self, uri: str, user: str = "", password: str = "", - db_name: str = "", - token: str = ""): + ): - self._client = GPUdb(host=uri, username=user, password=password, db_name=db_name, token=token) + self._client = GPUdb(host=uri, username=user, password=password) def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> KineticaVectorDBResourceService: return KineticaVectorDBResourceService(name=name, From ddbc86749eca240b04f84cc1dfe5d33f28372079 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 19 Dec 2024 19:31:29 +0530 Subject: [PATCH 17/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. 
--- .../sample_kinetica_pipeline/kinetica_pipeline_example.py | 2 +- .../morpheus_llm/service/vdb/kinetica_vector_db_service.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py index a2815ecddf..3873a6ffe7 100644 --- a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py +++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py @@ -86,7 +86,7 @@ def main(input_file_name: str): config.execution_mode = ExecutionMode.GPU # milvus_db_service = MilvusVectorDBService("https://in03-c87c25d216da0ac.serverless.gcp-us-west1.cloud.zilliz.com", user="db_c87c25d216da0ac", password="Cv3;^~HaY.>~>!)H", token="1c80242758bbfc207773c9a731421d9d96e269ac3ef41d87b40725f53795e1305489827dd310f0e55fb886ba0ea15898244de182") - kinetica_db_service = KineticaVectorDBService("http://demo72.kinetica.com/_gpudb", user="amukherjee", password="Kinetica1!") + kinetica_db_service = KineticaVectorDBService("https://demo72.kinetica.com/_gpudb", user="amukherjee", password="Kinetica1!") # milvus_resource_service = milvus_db_service.load_resource("test_collection") kinetica_resource_service = kinetica_db_service.load_resource("test_collection") collection_name = "test_collection" diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 923fd70801..51dc635ae9 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -554,8 +554,12 @@ def __init__(self, user: str = "", password: str = "", ): + options = GPUdb.Options() + options.skip_ssl_cert_verification = True + options.username = "amukherjee" + options.password = "Kinetica1!" 
- self._client = GPUdb(host=uri, username=user, password=password) + self._client = GPUdb(host=uri, options=options) def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> KineticaVectorDBResourceService: return KineticaVectorDBResourceService(name=name, From 48b370929325204328763d30fe371d0e95eff095 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 10:42:24 +0530 Subject: [PATCH 18/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. --- .../kinetica_pipeline_example.py | 32 ++++++++++--------- .../service/vdb/kinetica_vector_db_service.py | 26 +++++++++++---- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py index 3873a6ffe7..4cfee9e346 100644 --- a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py +++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py @@ -2,16 +2,13 @@ import logging import random import cudf +import os from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.config import Config -from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.io.deserializers import read_file_to_df from morpheus.utils.type_utils import exec_mode_to_df_type_str -from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage - -from morpheus.stages.preprocess.drop_null_stage import DropNullStage from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage @@ -19,18 +16,12 @@ # from morpheus.utils.type_support import numpy_to_cudf # Import Milvus services from Morpheus -from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBResourceService, MilvusVectorDBService -from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBResourceService, 
KineticaVectorDBService +from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService -import numpy as np import json from morpheus.utils.logger import configure_logging -from morpheus.stages.input.file_source_stage import FileSourceStage - -from morpheus.stages.output.write_to_file_stage import WriteToFileStage - from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.config import ExecutionMode @@ -79,19 +70,30 @@ def get_test_df(num_input_rows): def main(input_file_name: str): + host = os.getenv("kinetica_host", "http://localhost:9191") + username = os.getenv("username", "") + password = os.getenv("password", "") + schema = os.getenv("schema", "") # Step 1: Configure logging # Step 2: Initialize Morpheus Config config = Config() config.execution_mode = ExecutionMode.GPU - # milvus_db_service = MilvusVectorDBService("https://in03-c87c25d216da0ac.serverless.gcp-us-west1.cloud.zilliz.com", user="db_c87c25d216da0ac", password="Cv3;^~HaY.>~>!)H", token="1c80242758bbfc207773c9a731421d9d96e269ac3ef41d87b40725f53795e1305489827dd310f0e55fb886ba0ea15898244de182") - kinetica_db_service = KineticaVectorDBService("https://demo72.kinetica.com/_gpudb", user="amukherjee", password="Kinetica1!") - # milvus_resource_service = milvus_db_service.load_resource("test_collection") - kinetica_resource_service = kinetica_db_service.load_resource("test_collection") + kinetica_db_service = KineticaVectorDBService(host, user=username, password=password, kinetica_schema=schema) collection_name = "test_collection" + collection_name = f"{schema}.{collection_name}" if schema is not None and len( + schema) > 0 else f"ki_home.{collection_name}" + vector_dim = 3 # Example: 3-dimensional vector embeddings + columns = [ + ["id", "long", "primary_key"], + ["embeddings", "bytes", "vector(3)"], + ["metadata", "string", "json"], + ] + kinetica_db_service.create(collection_name, type=columns) + source_df = read_file_to_df(input_file_name, 
df_type=exec_mode_to_df_type_str(config.execution_mode)) print(source_df.shape[0]) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 51dc635ae9..52bc6139eb 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -75,18 +75,16 @@ class KineticaVectorDBResourceService(VectorDBResourceService): An instance of the GPUdb class for interaction with the Kinetica Vector Database. """ - def __init__(self, name: str, schema: str, client: "GPUdb") -> None: + def __init__(self, name: str, client: "GPUdb") -> None: if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE.format(package='gpudb')) from IMPORT_EXCEPTION super().__init__() self._name = name - self._schema = schema self._client = client - self._collection_name = f"{self._schema}.{self._name}" - self._collection = GPUdbTable(name=self._collection_name, db=client) + self._collection = GPUdbTable(name=self._name, db=client) self._record_type = self._collection.get_table_type() self._fields: list[GPUdbRecordColumn] = self._record_type.columns @@ -553,18 +551,32 @@ def __init__(self, uri: str, user: str = "", password: str = "", + kinetica_schema = "", ): options = GPUdb.Options() options.skip_ssl_cert_verification = True - options.username = "amukherjee" - options.password = "Kinetica1!" 
+ options.username = user + options.password = password + self._collection_name = None + self.schema = kinetica_schema if kinetica_schema is not None and len(kinetica_schema) > 0 else None self._client = GPUdb(host=uri, options=options) def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> KineticaVectorDBResourceService: - return KineticaVectorDBResourceService(name=name, + """ + + @param name: + @param kwargs: + @return: + """ + self._collection_name = f"{self.schema}.{name}" if self.schema is not None and len(self.schema) > 0 else f"ki_home.{name}" + return KineticaVectorDBResourceService(name=self._collection_name, client=self._client) + @property + def collection_name(self): + return self._collection_name if self._collection_name is not None else None + def has_store_object(self, name: str) -> bool: """ Check if a table exists in the Kinetica database. From 70b5f273a9b20cd8d0da5b8747b1fa2c51310a7f Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 11:50:50 +0530 Subject: [PATCH 19/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. --- .../service/vdb/kinetica_vector_db_service.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 52bc6139eb..cd6510602d 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -459,6 +459,24 @@ def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing return self._update_delete_result_to_dict(result=result) + def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Delete vectors by keys from the resource. + + Parameters + ---------- + keys : int | str | list + Primary keys to delete vectors. 
+ **kwargs : dict[str, typing.Any] + Extra keyword arguments specific to the vector database implementation. + + Returns + ------- + typing.Any + Returns vectors of the given keys that are delete from the resource. + """ + pass + def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: """ Retrieve the inserted vectors using their primary keys. From 1e076619d21fcdf4733173124eaf830da48ed379 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 12:44:27 +0530 Subject: [PATCH 20/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. --- .../kinetica_pipeline_example.py | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py index 4cfee9e346..5fc4c2d720 100644 --- a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py +++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py @@ -8,9 +8,15 @@ from morpheus.config import Config from morpheus.io.deserializers import read_file_to_df from morpheus.utils.type_utils import exec_mode_to_df_type_str +from morpheus.modules import to_control_message # noqa: F401 # pylint: disable=unused-import +from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE +from morpheus.utils.module_ids import TO_CONTROL_MESSAGE from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.general.linear_modules_stage import LinearModulesStage + +from morpheus.messages import ControlMessage # from morpheus.utils.logging import configure_logging # from morpheus.utils.type_support import numpy_to_cudf @@ -62,8 +68,8 @@ def generate_csv(file_path, num_records=10): def get_test_df(num_input_rows): df = cudf.DataFrame({ "id": list(range(num_input_rows)), - "age": 
[random.randint(20, 40) for i in range(num_input_rows)], - "embedding": [[random.random() for _ in range(3)] for _ in range(num_input_rows)] + "embeddings": [[random.random() for _ in range(3)] for _ in range(num_input_rows)], + "metadata": [json.dumps({"metadata": f"Sample metadata for row {i}"}) for i in range(num_input_rows)], }) return df @@ -94,22 +100,20 @@ def main(input_file_name: str): ] kinetica_db_service.create(collection_name, type=columns) - source_df = read_file_to_df(input_file_name, df_type=exec_mode_to_df_type_str(config.execution_mode)) - print(source_df.shape[0]) + df = get_test_df(10) + to_cm_module_config = { + "module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE + } # Step 1: Create a pipeline pipeline = LinearPipeline(config) - - # # Step 6: Define source stage - # def data_generator(): - # for i in range(5): - # embedding = np.random.random(vector_dim).tolist() - # metadata = {"id": i, "label": f"example_{i}"} - # yield {"embedding": embedding, "metadata": metadata} - - pipeline.set_source(InMemorySourceStage(config, dataframes=[source_df])) - - pipeline.add_stage(DeserializeStage(config)) + pipeline.set_source(InMemorySourceStage(config, [df])) + pipeline.add_stage( + LinearModulesStage(config, + to_cm_module_config, + input_port_name="input", + output_port_name="output", + output_type=ControlMessage)) pipeline.add_stage( WriteToVectorDBStage( @@ -119,8 +123,6 @@ def main(input_file_name: str): ) ) - pipeline.build() - pipeline.run() if __name__ == "__main__": From d8c07508515907622c39eb96b9e0bc697dd6a328 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 13:36:18 +0530 Subject: [PATCH 21/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. 
--- .../morpheus_llm/service/vdb/kinetica_vector_db_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index cd6510602d..e381890727 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -644,7 +644,7 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. if overwrite and self.has_store_object(name): self.drop(name) - GPUdbTable(table_type, name, options=options) + GPUdbTable(table_type, name, options=options, db=self._client) def create_from_dataframe(self, From d9ef7eaf0d4b733f77743baa8e946fef45ec5be1 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 14:23:20 +0530 Subject: [PATCH 22/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. 
--- .../morpheus_llm/modules/output/write_to_vector_db.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index c9528f3c78..016030c18c 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -145,9 +145,13 @@ def on_completed(): try: if accum_stats.data: df_pkg = get_df_pkg_from_obj(accum_stats.data[0]) + print("1 ......") merged_df = df_pkg.concat(accum_stats.data) + print("2 ......") service.insert_dataframe(name=key, df=merged_df) + print("3 ......") final_df_references.append(accum_stats.data) + print("4 ......") except Exception as e: logger.error("Unable to upload dataframe entries to vector database: %s", e) finally: From 1c61f408bf959152900d520d777cb1664bc9d2a4 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 14:25:00 +0530 Subject: [PATCH 23/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. 
--- .../morpheus_llm/modules/output/write_to_vector_db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index 016030c18c..5a130a74ac 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -15,6 +15,7 @@ import logging import pickle import time +import traceback from dataclasses import dataclass import mrc @@ -153,6 +154,7 @@ def on_completed(): final_df_references.append(accum_stats.data) print("4 ......") except Exception as e: + print(traceback.format_exc()) logger.error("Unable to upload dataframe entries to vector database: %s", e) finally: # Close vector database service connection From bd5d592252bedd92f9b5c16e491e9cf1e761f983 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 17:36:09 +0530 Subject: [PATCH 24/29] WIP - Implementation of Kinetica vector DB service. Added sample usage. 
--- .../morpheus_llm/service/vdb/kinetica_vector_db_service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index e381890727..a5a337314f 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -126,7 +126,7 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) result = self._collection.insert_records(data, options=options) - return self._insert_result_to_dict(result=result) + return self._insert_result_to_dict(result=result.count) def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict: """ @@ -535,9 +535,9 @@ def drop(self, **kwargs: dict[str, typing.Any]) -> None: self._client.clear_table(self._collection_name) - def _insert_result_to_dict(self, result: "GPUdbTable") -> dict[str, typing.Any]: + def _insert_result_to_dict(self, result: int) -> dict[str, typing.Any]: result_dict = { - "count": result.count, + "count": result, } return result_dict From f322956849ba61186e3f9ac9fcd03df13f91076b Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Wed, 25 Dec 2024 18:20:43 +0530 Subject: [PATCH 25/29] WIP - Example works correctly. Tests related kinetca_vector_db_service.py are in progress. 
--- .../kinetica_pipeline_example.py | 41 +------------------ .../modules/output/write_to_vector_db.py | 6 --- 2 files changed, 2 insertions(+), 45 deletions(-) diff --git a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py index 5fc4c2d720..30498d4dc3 100644 --- a/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py +++ b/examples/sample_kinetica_pipeline/kinetica_pipeline_example.py @@ -6,8 +6,6 @@ from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.config import Config -from morpheus.io.deserializers import read_file_to_df -from morpheus.utils.type_utils import exec_mode_to_df_type_str from morpheus.modules import to_control_message # noqa: F401 # pylint: disable=unused-import from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import TO_CONTROL_MESSAGE @@ -34,37 +32,6 @@ logger = logging.getLogger(__name__) -def generate_random_vector(dim=3): - """Generate a random vector of specified dimensions.""" - return [random.uniform(-1.0, 1.0) for _ in range(dim)] - - -def generate_csv(file_path, num_records=10): - """ - Generate a CSV file with the specified format. - - Parameters: - file_path (str): Path to the output CSV file. - num_records (int): Number of records to generate. 
- """ - records = [] - - for i in range(1, num_records + 1): - vector = generate_random_vector() - metadata = json.dumps({"metadata": f"Sample metadata for row {i}"}) - records.append([i, str(vector), metadata]) - - # Write records to CSV file - with open(file_path, mode='w', newline='') as csv_file: - writer = csv.writer(csv_file) - # Write header - writer.writerow(["id", "embeddings", "metadata"]) - # Write data - writer.writerows(records) - - print(f"CSV file '{file_path}' with {num_records} records generated successfully.") - - def get_test_df(num_input_rows): df = cudf.DataFrame({ "id": list(range(num_input_rows)), @@ -75,14 +42,12 @@ def get_test_df(num_input_rows): return df -def main(input_file_name: str): +def main(): host = os.getenv("kinetica_host", "http://localhost:9191") username = os.getenv("username", "") password = os.getenv("password", "") schema = os.getenv("schema", "") - # Step 1: Configure logging - # Step 2: Initialize Morpheus Config config = Config() config.execution_mode = ExecutionMode.GPU @@ -126,6 +91,4 @@ def main(input_file_name: str): pipeline.run() if __name__ == "__main__": - file_name = "test.csv" - generate_csv(file_name) - main(file_name) + main() diff --git a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py index 5a130a74ac..c9528f3c78 100644 --- a/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py +++ b/python/morpheus_llm/morpheus_llm/modules/output/write_to_vector_db.py @@ -15,7 +15,6 @@ import logging import pickle import time -import traceback from dataclasses import dataclass import mrc @@ -146,15 +145,10 @@ def on_completed(): try: if accum_stats.data: df_pkg = get_df_pkg_from_obj(accum_stats.data[0]) - print("1 ......") merged_df = df_pkg.concat(accum_stats.data) - print("2 ......") service.insert_dataframe(name=key, df=merged_df) - print("3 ......") final_df_references.append(accum_stats.data) - print("4 
......") except Exception as e: - print(traceback.format_exc()) logger.error("Unable to upload dataframe entries to vector database: %s", e) finally: # Close vector database service connection From 5f972f3cbf5597635e1377b15db40ff1a9c5ea05 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 6 Jan 2025 11:14:33 +0530 Subject: [PATCH 26/29] WIP - Example works correctly. Tests related kinetca_vector_db_service.py are in progress. --- .../service/vdb/kinetica_vector_db_service.py | 73 +++++++++++++++---- tests/conftest.py | 29 ++++++++ 2 files changed, 89 insertions(+), 13 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index a5a337314f..ca3166c1c9 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -87,6 +87,7 @@ def __init__(self, name: str, client: "GPUdb") -> None: self._collection = GPUdbTable(name=self._name, db=client) self._record_type = self._collection.get_table_type() self._fields: list[GPUdbRecordColumn] = self._record_type.columns + self._description = self.describe() self._vector_field = None self._fillna_fields_dict = {} @@ -172,16 +173,28 @@ def describe(self, **kwargs: dict[str, typing.Any]) -> dict: """ table1 = GPUdbTable(db=self._client, name=self._name) record_type: GPUdbRecordType = table1.get_table_type() - print(type(record_type)) col: GPUdbRecordColumn = None description: dict[str, object] = {} for col in record_type.columns: - description[col.name] = (col.column_type, col.is_nullable, col.is_vector(), col.column_properties) + description[col.name] = ( + col.column_type, + col.is_nullable, + col.is_vector(), + col.column_properties, + "primary_key" if "primary_key" in col.column_properties else "" + ) return description + def _get_pk_field_name(self): + for field_name, properties in 
self._description.items(): + if properties[4] == "primary_key": + return field_name + + return "" + def query(self, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any: """ Query data in a table in the Kinetica database. @@ -397,9 +410,24 @@ def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dic Parameters ---------- data : list[typing.Any] - Data to be updated in the Kinetica table. + Data to be updated in the Kinetica table. This is npt used by Kinetica. The required parameters + are all passed in a keyword arguments. **kwargs : dict[str, typing.Any] - Extra keyword arguments specific to upsert operation. + Extra keyword arguments specific to Kinetica. The full explanation of each of these + keyword arguments is available in the documentation of the API `/update/records`. + + Allowed keyword arguments: + options: dict[str, str] - options + + expressions: [list] - expression used to filter records for update + + new_value_maps: [list[dict]] | [dict] - + + records_to_insert: list[] - + + records_to_insert_str: list[] - + + Returns ------- @@ -407,9 +435,6 @@ def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dic Returns result of the updated operation stats. 
""" - if not isinstance(data, list): - raise RuntimeError("Data is not of type list.") - options = kwargs.get( "options", None ) if options is not None: # if given, remove from kwargs kwargs.pop( "options" ) @@ -418,12 +443,16 @@ def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dic expressions = kwargs.get( "expressions", [] ) if expressions is not None: # if given, remove from kwargs + if not isinstance(expressions, list): + raise GPUdbException("'expressions' must be of type 'list' ...") kwargs.pop( "expressions" ) else: # no option given; use an empty dict - raise GPUdbException("Update expression must be given ...") + raise GPUdbException("Update 'expressions' must be given ...") new_values_maps = kwargs.get( "new_values_maps", None ) if new_values_maps is not None: # if given, remove from kwargs + if not isinstance(new_values_maps, (list, dict)): + raise GPUdbException("'new_value_maps' should either be a 'list of dicts' or a dict ...") kwargs.pop( "new_values_maps" ) else: # no option given; use an empty dict raise GPUdbException("'new_values_maps' must be given ...") @@ -498,12 +527,30 @@ def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.An result = None expression = kwargs.get("expression", "") options = kwargs.get("options", {}) + if keys is None or keys <= 0 or (isinstance(keys, (str, list)) and len(keys) <= 0): + raise GPUdbException("'keys' must be specified as either an 'int' or 'str' or 'list' ...") + + if expression == "": + # expression not specified; keys must be values of PK in the Kinetica table. 
+ pk_field_name = self._get_pk_field_name() + if pk_field_name == "": + raise GPUdbException("No 'expression' given and no 'PK field' found cannot retrieve records ...") + + if isinstance(keys, str): + expression = [f"{pk_field_name} = '{keys}'"] + elif isinstance(keys, int): + expression = [f"{pk_field_name} = {keys}"] + elif isinstance(keys, list): + # keys is a list + expression = [f"{pk_field_name} in ({','.join(keys)})"] + else: + raise GPUdbException("'keys' must be of type (int or str or list) ...") try: result = self._collection.get_records_by_key(keys, expression, options) except GPUdbException as exec_info: raise RuntimeError(f"Unable to perform search: {exec_info}") from exec_info - return result["records"] + return result["records"] if result is not None and 'records' in result else [] def count(self, **kwargs: dict[str, typing.Any]) -> int: """ @@ -512,7 +559,7 @@ def count(self, **kwargs: dict[str, typing.Any]) -> int: Parameters ---------- **kwargs : dict[str, typing.Any] - Additional keyword arguments for the count operation. + Not used. Returns ------- @@ -523,14 +570,14 @@ def count(self, **kwargs: dict[str, typing.Any]) -> int: def drop(self, **kwargs: dict[str, typing.Any]) -> None: """ - Drop a Kinetica table, index, or partition in the Kinetica vector database. + Drop a Kinetica table. This function allows you to drop a Kinetica table. Parameters ---------- **kwargs : dict - Additional keyword arguments for specifying the type and partition name (if applicable). + Not used. 
""" self._client.clear_table(self._collection_name) @@ -893,7 +940,7 @@ def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: logger.debug("Dropping Kinetica table: %s, kwargs=%s", name, kwargs) if self.has_store_object(name): - schema = kwargs.get("schema", "ki_home") + # schema = kwargs.get("schema", "ki_home") try: self._client.clear_table(name) except GPUdbException as e: diff --git a/tests/conftest.py b/tests/conftest.py index 093eada5c1..5aaf3736f9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -992,6 +992,35 @@ def milvus_server_uri(tmp_path_factory): yield uri +@pytest.fixture(scope="session", name="kinetica_data") +def kinetica_data_fixture(): + import random + import json + inital_data = [[ + i+1, + [random.random() for _ in range(3)], + json.dumps({"metadata": f"Sample metadata for row {i+1}"}), + ] for i in range(10)] + yield inital_data + + +@pytest.fixture(scope="session", name="kinetica_type") +def kinetica_type_fixture(): + columns = [ + ["id", "long", "primary_key"], + ["embeddings", "bytes", "vector(3)"], + ["metadata", "string", "json"], + ] + yield columns + + +@pytest.fixture(scope="session", name="kinetica_service") +def kinetica_service_fixture(kinetica_server_uri: str, username: str, password: str, schema: str = None): + from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService + service = KineticaVectorDBService(kinetica_server_uri, user=username, password=password, kinetica_schema=schema) + yield service + + @pytest.fixture(scope="session", name="milvus_data") def milvus_data_fixture(): inital_data = [{"id": i, "embedding": [i / 10.0] * 3, "age": 25 + i} for i in range(10)] From 0ac5d9a23d1173f04a993d85d54a7f71198a4635 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 10 Jan 2025 09:05:15 +0530 Subject: [PATCH 27/29] WIP - Example works correctly. Tests related kinetca_vector_db_service.py are in progress. 
--- .../service/vdb/kinetica_vector_db_service.py | 4 +-- tests/conftest.py | 29 ------------------- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index ca3166c1c9..193cef1c3d 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -22,7 +22,7 @@ from collections import OrderedDict from functools import wraps -from gpudb import GPUdb, GPUdbTable, GPUdbRecordColumn, GPUdbRecordType, GPUdbException +from gpudb import GPUdb, GPUdbTable, GPUdbRecordColumn, GPUdbRecordType, GPUdbException, GPUdbSqlIterator import sqlparse from sqlparse.sql import IdentifierList, Identifier, Token from sqlparse.tokens import Keyword @@ -782,7 +782,7 @@ def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) - name : str Name of the Kinetica table to search within. query : str - The search query, which can be a filter expression. + The search query, which is an SQL query. **kwargs : dict Additional keyword arguments for the search operation. 
diff --git a/tests/conftest.py b/tests/conftest.py index 5aaf3736f9..093eada5c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -992,35 +992,6 @@ def milvus_server_uri(tmp_path_factory): yield uri -@pytest.fixture(scope="session", name="kinetica_data") -def kinetica_data_fixture(): - import random - import json - inital_data = [[ - i+1, - [random.random() for _ in range(3)], - json.dumps({"metadata": f"Sample metadata for row {i+1}"}), - ] for i in range(10)] - yield inital_data - - -@pytest.fixture(scope="session", name="kinetica_type") -def kinetica_type_fixture(): - columns = [ - ["id", "long", "primary_key"], - ["embeddings", "bytes", "vector(3)"], - ["metadata", "string", "json"], - ] - yield columns - - -@pytest.fixture(scope="session", name="kinetica_service") -def kinetica_service_fixture(kinetica_server_uri: str, username: str, password: str, schema: str = None): - from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService - service = KineticaVectorDBService(kinetica_server_uri, user=username, password=password, kinetica_schema=schema) - yield service - - @pytest.fixture(scope="session", name="milvus_data") def milvus_data_fixture(): inital_data = [{"id": i, "embedding": [i / 10.0] * 3, "age": 25 + i} for i in range(10)] From 78110a7ca0726a65877329f87cd012e0ca0f5528 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Fri, 10 Jan 2025 09:44:33 +0530 Subject: [PATCH 28/29] WIP - Example works correctly. Tests related kinetca_vector_db_service.py are in progress. 
--- .../service/vdb/kinetica_vector_db_service.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py index 193cef1c3d..78514343f7 100644 --- a/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py +++ b/python/morpheus_llm/morpheus_llm/service/vdb/kinetica_vector_db_service.py @@ -518,6 +518,12 @@ def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.An **kwargs : dict[str, typing.Any] Additional keyword arguments for the retrieval operation. + Only valid keyword arguments are: + + expression: [str] - The Kinetica expression to pass on to `get/records/by/key` + + options: dict - The `options` dict accepted by `get/records/by/key` + Returns ------- list[typing.Any] @@ -564,7 +570,7 @@ def count(self, **kwargs: dict[str, typing.Any]) -> int: Returns ------- int - Returns number of entities in the Kinetica table. + Returns number of records in the Kinetica table. """ return self._collection.count @@ -572,7 +578,7 @@ def drop(self, **kwargs: dict[str, typing.Any]) -> None: """ Drop a Kinetica table. - This function allows you to drop a Kinetica table. + This function allows you to delete/drop a Kinetica table. Parameters ---------- From 25ae05e8235f5d355df4db8ab9be1d344b461c83 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 20 Jan 2025 12:20:52 +0530 Subject: [PATCH 29/29] Resolved conflicts; removed `gpudb` API dependency. 
--- dependencies.yaml | 1 - python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/dependencies.yaml b/dependencies.yaml index 40b84a77a3..43484fde9a 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -401,7 +401,6 @@ dependencies: - databricks-connect - milvus==2.3.5 # update to match pymilvus when available - pymilvus==2.3.6 - - gpudb>=7.2.2.3 - torch==2.4.0+cu124 test_python_morpheus: diff --git a/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt b/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt index d2367685ba..8f9a9620b9 100644 --- a/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt +++ b/python/morpheus_llm/morpheus_llm/requirements_morpheus_llm.txt @@ -8,5 +8,4 @@ langchain==0.1.16 milvus==2.3.5 nemollm==0.3.5 pymilvus==2.3.6 -gpudb>=7.2.2.3 torch==2.4.0+cu124