diff --git a/haystack/core/pipeline/breakpoint.py b/haystack/core/pipeline/breakpoint.py index 8d91e8ae90..c1208a83f0 100644 --- a/haystack/core/pipeline/breakpoint.py +++ b/haystack/core/pipeline/breakpoint.py @@ -375,13 +375,32 @@ def _create_agent_snapshot( :param agent_breakpoint: AgentBreakpoint object containing breakpoints :return: An AgentSnapshot containing the agent's state and component visits. """ + try: + serialized_chat_generator = _serialize_value_with_schema( + _deepcopy_with_exceptions(component_inputs["chat_generator"]) + ) + except Exception as error: + logger.warning( + "Failed to serialize the agent's chat_generator inputs. " + "The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}", + e=error, + ) + serialized_chat_generator = {} + + try: + serialized_tool_invoker = _serialize_value_with_schema( + _deepcopy_with_exceptions(component_inputs["tool_invoker"]) + ) + except Exception as error: + logger.warning( + "Failed to serialize the agent's tool_invoker inputs. " + "The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}", + e=error, + ) + serialized_tool_invoker = {} + return AgentSnapshot( - component_inputs={ - "chat_generator": _serialize_value_with_schema( - _deepcopy_with_exceptions(component_inputs["chat_generator"]) - ), - "tool_invoker": _serialize_value_with_schema(_deepcopy_with_exceptions(component_inputs["tool_invoker"])), - }, + component_inputs={"chat_generator": serialized_chat_generator, "tool_invoker": serialized_tool_invoker}, component_visits=component_visits, break_point=agent_breakpoint, timestamp=datetime.now(), diff --git a/releasenotes/notes/error-handling-agent-snapshot-bd5adfd458e9981a.yaml b/releasenotes/notes/error-handling-agent-snapshot-bd5adfd458e9981a.yaml new file mode 100644 index 0000000000..209c8b5013 --- /dev/null +++ b/releasenotes/notes/error-handling-agent-snapshot-bd5adfd458e9981a.yaml @@ -0,0 +1,7 @@ +--- +enhancements: + - | + Made ``_create_agent_snapshot`` robust towards serialization errors. If serializing + agent component inputs fails, a warning is logged and an empty dictionary is used + as a fallback, preventing the serialization error from masking the real pipeline + runtime error. diff --git a/test/core/pipeline/test_breakpoint.py b/test/core/pipeline/test_breakpoint.py index 9b77e8ba0d..953fd26667 100644 --- a/test/core/pipeline/test_breakpoint.py +++ b/test/core/pipeline/test_breakpoint.py @@ -12,6 +12,7 @@ from haystack.core.pipeline import Pipeline from haystack.core.pipeline.breakpoint import ( HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, + _create_agent_snapshot, _create_pipeline_snapshot, _is_snapshot_save_enabled, _save_pipeline_snapshot, @@ -19,7 +20,7 @@ load_pipeline_snapshot, ) from haystack.dataclasses import ChatMessage -from haystack.dataclasses.breakpoints import Breakpoint, PipelineSnapshot, PipelineState +from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, PipelineSnapshot, PipelineState def test_transform_json_structure_unwraps_sender_value(): @@ -239,6 +240,74 @@ def to_dict(self): assert any("Failed to serialize original input data for `pipeline.run`." in msg for msg in caplog.messages) +class TestCreateAgentSnapshot: + def test_create_agent_snapshot_non_serializable_chat_generator(self, caplog): + class NonSerializable: + def to_dict(self): + raise TypeError("Cannot serialize") + + agent_breakpoint = AgentBreakpoint( + agent_name="agent", break_point=Breakpoint(component_name="chat_generator", visit_count=1) + ) + + with caplog.at_level(logging.WARNING): + snapshot = _create_agent_snapshot( + component_visits={"chat_generator": 1, "tool_invoker": 0}, + agent_breakpoint=agent_breakpoint, + component_inputs={"chat_generator": {"messages": NonSerializable()}, "tool_invoker": {"messages": []}}, + ) + + assert snapshot.component_inputs["chat_generator"] == {} + assert snapshot.component_inputs["tool_invoker"] != {} + assert "Failed to serialize the agent's chat_generator inputs" in caplog.text + + def test_create_agent_snapshot_non_serializable_tool_invoker(self, caplog): + class NonSerializable: + def to_dict(self): + raise TypeError("Cannot serialize") + + agent_breakpoint = AgentBreakpoint( + agent_name="agent", break_point=Breakpoint(component_name="chat_generator", visit_count=1) + ) + + with caplog.at_level(logging.WARNING): + snapshot = _create_agent_snapshot( + component_visits={"chat_generator": 1, "tool_invoker": 0}, + agent_breakpoint=agent_breakpoint, + component_inputs={"chat_generator": {"messages": []}, "tool_invoker": {"messages": NonSerializable()}}, + ) + + assert snapshot.component_inputs["tool_invoker"] == {} + assert snapshot.component_inputs["chat_generator"] != {} + assert "Failed to serialize the agent's tool_invoker inputs" in caplog.text + + def test_create_agent_snapshot_both_non_serializable(self, caplog): + class NonSerializable: + def to_dict(self): + raise TypeError("Cannot serialize") + + agent_breakpoint = AgentBreakpoint( + agent_name="agent", break_point=Breakpoint(component_name="chat_generator", visit_count=1) + ) + + with caplog.at_level(logging.WARNING): + snapshot = _create_agent_snapshot( + component_visits={"chat_generator": 1, "tool_invoker": 0}, + agent_breakpoint=agent_breakpoint, + component_inputs={ + "chat_generator": {"messages": NonSerializable()}, + "tool_invoker": {"messages": NonSerializable()}, + }, + ) + + assert snapshot.component_inputs["chat_generator"] == {} + assert snapshot.component_inputs["tool_invoker"] == {} + assert "Failed to serialize the agent's chat_generator inputs" in caplog.text + assert "Failed to serialize the agent's tool_invoker inputs" in caplog.text + assert snapshot.component_visits == {"chat_generator": 1, "tool_invoker": 0} + assert snapshot.break_point == agent_breakpoint + + def test_save_pipeline_snapshot_raises_on_failure(tmp_path, caplog, monkeypatch): monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")