Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 56 additions & 23 deletions haystack/core/pipeline/breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,29 +375,12 @@ def _create_agent_snapshot(
:param agent_breakpoint: AgentBreakpoint object containing breakpoints
:return: An AgentSnapshot containing the agent's state and component visits.
"""
try:
serialized_chat_generator = _serialize_value_with_schema(
_deepcopy_with_exceptions(component_inputs["chat_generator"])
)
except Exception as error:
logger.warning(
"Failed to serialize the agent's chat_generator inputs. "
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
e=error,
)
serialized_chat_generator = {}

try:
serialized_tool_invoker = _serialize_value_with_schema(
_deepcopy_with_exceptions(component_inputs["tool_invoker"])
)
except Exception as error:
logger.warning(
"Failed to serialize the agent's tool_invoker inputs. "
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
e=error,
)
serialized_tool_invoker = {}
serialized_chat_generator = _serialize_agent_component_inputs(
component_name="chat_generator", component_inputs=component_inputs["chat_generator"]
)
serialized_tool_invoker = _serialize_agent_component_inputs(
component_name="tool_invoker", component_inputs=component_inputs["tool_invoker"]
)

return AgentSnapshot(
component_inputs={"chat_generator": serialized_chat_generator, "tool_invoker": serialized_tool_invoker},
Expand All @@ -407,6 +390,56 @@ def _create_agent_snapshot(
)


def _serialize_agent_component_inputs(component_name: str, component_inputs: dict[str, Any]) -> dict[str, Any]:
"""
Serialize agent component inputs while preserving resumable fields whenever possible.

If serializing the whole input mapping fails (for example due to a non-serializable callback),
we retry field-by-field and omit only the failing fields. This keeps snapshots resumable when
required fields like ``messages`` or ``state`` are still serializable.

:param component_name: Name of the agent sub-component (e.g. ``chat_generator`` or ``tool_invoker``).
:param component_inputs: Runtime inputs for that sub-component.
:returns: A serialized payload, or ``{}`` if no fields can be serialized at all.
"""
try:
return _serialize_value_with_schema(_deepcopy_with_exceptions(component_inputs))
except Exception as error:
logger.warning(
"Failed to serialize the agent's {component_name} inputs. "
"Haystack will omit only the non-serializable fields when possible. Error: {e}",
component_name=component_name,
e=error,
)

serialized_properties: dict[str, Any] = {}
serialized_data: dict[str, Any] = {}

for field_name, value in component_inputs.items():
try:
serialized_value = _serialize_value_with_schema(_deepcopy_with_exceptions(value))
except Exception as field_error:
logger.warning(
"Failed to serialize the agent's {component_name}.{field_name} input. "
"The field will be omitted from the snapshot. Error: {e}",
component_name=component_name,
field_name=field_name,
e=field_error,
)
continue

serialized_properties[field_name] = serialized_value["serialization_schema"]
serialized_data[field_name] = serialized_value["serialized_data"]

if not serialized_properties:
return {}

return {
"serialization_schema": {"type": "object", "properties": serialized_properties},
"serialized_data": serialized_data,
}
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_serialize_agent_component_inputs can still return a bare {} when no fields serialize, but _deserialize_value_with_schema treats {} as an invalid payload and will raise DeserializationError. Consider always returning a structurally valid payload (e.g., empty object schema + empty serialized_data) so snapshots don’t become invalid-by-format even when all fields are omitted; this can matter if the corresponding component inputs are optional for resume (e.g., resuming from a ToolBreakpoint where chat_generator inputs may not be needed).

Copilot uses AI. Check for mistakes.


def _validate_tool_breakpoint_is_valid(agent_breakpoint: AgentBreakpoint, tools: "ToolsType") -> None:
"""
Validates the AgentBreakpoint passed to the agent.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
Preserve resumable agent snapshots when some ``chat_generator`` or ``tool_invoker`` inputs are
non-serializable. Haystack now omits only the failing runtime-only fields (for example
non-serializable callbacks) instead of replacing the whole payload with an empty dictionary.
34 changes: 34 additions & 0 deletions test/components/agents/test_agent_breakpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,40 @@ def test_resume_from_tool_invoker(self, agent, tmp_path, monkeypatch):
assert "last_message" in result
assert len(result["messages"]) > 0

def test_resume_from_tool_invoker_omits_non_serializable_runtime_callback(self, agent, tmp_path, monkeypatch):
monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")
debug_path = str(tmp_path / "debug_snapshots")
tool_bp = ToolBreakpoint(component_name="tool_invoker", tool_name="weather_tool", snapshot_file_path=debug_path)
agent_breakpoint = AgentBreakpoint(break_point=tool_bp, agent_name="test_agent")

try:
agent.run(
messages=[ChatMessage.from_user("What's the weather in Berlin?")],
break_point=agent_breakpoint,
streaming_callback=lambda chunk: None,
)
except BreakpointException:
pass

snapshot_files = list(Path(debug_path).glob("test_agent_tool_invoker_*.json"))
assert len(snapshot_files) > 0
latest_snapshot_file = str(max(snapshot_files, key=os.path.getctime))
agent_snapshot = load_pipeline_snapshot(latest_snapshot_file).agent_snapshot

assert agent_snapshot is not None
assert "streaming_callback" not in agent_snapshot.component_inputs["chat_generator"]["serialized_data"]
assert "streaming_callback" not in agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]
assert "state" in agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]

result = agent.run(
messages=[ChatMessage.from_user("This is actually ignored when resuming from snapshot.")],
snapshot=agent_snapshot,
)

assert "messages" in result
assert "last_message" in result
assert len(result["messages"]) == 4

def test_resume_from_tool_invoker_and_new_breakpoint(self, weather_tool, tmp_path, monkeypatch):
monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")
agent = Agent(
Expand Down
Loading