Skip to content

Commit

Permalink
Removed stream parser, favouring return of raw stream from generator
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd Hamilton committed Jan 12, 2025
1 parent 3329ccc commit 1abea9e
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions adalflow/adalflow/components/model_client/bedrock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from adalflow.core.model_client import ModelClient
from adalflow.core.types import ModelType, CompletionUsage, GeneratorOutput
from adalflow.utils import printc

from adalflow.utils.lazy_import import safe_import, OptionalPackages

Expand Down Expand Up @@ -165,27 +166,20 @@ def init_sync_client(self):
def init_async_client(self):
raise NotImplementedError("Async call not implemented yet.")

@staticmethod
def parse_stream_response(completion: dict) -> str:
if "contentBlockDelta" in completion:
if delta_chunk := completion["contentBlockDelta"]["delta"]:
return delta_chunk["text"]
return ''

def handle_stream_response(self, stream: dict) -> GeneratorType:
try:
for chunk in stream["stream"]:
stream: GeneratorType = stream["stream"]
for chunk in stream:
log.debug(f"Raw chunk: {chunk}")
parsed_content = self.parse_stream_response(chunk)
yield parsed_content
yield chunk
except Exception as e:
print(f"Error in handle_stream_response: {e}") # Debug print
raise

def parse_chat_completion(self, completion: dict) -> "GeneratorOutput":
"""Parse the completion, and put it into the raw_response."""
try:
data = self.handle_stream_response(completion)
data = self.chat_completion_parser(completion)
return GeneratorOutput(
data=None, error=None, raw_response=data
)
Expand Down Expand Up @@ -254,19 +248,19 @@ def call(
self,
api_kwargs: Dict = {},
model_type: ModelType = ModelType.UNDEFINED,
stream: bool = False
) -> dict:
"""
kwargs is the combined input and model_kwargs
"""
if model_type == ModelType.LLM:
if "stream" in api_kwargs and api_kwargs.get("stream", False):
log.debug("Streaming call")
printc("Streaming")
api_kwargs.pop("stream") # stream is not a valid parameter for bedrock
self.chat_completion_parser = self.handle_stream_response
return self.sync_client.converse_stream(**api_kwargs)
else:
api_kwargs.pop("stream")
api_kwargs.pop("stream", None)
return self.sync_client.converse(**api_kwargs)
else:
raise ValueError(f"model_type {model_type} is not supported")
Expand Down

0 comments on commit 1abea9e

Please sign in to comment.