Skip to content

Commit

Permalink
clean up and rasise error in react agent for debug
Browse files Browse the repository at this point in the history
  • Loading branch information
liyin2015 committed Dec 27, 2024
1 parent 6f0bb4c commit 32dda35
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 173 deletions.
253 changes: 126 additions & 127 deletions adalflow/adalflow/components/agent/react.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,78 +123,6 @@ def call(
return step_history


# class ExecuteAction(GradComponent):
# def __init__(self):
# super().__init__()
# self.name = "ExecuteAction"
# self._component_desc = "Execute the action and output the new step_output."

# def call(
# self,
# response: GeneratorOutput,
# step_output: StepOutput,
# execute_action: Callable,
# id: Optional[str] = None,
# ) -> StepOutput:
# """Parse the action string to a function call and execute it. Update the action_step with the result."""
# step = step_output.step
# output = execute_action_fn(response, step_output, step, execute_action, id)
# if isinstance(output, Parameter):
# output = output.full_response
# return output


class FunctionOutputToStepOutput(GradComponent):
def __init__(self):
super().__init__()
self.name = "FunctionOutputToStepOutput"
self._component_desc = "Convert the FunctionOutput to StepOutput."

def call(self, output: FunctionOutput, step_output: StepOutput) -> StepOutput:
"""Convert the FunctionOutput to StepOutput."""

temp_result = output.output
if isinstance(temp_result, Parameter):
step_output.observation = temp_result.data
else:
step_output.observation = temp_result
return step_output
# step_output = StepOutput(step=step)
# step_output.observation = output.output
# return step_output


# TODO: make execute_action_fn to a GradComponent to enable the training of the tools too.
def execute_action_fn(
x: GeneratorOutput, step_output: StepOutput, step: int, execute_action: Any, id=None
) -> StepOutput:
"""Execute the action and update the step_output."""
if x.error:
error_msg = f"Error planning step {step}: {x.error}"
step_output.observation = error_msg
log.error(error_msg)
else:
try:
fun_expr: FunctionExpression = x.data
step_output.action = fun_expr
log.debug(f"Step {step}: {fun_expr}")

if step_output and step_output.action:
step_output = execute_action(step_output, id)
printc(f"Step {step}: \n{step_output}\n_______\n", color="blue")
return step_output
else:
printc(f"Failed to parse response for step {step}", color="red")
log.error(f"Failed to parse response for step {step}")
return step_output
except Exception as e:
error_msg = f"Error parsing response for step {step}: {e}"
step_output.observation = error_msg
log.error(error_msg)
printc(error_msg, color="red")
return step_output


@dataclass
class ReActOutput(DataClass):
r"""Similar to GeneratorOutput, but with additional step history and final answer."""
Expand Down Expand Up @@ -323,8 +251,6 @@ def __init__(

# added this component to the computation graph
self.append_step_history = AppendStepHistory()
# self.execute_action = ExecuteAction()
self.function_output_to_step_output = FunctionOutputToStepOutput()

def _init_tools(
self,
Expand Down Expand Up @@ -398,6 +324,10 @@ def call(
) -> StepOutput:
"""Convert the action string to StepOutput."""
step_output = StepOutput(step=step)
if not isinstance(action_str, FunctionExpression):
raise ValueError(
f"Expected FunctionExpression, but got {type(action_str)}"
)
step_output.action = action_str
step_output.function = func
# printc(f"result: {result}", color="blue")
Expand All @@ -411,53 +341,104 @@ def call(

return step_output

tmp_action_str_to_step_output = ActionStrToStepOutput()
try:

# printc(f"response: {response}", color="yellow")
# TO FunctionExpression
response.add_successor_map_fn(
successor=self.tool_manager, map_fn=lambda x: x.full_response
)
tmp_action_str_to_step_output = ActionStrToStepOutput()

func: Union[Function, Parameter] = self.tool_manager(
expr_or_fun=response, step="parse"
)
printc(f"tool_manager: {self.tool_manager.training}", color="red")
if not isinstance(func, Parameter):
raise ValueError(f"Expected Parameter, but got {type(func)}: {func}")
# printc(f"func: {func}", color="yellow")
# replace the id
if isinstance(func, Parameter):
func.data.kwargs["id"] = id

func.add_successor_map_fn(self.tool_manager, lambda x: x.data)

result: Parameter = self.tool_manager(expr_or_fun=func, step="execute")
# printc(f"result: {result}", color="red")
result.add_successor_map_fn(
successor=tmp_action_str_to_step_output, map_fn=lambda x: x.data
)
action_step = tmp_action_str_to_step_output.forward(
action_str=response.data,
# printc(f"response: {response}", color="yellow")
# TO FunctionExpression
response.add_successor_map_fn(
successor=self.tool_manager, map_fn=lambda x: x.full_response
)

func: Union[Function, Parameter] = self.tool_manager(
expr_or_fun=response, step="parse"
)
printc(f"tool_manager: {self.tool_manager.training}", color="red")
if not isinstance(func, Parameter):
raise ValueError(
f"Expected Parameter, but got {type(func)}: {func}"
)
# printc(f"func: {func}", color="yellow")
# replace the id
if isinstance(func, Parameter):
func.data.kwargs["id"] = id

func.add_successor_map_fn(self.tool_manager, lambda x: x.data)

result: Parameter = self.tool_manager(expr_or_fun=func, step="execute")
# printc(f"result: {result}", color="red")
result.add_successor_map_fn(
successor=tmp_action_str_to_step_output, map_fn=lambda x: x.data
)
action_step = tmp_action_str_to_step_output.forward(
action_str=response.data,
step=action_step.step,
result=result,
func=func,
)

return action_step

except Exception as e:
log.error(f"Error executing {response}: {e}")
# pass the error as observation so that the agent can continue and correct the error in the next step
action_step.observation = f"Error executing {response}: {e}"
return action_step
else:

return self._execute_action_eval_mode(
x=response,
step_output=action_step,
step=action_step.step,
result=result,
func=func,
id=id,
)

return action_step
# except Exception as e:
# log.error(f"Error executing {response}: {e}")
# # pass the error as observation so that the agent can continue and correct the error in the next step
# # action_step.observation = f"Error executing {response}: {e}"
# # return action_step
# raise e
def _execute_action_eval_mode(
self,
x: GeneratorOutput,
step_output: StepOutput,
step: int,
id=None,
) -> StepOutput:
"""Execute the action and update the step_output."""
if x.error:
error_msg = f"Error planning step {step}: {x.error}"
step_output.observation = error_msg
step_output.action = None
log.error(error_msg)
else:
# normal pass
fun: Function = self.tool_manager(expr_or_fun=response.data, step="parse")
action_step.function = fun
result: FunctionOutput = self.tool_manager(expr_or_fun=fun, step="execute")
action_step.observation = result.output
return action_step
try:
fun_expr: FunctionExpression = x.data
step_output.action = fun_expr
log.debug(f"Step {step}: {fun_expr}")

if step_output and step_output.action:

fun: Function = self.tool_manager(
expr_or_fun=fun_expr, step="parse"
)

step_output.function = fun

result: FunctionOutput = self.tool_manager(
expr_or_fun=fun, step="execute"
)
step_output.observation = result.output

# step_output = execute_action(step_output, id)
printc(f"Step {step}: \n{step_output}\n_______\n", color="blue")
return step_output
else:
printc(f"Failed to parse response for step {step}", color="red")
log.error(f"Failed to parse response for step {step}")
return step_output
except Exception as e:
error_msg = f"Error parsing response for step {step}: {e}"
step_output.observation = error_msg
log.error(error_msg)
printc(error_msg, color="red")
return step_output

def _run_one_step(
self,
Expand All @@ -470,16 +451,28 @@ def _run_one_step(
"""Run one step of the agent. Plan and execute the action for the step.
Need to deal with both train and eval mode on the self.planner.
"""
printc("start running one step", color="yellow")
printc(f"step: {step}", color="yellow")

prompt_kwargs["step_history"] = step_history
step_history_value = (
step_history.data if isinstance(step_history, Parameter) else step_history
)
for step in step_history_value:
if not step:
raise ValueError(
f"Expected StepOutput, but got {type(step)}, all steps: {step_history_value}"
)
if not isinstance(step, StepOutput):
raise ValueError(
f"Expected StepOutput, but got {type(step)}, all steps: {step_history_value}"
)
# printc(
# f"prompt_kwargs 1: {prompt_kwargs}, training: {self.planner.training}",
# color="yellow",
# )

prompt_str = self.planner.get_prompt(**prompt_kwargs)
printc(f"prompt_str: {prompt_str}", color="red")
# prompt_str = self.planner.get_prompt(**prompt_kwargs)
# printc(f"prompt_str: {prompt_str}", color="red")
# return [StepOutput(step=step, action=None, observation="test")]

log.debug(
Expand All @@ -491,14 +484,16 @@ def _run_one_step(
prompt_kwargs=prompt_kwargs, model_kwargs=model_kwargs, id=id
)
except Exception as e:
log.error(f"Error planning step {step}: {e}")
return None
error_msg = f"Error happened in planner response: {e}. Training mode: {self.planner.training}"
raise ValueError(
error_msg
) # raise the error for debugging as this should not happen in normal cases.

# create a new step output
step_output: StepOutput = StepOutput(step=step)

# connecting two generators in the computation graph, it will set up self.step_history
if isinstance(response, Parameter):
if self.training and isinstance(response, Parameter):
# printc(f"response: {response}", color="yellow")

step_output: Parameter = self._execute_action(step_output, response, id)
Expand All @@ -522,7 +517,7 @@ def _run_one_step(

else:

step_output = self._execute_action(
step_output: StepOutput = self._execute_action(
action_step=step_output, response=response, id=id
)
printc(f"step_output: {step_output}", color="red")
Expand Down Expand Up @@ -620,11 +615,15 @@ def bicall(
printc(f"input_query: {input}", color="red")
for i in range(self.max_steps):
step = i + 1
step_history = self._run_one_step(
step, prompt_kwargs, model_kwargs, id, step_history
)
if self._check_last_step(step_history):
break
try:
step_history = self._run_one_step(
step, prompt_kwargs, model_kwargs, id, step_history
)
if self._check_last_step(step_history):
break
except Exception as e:
log.error(f"Error running step {step}: {e}")
raise e # the only place to raise the error for debugging. In normal cases, the agent should not raise an error.

answer = self._get_answer(step_history)
printc(f"answer: {answer}", color="yellow")
Expand Down
6 changes: 5 additions & 1 deletion adalflow/adalflow/components/agent/react_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from adalflow.core.base_data_class import DataClass
from copy import deepcopy
import logging
import warnings


from adalflow.core.generator import Generator
Expand Down Expand Up @@ -384,8 +385,11 @@ def _execute_action(
action_step: StepOutput = self.function_output_to_step_output(
output=result, step_output=action_step
)
else:
elif isinstance(result, FunctionOutput):
action_step.observation = result.output
else:
warnings.warn(f"Fails to parse the result: {result}")
action_step.observation = result

return action_step
except Exception as e:
Expand Down
20 changes: 15 additions & 5 deletions adalflow/tests/test_react_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest.mock import Mock, patch
from adalflow.core.func_tool import FunctionTool
from adalflow.core.types import FunctionExpression, GeneratorOutput
from adalflow.components.agent.react import ReActAgent
from adalflow.components.agent.react import ReActAgent, StepOutput
from adalflow.components.model_client.openai_client import OpenAIClient


Expand Down Expand Up @@ -84,6 +84,16 @@ def test_complex_query_execution(self, mock_planner):
),
]

# mock the agent to run the first step
step_output = self.react_agent._run_one_step(
step=1, step_history=[], prompt_kwargs={}, model_kwargs={}
)
print(f"step_output 1: {step_output}")
self.assertEqual(len(step_output), 1)
self.assertTrue(isinstance(step_output[0], StepOutput))
self.assertTrue(step_output[0].action)
self.assertTrue(isinstance(step_output[0].action, FunctionExpression))

result = self.react_agent.call("Add 2 and 3, then multiply by 4.")
print(f"result: {result}")
self.assertEqual(result.answer, 12)
Expand All @@ -100,11 +110,11 @@ def test_error_handling(self, mock_planner):
# no action

# check error raised
with self.assertRaises(ValueError):
# with self.assertRaises(ValueError):

result = self.react_agent.call("Simulate an error.")
print(f"result 2: {result}")
self.assertIn("Error occurred", result.answer)
result = self.react_agent.call("Simulate an error.")
print(f"result 2: {result}")
self.assertIn("Error occurred", result.answer)


from adalflow.optim.grad_component import GradComponent
Expand Down
Loading

0 comments on commit 32dda35

Please sign in to comment.