Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Milvus Delayed Flushing #1694

Draft
wants to merge 7 commits into
base: branch-24.06
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .devcontainer/bin/dev-milvus-start
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Start the Milvus standalone service from the devcontainer compose file and
# print the IP address reported by `hostname -i` inside the container.
#
# Requires: MORPHEUS_ROOT set to the root of the repository checkout.
# Output:   the container's address on stdout (also exported as MILVUS_HOST,
#           which is only visible to the caller when this script is sourced).

# Abort on the first failing command.
set -e

# Without this guard an unset MORPHEUS_ROOT would silently resolve the compose
# file to "/.devcontainer/docker-compose.yml".
: "${MORPHEUS_ROOT:?MORPHEUS_ROOT must be set to the root of the repository checkout}"

COMPOSE_FILE="${MORPHEUS_ROOT}/.devcontainer/docker-compose.yml"

# Quote the path so a checkout location containing spaces still works.
docker compose -f "${COMPOSE_FILE}" up -d milvus-standalone

# Assign and export separately: `export VAR=$(cmd)` returns the status of
# `export`, hiding a failure of `cmd` from `set -e`.
# -T disables pseudo-TTY allocation so the captured output carries no
# terminal carriage return.
MILVUS_HOST="$(docker compose -f "${COMPOSE_FILE}" exec -T milvus-standalone hostname -i)"
export MILVUS_HOST

echo "${MILVUS_HOST}"
41 changes: 41 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ services:
depends_on:
- zookeeper

# etcd instance for the Milvus stack; milvus-standalone is pointed at it through
# ETCD_ENDPOINTS (milvus-etcd:2379).
milvus-etcd:
container_name: morpheus-milvus-etcd
image: quay.io/coreos/etcd:v3.5.0
environment:
# Auto-compaction by revision, with the retention value set to 1000.
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
# 4294967296 bytes = 4 GiB backend quota.
- ETCD_QUOTA_BACKEND_BYTES=4294967296
volumes:
# Host directory defaults to "." when DOCKER_VOLUME_DIRECTORY is not set.
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
# Listens on all interfaces on port 2379 and stores its data in the mounted /etcd directory.
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

# MinIO instance for the Milvus stack; milvus-standalone is pointed at it through
# MINIO_ADDRESS (milvus-minio:9000).
milvus-minio:
container_name: morpheus-milvus-minio
image: minio/minio:RELEASE.2020-12-03T00-03-10Z
environment:
# NOTE(review): hard-coded credentials — presumably acceptable for a local dev container only; confirm.
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
# Host directory defaults to "." when DOCKER_VOLUME_DIRECTORY is not set.
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data
# Probes the MinIO liveness endpoint with curl every 30s (20s timeout, 3 retries).
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3

# Milvus in standalone mode, wired to the milvus-etcd and milvus-minio services.
milvus-standalone:
container_name: morpheus-milvus-standalone
# NOTE(review): "2.3-latest" looks like a floating tag — pulls may not be reproducible; confirm.
image: milvusdb/milvus:2.3-latest
command: ["milvus", "run", "standalone"]
environment:
# Addresses use the compose service names of the two services listed under depends_on.
ETCD_ENDPOINTS: milvus-etcd:2379
MINIO_ADDRESS: milvus-minio:9000
volumes:
# Host directory defaults to "." when DOCKER_VOLUME_DIRECTORY is not set.
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
# Publishes container port 19530 on the same host port.
- "19530:19530"
# NOTE(review): short-form depends_on presumably orders startup only and does not wait
# for the minio healthcheck to pass — confirm whether a health condition is wanted.
depends_on:
- milvus-etcd
- milvus-minio

networks:
default:
external:
Expand Down
11 changes: 4 additions & 7 deletions examples/llm/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from morpheus.llm.services.nemo_llm_service import NeMoLLMService
from morpheus.llm.services.openai_chat_service import OpenAIChatService
from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP
from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService
from morpheus.service.vdb.utils import VectorDBServiceFactory
from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBServiceProvider

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,9 +108,7 @@ def build_default_milvus_config(embedding_size: int):
return milvus_resource_kwargs


def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"):
def build_milvus_service(embedding_size: int, uri: str = "http://172.18.0.4:19530"):
default_service = build_default_milvus_config(embedding_size)

vdb_service: MilvusVectorDBService = VectorDBServiceFactory.create_instance("milvus", uri=uri, **default_service)

return vdb_service
milvus_provider = MilvusVectorDBServiceProvider(uri=uri, **default_service)
return milvus_provider.create()
2 changes: 2 additions & 0 deletions examples/llm/rag/persistant_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,6 @@ def pipeline(num_threads, pipeline_batch_size, model_max_batch_size, embedding_s
# Run the pipeline
pipe.run()

vdb_service.close()

return start_time
13 changes: 9 additions & 4 deletions examples/llm/rag/standalone_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes
from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService

from ..common.utils import build_huggingface_embeddings
from ..common.utils import build_llm_service
Expand All @@ -40,7 +41,7 @@
logger = logging.getLogger(__name__)


def _build_engine(model_name: str, vdb_resource_name: str, llm_service: str, embedding_size: int):
def _build_engine(milvus_service: MilvusVectorDBService, model_name: str, vdb_resource_name: str, llm_service: str, embedding_size: int):

engine = LLMEngine()

Expand All @@ -55,7 +56,6 @@ def _build_engine(model_name: str, vdb_resource_name: str, llm_service: str, emb

Please answer the following question: \n{{ query }}"""

vector_service = build_milvus_service(embedding_size)
embeddings = build_huggingface_embeddings("sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={'device': 'cuda'},
encode_kwargs={'batch_size': 100})
Expand All @@ -69,7 +69,7 @@ async def calc_embeddings(texts: list[str]) -> list[list[float]]:
engine.add_node("rag",
inputs=["/extracter"],
node=RAGNode(prompt=prompt,
vdb_service=vector_service.load_resource(vdb_resource_name),
vdb_service=milvus_service.load_resource(vdb_resource_name),
embedding=calc_embeddings,
llm_client=llm_service))

Expand Down Expand Up @@ -110,9 +110,12 @@ def standalone(num_threads,

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

milvus_service = build_milvus_service(embedding_size)

pipe.add_stage(
LLMEngineStage(config,
engine=_build_engine(model_name=model_name,
engine=_build_engine(milvus_service=milvus_service,
model_name=model_name,
vdb_resource_name=vdb_resource_name,
llm_service=llm_service,
embedding_size=embedding_size)))
Expand All @@ -125,6 +128,8 @@ def standalone(num_threads,

pipe.run()

milvus_service.close()

messages = sink.get_messages()
responses = concat_dataframes(messages)
logger.info("Pipeline complete. Received %s responses", len(responses))
Expand Down
24 changes: 10 additions & 14 deletions morpheus/modules/output/write_to_vector_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
from morpheus.messages import MultiResponseMessage
from morpheus.modules.schemas.write_to_vector_db_schema import WriteToVDBSchema
from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP
from morpheus.service.vdb.utils import VectorDBServiceFactory
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.service.vdb.vector_db_service import VectorDbServiceProvider
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import WRITE_TO_VECTOR_DB
from morpheus.utils.module_utils import ModuleLoaderFactory
Expand Down Expand Up @@ -91,12 +90,10 @@ def _write_to_vector_db(builder: mrc.Builder):
The `module_config` should contain:
- 'embedding_column_name': str, the name of the column containing embeddings (default is "embedding").
- 'recreate': bool, whether to recreate the resource if it already exists (default is False).
- 'service': str, the name of the service or a serialized instance of VectorDBService.
- 'is_service_serialized': bool, whether the provided service is serialized (default is False).
- 'service_provider': str, a pickled instance of VectorDbServiceProvider, passed as a latin1-decoded string.
- 'default_resource_name': str, the name of the collection resource (must not be None or empty).
- 'resource_kwargs': dict, additional keyword arguments for resource creation.
- 'resource_schemas': dict, additional keyword arguments for resource creation.
- 'service_kwargs': dict, additional keyword arguments for VectorDBService creation.
- 'batch_size': int, accumulates messages until reaching the specified batch size for writing to VDB.
- 'write_time_interval': float, specifies the time interval (in seconds) for writing messages, or writing messages
when the accumulated batch size is reached.
Expand All @@ -122,19 +119,16 @@ def _write_to_vector_db(builder: mrc.Builder):

embedding_column_name = write_to_vdb_config.embedding_column_name
recreate = write_to_vdb_config.recreate
service = write_to_vdb_config.service
is_service_serialized = write_to_vdb_config.is_service_serialized
service_provider = write_to_vdb_config.service_provider
default_resource_name = write_to_vdb_config.default_resource_name
resource_kwargs = write_to_vdb_config.resource_kwargs
resource_schemas = write_to_vdb_config.resource_schemas
service_kwargs = write_to_vdb_config.service_kwargs
batch_size = write_to_vdb_config.batch_size
write_time_interval = write_to_vdb_config.write_time_interval

# Check if service is serialized and convert if needed
# pylint: disable=not-a-mapping
service: VectorDBService = (pickle.loads(bytes(service, "latin1")) if is_service_serialized else
VectorDBServiceFactory.create_instance(service_name=service, **service_kwargs))
service_provider: VectorDbServiceProvider = pickle.loads(bytes(service_provider, "latin1"))

service = service_provider.create()

preprocess_vdb_resources(service, recreate, resource_schemas)

Expand All @@ -148,7 +142,8 @@ def on_completed():
try:
if accum_stats.data:
merged_df = cudf.concat(accum_stats.data)
service.insert_dataframe(name=key, df=merged_df)
collection = service.load_resource(name=key)
collection.insert_dataframe(df=merged_df)
final_df_references.append(accum_stats.data)
except Exception as e:
logger.error("Unable to upload dataframe entries to vector database: %s", e)
Expand Down Expand Up @@ -213,7 +208,8 @@ def on_data(msg: typing.Union[ControlMessage, MultiResponseMessage, MultiMessage
merged_df = cudf.concat(accum_stats.data)

# pylint: disable=not-a-mapping
service.insert_dataframe(name=key, df=merged_df, **resource_kwargs)
collection = service.load_resource(name=key)
collection.insert_dataframe(df=merged_df, **resource_kwargs)
# Reset accumulator stats
accum_stats.data.clear()
accum_stats.last_insert_time = current_time
Expand Down
8 changes: 3 additions & 5 deletions morpheus/modules/schemas/write_to_vector_db_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@
class WriteToVDBSchema(BaseModel):
embedding_column_name: str = "embedding"
recreate: bool = False
service: str = Field(default_factory=None)
is_service_serialized: bool = False
service_provider: str = Field(default_factory=None)
default_resource_name: str = Field(default_factory=None)
resource_schemas: dict = Field(default_factory=dict)
resource_kwargs: dict = Field(default_factory=dict)
service_kwargs: dict = Field(default_factory=dict)
batch_size: int = 1024
write_time_interval: float = 1.0

@validator('service', pre=True)
@validator('service_provider', pre=True)
def validate_service(cls, to_validate): # pylint: disable=no-self-argument
if not to_validate:
raise ValueError("Service must be a service name or a serialized instance of VectorDBService")
raise ValueError("Service must be an instance of VectorDbServiceProvider")
return to_validate

@validator('default_resource_name', pre=True)
Expand Down
Loading
Loading