Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions haystack/core/pipeline/breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,32 @@ def _create_agent_snapshot(
:param agent_breakpoint: AgentBreakpoint object containing breakpoints
:return: An AgentSnapshot containing the agent's state and component visits.
"""
try:
serialized_chat_generator = _serialize_value_with_schema(
_deepcopy_with_exceptions(component_inputs["chat_generator"])
)
except Exception as error:
logger.warning(
"Failed to serialize the agent's chat_generator inputs. "
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
e=error,
)
serialized_chat_generator = {}

try:
serialized_tool_invoker = _serialize_value_with_schema(
_deepcopy_with_exceptions(component_inputs["tool_invoker"])
)
except Exception as error:
logger.warning(
"Failed to serialize the agent's tool_invoker inputs. "
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
e=error,
)
serialized_tool_invoker = {}

return AgentSnapshot(
component_inputs={
"chat_generator": _serialize_value_with_schema(
_deepcopy_with_exceptions(component_inputs["chat_generator"])
),
"tool_invoker": _serialize_value_with_schema(_deepcopy_with_exceptions(component_inputs["tool_invoker"])),
},
component_inputs={"chat_generator": serialized_chat_generator, "tool_invoker": serialized_tool_invoker},
component_visits=component_visits,
break_point=agent_breakpoint,
timestamp=datetime.now(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
enhancements:
- |
Made ``_create_agent_snapshot`` robust against serialization errors. If serializing
agent component inputs fails, a warning is logged and an empty dictionary is used
as a fallback, preventing the serialization error from masking the real pipeline
runtime error.
71 changes: 70 additions & 1 deletion test/core/pipeline/test_breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
from haystack.core.pipeline import Pipeline
from haystack.core.pipeline.breakpoint import (
HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED,
_create_agent_snapshot,
_create_pipeline_snapshot,
_is_snapshot_save_enabled,
_save_pipeline_snapshot,
_transform_json_structure,
load_pipeline_snapshot,
)
from haystack.dataclasses import ChatMessage
from haystack.dataclasses.breakpoints import Breakpoint, PipelineSnapshot, PipelineState
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, PipelineSnapshot, PipelineState


def test_transform_json_structure_unwraps_sender_value():
Expand Down Expand Up @@ -239,6 +240,74 @@ def to_dict(self):
assert any("Failed to serialize original input data for `pipeline.run`." in msg for msg in caplog.messages)


class TestCreateAgentSnapshot:
    """Tests for ``_create_agent_snapshot`` when component inputs cannot be serialized.

    Each test passes inputs whose ``to_dict`` raises ``TypeError`` for one or both
    agent components and checks the resulting snapshot and the captured log text.
    """

    @staticmethod
    def _unserializable_inputs():
        """Return component inputs whose ``messages`` value raises ``TypeError`` from ``to_dict``."""

        class NonSerializable:
            def to_dict(self):
                raise TypeError("Cannot serialize")

        return {"messages": NonSerializable()}

    @staticmethod
    def _plain_inputs():
        """Return component inputs holding only an empty ``messages`` list."""
        return {"messages": []}

    @staticmethod
    def _build_snapshot(caplog, chat_generator_inputs, tool_invoker_inputs):
        """Call ``_create_agent_snapshot`` while capturing WARNING-level logs.

        :param caplog: pytest log-capture fixture forwarded from the test.
        :param chat_generator_inputs: inputs stored under the ``chat_generator`` key.
        :param tool_invoker_inputs: inputs stored under the ``tool_invoker`` key.
        :return: A ``(snapshot, agent_breakpoint)`` tuple.
        """
        inner_break_point = Breakpoint(component_name="chat_generator", visit_count=1)
        agent_breakpoint = AgentBreakpoint(agent_name="agent", break_point=inner_break_point)
        inputs_by_component = {"chat_generator": chat_generator_inputs, "tool_invoker": tool_invoker_inputs}

        with caplog.at_level(logging.WARNING):
            snapshot = _create_agent_snapshot(
                component_visits={"chat_generator": 1, "tool_invoker": 0},
                agent_breakpoint=agent_breakpoint,
                component_inputs=inputs_by_component,
            )
        return snapshot, agent_breakpoint

    def test_create_agent_snapshot_non_serializable_chat_generator(self, caplog):
        snapshot, _ = self._build_snapshot(
            caplog, chat_generator_inputs=self._unserializable_inputs(), tool_invoker_inputs=self._plain_inputs()
        )

        # Only the failing component falls back to an empty dictionary.
        assert snapshot.component_inputs["chat_generator"] == {}
        assert snapshot.component_inputs["tool_invoker"] != {}
        assert "Failed to serialize the agent's chat_generator inputs" in caplog.text

    def test_create_agent_snapshot_non_serializable_tool_invoker(self, caplog):
        snapshot, _ = self._build_snapshot(
            caplog, chat_generator_inputs=self._plain_inputs(), tool_invoker_inputs=self._unserializable_inputs()
        )

        # Only the failing component falls back to an empty dictionary.
        assert snapshot.component_inputs["tool_invoker"] == {}
        assert snapshot.component_inputs["chat_generator"] != {}
        assert "Failed to serialize the agent's tool_invoker inputs" in caplog.text

    def test_create_agent_snapshot_both_non_serializable(self, caplog):
        snapshot, agent_breakpoint = self._build_snapshot(
            caplog,
            chat_generator_inputs=self._unserializable_inputs(),
            tool_invoker_inputs=self._unserializable_inputs(),
        )

        assert snapshot.component_inputs["chat_generator"] == {}
        assert snapshot.component_inputs["tool_invoker"] == {}
        assert "Failed to serialize the agent's chat_generator inputs" in caplog.text
        assert "Failed to serialize the agent's tool_invoker inputs" in caplog.text
        # The remaining snapshot fields are still populated from the arguments.
        assert snapshot.component_visits == {"chat_generator": 1, "tool_invoker": 0}
        assert snapshot.break_point == agent_breakpoint


def test_save_pipeline_snapshot_raises_on_failure(tmp_path, caplog, monkeypatch):
monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")

Expand Down
Loading