Kinetica Vector DB Service #2098

Status: Open. Wants to merge 32 commits into base branch branch-25.02.
Commits (32), all authored by am-kinetica:

91d09da  Nov 21, 2024  WIP - test sample Milvus pipeline locally
3ef2e07  Nov 25, 2024  WIP - test sample Milvus pipeline locally
20fd509  Nov 29, 2024  WIP - Debug statements
a293fbd  Nov 29, 2024  WIP - Debug statements
43d98e5  Nov 29, 2024  WIP - Debug statements
291f4bc  Nov 30, 2024  WIP - Debug statements
812e230  Nov 30, 2024  WIP - Debug statements
edd3641  Nov 30, 2024  WIP - Debug statements
bb1074d  Dec 1, 2024   WIP - Debug statements
d7f66a7  Dec 5, 2024   WIP - Implementation of Kinetica vector DB service.
fdd2347  Dec 6, 2024   WIP - Implementation of Kinetica vector DB service.
c9ede22  Dec 6, 2024   WIP - Implementation of Kinetica vector DB service.
2b3d8c0  Dec 6, 2024   WIP - Implementation of Kinetica vector DB service.
09f5d85  Dec 16, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
920bb67  Dec 16, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
096c7bd  Dec 16, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
ddbc867  Dec 19, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
48b3709  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
70b5f27  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
1e07661  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
d8c0750  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
d9ef7ea  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
1c61f40  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
bd5d592  Dec 25, 2024  WIP - Implementation of Kinetica vector DB service. Added sample usage.
f322956  Dec 25, 2024  WIP - Example works correctly. Tests related kinetca_vector_db_servic…
5f972f3  Jan 6, 2025   WIP - Example works correctly. Tests related kinetca_vector_db_servic…
0ac5d9a  Jan 10, 2025  WIP - Example works correctly. Tests related kinetca_vector_db_servic…
78110a7  Jan 10, 2025  WIP - Example works correctly. Tests related kinetca_vector_db_servic…
25ae05e  Jan 20, 2025  Resolved conflicts; removed `gpudb` API dependency.
fbce9cd  Jan 22, 2025  Merge remote-tracking branch 'origin/branch-25.02' into branch-25.02
6f43bc1  Jan 22, 2025  Merge branch 'branch-25.02' into feature/kinetica_vector_db_service_8…
188e8c5  Jan 22, 2025  Merge branch 'branch-25.02' into feature/kinetica_vector_db_service_8…
conda/environments/all_cuda-125_arch-x86_64.yaml (1 addition, 0 deletions)

@@ -142,6 +142,7 @@ dependencies:
   - milvus==2.3.5
   - nemollm==0.3.5
   - pymilvus==2.3.6
+  - gpudb>=7.2.2.3
   - pytest-kafka==0.6.0
   - python-logging-loki
   - sentence-transformers==2.7

conda/environments/dev_cuda-125_arch-x86_64.yaml (1 addition, 0 deletions)

@@ -112,6 +112,7 @@ dependencies:
   - databricks-connect
   - milvus==2.3.5
   - pymilvus==2.3.6
+  - gpudb>=7.2.2.3
   - pytest-kafka==0.6.0
   - torch==2.4.0+cu124
 name: dev_cuda-125_arch-x86_64

conda/environments/examples_cuda-125_arch-x86_64.yaml (1 addition, 0 deletions)

@@ -79,6 +79,7 @@ dependencies:
   - milvus==2.3.5
   - nemollm==0.3.5
   - pymilvus==2.3.6
+  - gpudb>=7.2.2.3
   - python-logging-loki
   - sentence-transformers==2.7
   - torch==2.4.0+cu124

conda/environments/runtime_cuda-125_arch-x86_64.yaml (1 addition, 0 deletions)

@@ -52,5 +52,6 @@ dependencies:
   - databricks-connect
   - milvus==2.3.5
   - pymilvus==2.3.6
+  - gpudb>=7.2.2.3
   - torch==2.4.0+cu124
 name: runtime_cuda-125_arch-x86_64
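All four conda environments gain the `gpudb` Python client (>=7.2.2.3), which the new service uses to talk to a Kinetica instance. As a quick smoke test that the dependency resolves and a database is reachable, a minimal sketch (the endpoint, credential environment variables, and the version property name are assumptions, not part of this PR):

import os

import gpudb

# Connect to a Kinetica instance; adjust the URL/credentials for your deployment.
db = gpudb.GPUdb(host=os.getenv("KINETICA_URL", "http://localhost:9191"),
                 username=os.getenv("KINETICA_USER", ""),
                 password=os.getenv("KINETICA_PASS", ""))

# A cheap round trip to verify the connection is alive.
props = db.show_system_properties()["property_map"]
print(props.get("version.gpudb_core_version"))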
examples/llm/completion/pipeline.py (4 additions, 0 deletions)

@@ -37,6 +37,8 @@
 from morpheus_llm.llm.task_handlers.simple_task_handler import SimpleTaskHandler
 from morpheus_llm.stages.llm.llm_engine_stage import LLMEngineStage
 
+from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage
+
 logger = logging.getLogger(__name__)
 
 
@@ -127,6 +129,8 @@ def pipeline(use_cpu_only: bool,
 
     pipe.add_stage(MonitorStage(config, description="Inference rate", unit="req", delayed_start=True))
 
+    pipe.add_stage(WriteToVectorDBStage())
+
     sink = pipe.add_stage(InMemorySinkStage(config))
 
     start_time = time.time()
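Note that the added `pipe.add_stage(WriteToVectorDBStage())` call passes no arguments, while everywhere else in this PR the stage takes a Morpheus `Config`, a `VectorDBService` instance, and a target resource name (see the Kinetica example below). Inside `pipeline()`, where `pipe` and `config` are already in scope, the call would presumably need to look more like the following sketch; the endpoint and the "completions" collection name are hypothetical placeholders:

# Sketch only: mirrors how WriteToVectorDBStage is invoked elsewhere in
# this PR (config, service instance, resource name).
from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService

vdb_service = KineticaVectorDBService("http://localhost:9191", user="", password="")
pipe.add_stage(WriteToVectorDBStage(config, vdb_service, "completions"))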
examples/sample_kinetica_pipeline/kinetica_pipeline_example.py (new file, 94 additions)
import json
import logging
import os
import random

import cudf

from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.messages import ControlMessage
from morpheus.modules import to_control_message  # noqa: F401 # pylint: disable=unused-import
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import TO_CONTROL_MESSAGE

from morpheus_llm.service.vdb.kinetica_vector_db_service import KineticaVectorDBService
from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage

logger = logging.getLogger(__name__)


def get_test_df(num_input_rows):
    """Build a small test DataFrame with ids, 3-dimensional embeddings and JSON metadata."""
    df = cudf.DataFrame({
        "id": list(range(num_input_rows)),
        "embeddings": [[random.random() for _ in range(3)] for _ in range(num_input_rows)],
        "metadata": [json.dumps({"metadata": f"Sample metadata for row {i}"}) for i in range(num_input_rows)],
    })

    return df


def main():
    host = os.getenv("kinetica_host", "http://localhost:9191")
    username = os.getenv("username", "")
    password = os.getenv("password", "")
    schema = os.getenv("schema", "")

    config = Config()
    config.execution_mode = ExecutionMode.GPU

    kinetica_db_service = KineticaVectorDBService(host, user=username, password=password, kinetica_schema=schema)

    # Kinetica table names are schema-qualified; fall back to the default
    # "ki_home" schema when none is configured.
    collection_name = "test_collection"
    collection_name = f"{schema}.{collection_name}" if schema else f"ki_home.{collection_name}"

    # Each column is a [name, base_type, *properties] triple; "vector(3)"
    # declares a 3-dimensional vector column.
    columns = [
        ["id", "long", "primary_key"],
        ["embeddings", "bytes", "vector(3)"],
        ["metadata", "string", "json"],
    ]
    kinetica_db_service.create(collection_name, type=columns)

    df = get_test_df(10)

    to_cm_module_config = {
        "module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE
    }

    # Build the pipeline: in-memory source -> ControlMessage conversion -> vector DB writer.
    pipeline = LinearPipeline(config)
    pipeline.set_source(InMemorySourceStage(config, [df]))
    pipeline.add_stage(
        LinearModulesStage(config,
                           to_cm_module_config,
                           input_port_name="input",
                           output_port_name="output",
                           output_type=ControlMessage))

    # Write to the same (schema-qualified) table that was created above.
    pipeline.add_stage(WriteToVectorDBStage(config, kinetica_db_service, collection_name))

    pipeline.run()


if __name__ == "__main__":
    main()
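For a sanity check after the run, the rows can be read back with the `gpudb` client directly. A minimal sketch, assuming the example's defaults (a local Kinetica instance and the `ki_home.test_collection` table) and the `GPUdbTable` wrapper:

import gpudb

# Assumes the example's default endpoint and table name.
db = gpudb.GPUdb(host="http://localhost:9191", username="", password="")
table = gpudb.GPUdbTable(name="ki_home.test_collection", db=db)

# Fetch a few records to confirm the pipeline's insert landed.
for record in table.get_records(offset=0, limit=5):
    print(record)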
examples/sample_milvus_pipeline/pipeline.py (new file, 159 additions)
import csv
import json
import logging
import os
import random

from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.io.deserializers import read_file_to_df
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging
from morpheus.utils.type_utils import exec_mode_to_df_type_str

from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBService
from morpheus_llm.stages.output.write_to_vector_db_stage import WriteToVectorDBStage

logger = logging.getLogger(__name__)


def generate_random_vector(dim=3):
    """Generate a random vector of the specified dimension."""
    return [random.uniform(-1.0, 1.0) for _ in range(dim)]


def save_to_json_file(data, file_name="data.json"):
    """Save data to a JSON file."""
    with open(file_name, "w") as json_file:
        json.dump(data, json_file, indent=4)
    logger.info("JSON data saved to %s", file_name)


def generate_json_records(collection_name, num_records=100):
    """Generate a payload of records suitable for saving to a JSON file."""
    data = []
    for pk in range(1, num_records + 1):
        record = {
            "id": pk,
            "vector": generate_random_vector(),
            "metadata": json.dumps({"description": f"Record {pk}", "category": random.choice(["A", "B", "C"])})
        }
        data.append(record)

    return {"collectionName": collection_name, "data": data}


def generate_csv(file_name, num_rows):
    """
    Generates a CSV file with fields:
    - id: Primary key (integer)
    - vector: A random 3-dimensional vector (array of floats)
    - metadata: Sample text metadata (string)

    :param file_name: Name of the output CSV file
    :param num_rows: Number of rows to generate
    """
    with open(file_name, mode='w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['id', 'vector', 'metadata']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        # Write the header so read_file_to_df picks up the column names.
        writer.writeheader()

        for pk in range(1, num_rows + 1):
            writer.writerow({
                'id': pk,
                'vector': [round(random.uniform(-1.0, 1.0), 4) for _ in range(3)],
                'metadata': f"Sample metadata for row {pk}"
            })


def main(input_file_name: str):
    # Step 1: Configure logging
    configure_logging(log_level=logging.INFO)

    # Step 2: Initialize the Morpheus config
    config = Config()
    config.execution_mode = ExecutionMode.GPU

    # Step 3: Set up the Milvus service. Credentials are deliberately not
    # hard-coded; point these environment variables at your own Milvus or
    # Zilliz Cloud instance.
    milvus_db_service = MilvusVectorDBService(os.environ["MILVUS_URI"],
                                              user=os.environ.get("MILVUS_USER", ""),
                                              password=os.environ.get("MILVUS_PASSWORD", ""),
                                              token=os.environ.get("MILVUS_TOKEN", ""))
    collection_name = "test_collection"

    # Step 4: Load the generated CSV into a DataFrame.
    source_df = read_file_to_df(input_file_name, df_type=exec_mode_to_df_type_str(config.execution_mode))
    logger.info("Loaded %d rows from %s", source_df.shape[0], input_file_name)

    # Step 5: Build the pipeline.
    pipeline = LinearPipeline(config)
    pipeline.set_source(InMemorySourceStage(config, dataframes=[source_df]))
    pipeline.add_stage(DeserializeStage(config))

    # Step 6: Write the rows to Milvus.
    pipeline.add_stage(WriteToVectorDBStage(config, milvus_db_service, collection_name))

    # Step 7: Execute the pipeline.
    pipeline.run()


if __name__ == "__main__":
    file_name = "test.csv"
    generate_csv(file_name, 1000)
    main(file_name)
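Similarly, the Milvus side can be spot-checked after the run with pymilvus directly. A minimal sketch, assuming pymilvus 2.3's `MilvusClient` and the same `MILVUS_URI`/`MILVUS_TOKEN` environment variables introduced above:

import os

from pymilvus import MilvusClient

# Connect with the same credentials the example uses.
client = MilvusClient(uri=os.environ["MILVUS_URI"], token=os.environ.get("MILVUS_TOKEN", ""))

# Pull back a handful of the inserted rows by primary key range.
rows = client.query(collection_name="test_collection", filter="id < 5", output_fields=["id", "metadata"])
print(rows)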
morpheus_llm/stages/output/write_to_vector_db_stage.py (file path not captured in this view; inferred from the hunks below)

@@ -147,7 +147,10 @@ def on_completed():
             if accum_stats.data:
                 df_pkg = get_df_pkg_from_obj(accum_stats.data[0])
                 merged_df = df_pkg.concat(accum_stats.data)
+                print("Before Inserting data ...")
+                merged_df.info()
                 service.insert_dataframe(name=key, df=merged_df)
+                print("After Inserting data")
                 final_df_references.append(accum_stats.data)
         except Exception as e:
             logger.error("Unable to upload dataframe entries to vector database: %s", e)

@@ -163,6 +166,8 @@ def extract_df(msg: ControlMessage):
         df = msg.payload().df
         if (msg.has_metadata("vdb_resource")):
             resource_name = msg.get_metadata("vdb_resource")
+            df.info()
+            print("Resource name = ", resource_name)
         else:
             resource_name = None
     else:
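The added `print` calls read as leftovers from the "WIP - Debug statements" commits above; the established pattern in this file is the module-level logger. A sketch of equivalent instrumentation, with the hunk's variables wrapped as parameters purely for illustration:

import logging

logger = logging.getLogger(__name__)


def insert_with_logging(service, key, merged_df):
    # Same information as the added prints, but routed through the logger so
    # it respects the configured log level and formatting.
    logger.debug("Inserting %d rows into vector DB resource '%s'", len(merged_df), key)
    service.insert_dataframe(name=key, df=merged_df)
    logger.debug("Insert into '%s' complete", key)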