Skip to content

Commit

Permalink
WIP - untested
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Sep 23, 2024
1 parent f8d2f81 commit 203c56d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
8 changes: 6 additions & 2 deletions python/morpheus_llm/morpheus_llm/llm/nodes/extracter_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import numpy as np

from morpheus.messages import MessageMeta
from morpheus_llm.llm import LLMContext
from morpheus_llm.llm import LLMNodeBase

Expand Down Expand Up @@ -59,7 +60,9 @@ async def execute(self, context: LLMContext) -> LLMContext: # pylint: disable=i
# Get the keys from the task
input_keys: list[str] = typing.cast(list[str], context.task()["input_keys"])

with context.message().payload().mutable_dataframe() as df:
meta: MessageMeta = context.message().get_metadata("llm_message_meta")

with meta.mutable_dataframe() as df:
input_dict: list[dict] = df[input_keys].to_dict(orient="list")

input_dict = _array_to_list(input_dict)
Expand Down Expand Up @@ -95,7 +98,8 @@ def get_input_names(self) -> list[str]:
async def execute(self, context: LLMContext) -> LLMContext: # pylint: disable=invalid-overridden-method

# Get the data from the DataFrame
with context.message().payload().mutable_dataframe() as df:
meta: MessageMeta = context.message().get_metadata("llm_message_meta")
with meta.mutable_dataframe() as df:
input_dict: list[dict] = df[self._input_names].to_dict(orient="list")

input_dict = _array_to_list(input_dict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus_llm.llm import LLMContext
from morpheus_llm.llm import LLMTaskHandler

Expand Down Expand Up @@ -48,7 +49,8 @@ async def try_handle(self, context: LLMContext) -> list[ControlMessage]:

input_dict = context.get_inputs()

with context.message().payload().mutable_dataframe() as df:
meta: MessageMeta = context.message().get_metadata("llm_message_meta")
with meta.mutable_dataframe() as df:
# Write the values to the dataframe
for key, value in input_dict.items():
df[key] = value
Expand Down
56 changes: 47 additions & 9 deletions python/morpheus_llm/morpheus_llm/stages/llm/llm_engine_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,66 @@ def supports_cpp_node(self) -> bool:
"""Indicates whether this stage supports a C++ node."""
return True

def _cast_control_message(self, message: ControlMessage, *, cpp_messages_lib: types.ModuleType) -> ControlMessage:
def _store_payload(self, message: ControlMessage) -> ControlMessage:
"""
Store the MessageMeta in the ControlMessage's metadata.
In CPU-only allows the ControlMessage to hold an instance of a Python MessageMeta containing a pandas DataFrame.
"""
message.set_metadata("llm_message_meta", message.payload())
return message

def _cast_to_cpp_control_message(self, message: ControlMessage, *,
cpp_messages_lib: types.ModuleType) -> ControlMessage:
"""
LLMEngineStage does not contain a Python implementation, however it is capable of running in cpu-only mode.
This method is needed to cast the Python ControlMessage to a C++ ControlMessage.
This method is needed to create an instance of a C++ ControlMessage.
This is different than casting from the Python bindings for the C++ ControlMessage to a C++ ControlMessage.
"""
return cpp_messages_lib.ControlMessage(message, no_cast=True)
cm = cpp_messages_lib.ControlMessage()
metadata = message.get_metadata()
for (key, value) in metadata.items():
cm.set_metadata(key, value)

return cm

def _restore_payload(self, message: ControlMessage) -> ControlMessage:
"""
Pop llm_message_meta from the metadata and set it as the payload.
In CPU-only mode this has the effect of converting the C++ ControlMessage back to a Python ControlMessage.
"""
metadata = message.get_metadata()
message_meta = metadata.pop("llm_message_meta")

out_message = ControlMessage()
out_message.payload(message_meta)
for (key, value) in metadata.items():
out_message.set_metadata(key, value)

return out_message

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
import morpheus_llm._lib.llm as _llm

store_payload_node = builder.make_node(f"{self.unique_name}-store-payload", ops.map(self._store_payload))
builder.make_edge(input_node, store_payload_node)

node = _llm.LLMEngineStage(builder, self.unique_name, self._engine)
node.launch_options.pe_count = 1

if self._config.execution_mode == ExecutionMode.CPU:
import morpheus._lib.messages as _messages
cast_fn = functools.partial(self._cast_control_message, cpp_messages_lib=_messages)
pre_node = builder.make_node(f"{self.unique_name}-pre-cast", ops.map(cast_fn))
builder.make_edge(input_node, pre_node)
cast_to_cpp_fn = functools.partial(self._cast_to_cpp_control_message, cpp_messages_lib=_messages)
cast_to_cpp_node = builder.make_node(f"{self.unique_name}-pre-msg-cast", ops.map(cast_to_cpp_fn))
builder.make_edge(store_payload_node, cast_to_cpp_node)
builder.make_edge(cast_to_cpp_node, node)

input_node = pre_node
else:
builder.make_edge(store_payload_node, node)

builder.make_edge(input_node, node)
restore_payload_node = builder.make_node(f"{self.unique_name}-restore-payload", ops.map(self._restore_payload))
builder.make_edge(node, restore_payload_node)

return node
return restore_payload_node

0 comments on commit 203c56d

Please sign in to comment.