Skip to content

Commit 98e8317

Browse files
committed
Keep agent snapshots resumable when runtime-only inputs cannot be serialized
The fallback added for agent snapshot serialization errors preserved the original runtime failure, but it could also replace the entire chat_generator or tool_invoker payload with an empty dict. That made the saved snapshot impossible to resume even when only a runtime-only field such as a streaming callback was non-serializable. This change narrows the fallback behavior: Haystack now retries those component inputs field-by-field and omits only the fields that cannot be serialized, preserving resumable fields like messages, state, and tools. A regression test covers resuming from a tool-invoker snapshot created with a non-serializable runtime callback. Constraint: Must preserve the original deepset-ai#11108 goal of not masking the real runtime error Rejected: Keep saving `{}` and document snapshots as non-resumable | breaks the existing resume contract more than necessary Confidence: high Scope-risk: narrow Reversibility: clean Directive: If agent snapshot fallback behavior changes again, verify both error preservation and snapshot resumability Tested: hatch -e test run pytest test/components/agents/test_agent_breakpoints.py -k 'resume_from_tool_invoker' -q Tested: hatch -e test run pytest test/core/pipeline/test_breakpoint.py -k 'create_agent_snapshot' -q Tested: hatch run fmt-check haystack/core/pipeline/breakpoint.py test/components/agents/test_agent_breakpoints.py Not-tested: Full unit suite and integration suite Related: deepset-ai#11126
1 parent 07e8c9b commit 98e8317

3 files changed

Lines changed: 96 additions & 23 deletions

File tree

haystack/core/pipeline/breakpoint.py

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -375,29 +375,12 @@ def _create_agent_snapshot(
375375
:param agent_breakpoint: AgentBreakpoint object containing breakpoints
376376
:return: An AgentSnapshot containing the agent's state and component visits.
377377
"""
378-
try:
379-
serialized_chat_generator = _serialize_value_with_schema(
380-
_deepcopy_with_exceptions(component_inputs["chat_generator"])
381-
)
382-
except Exception as error:
383-
logger.warning(
384-
"Failed to serialize the agent's chat_generator inputs. "
385-
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
386-
e=error,
387-
)
388-
serialized_chat_generator = {}
389-
390-
try:
391-
serialized_tool_invoker = _serialize_value_with_schema(
392-
_deepcopy_with_exceptions(component_inputs["tool_invoker"])
393-
)
394-
except Exception as error:
395-
logger.warning(
396-
"Failed to serialize the agent's tool_invoker inputs. "
397-
"The inputs in the snapshot will be replaced with an empty dictionary. Error: {e}",
398-
e=error,
399-
)
400-
serialized_tool_invoker = {}
378+
serialized_chat_generator = _serialize_agent_component_inputs(
379+
component_name="chat_generator", component_inputs=component_inputs["chat_generator"]
380+
)
381+
serialized_tool_invoker = _serialize_agent_component_inputs(
382+
component_name="tool_invoker", component_inputs=component_inputs["tool_invoker"]
383+
)
401384

402385
return AgentSnapshot(
403386
component_inputs={"chat_generator": serialized_chat_generator, "tool_invoker": serialized_tool_invoker},
@@ -407,6 +390,56 @@ def _create_agent_snapshot(
407390
)
408391

409392

393+
def _serialize_agent_component_inputs(component_name: str, component_inputs: dict[str, Any]) -> dict[str, Any]:
394+
"""
395+
Serialize agent component inputs while preserving resumable fields whenever possible.
396+
397+
If serializing the whole input mapping fails (for example due to a non-serializable callback),
398+
we retry field-by-field and omit only the failing fields. This keeps snapshots resumable when
399+
required fields like ``messages`` or ``state`` are still serializable.
400+
401+
:param component_name: Name of the agent sub-component (e.g. ``chat_generator`` or ``tool_invoker``).
402+
:param component_inputs: Runtime inputs for that sub-component.
403+
:returns: A serialized payload, or ``{}`` if no fields can be serialized at all.
404+
"""
405+
try:
406+
return _serialize_value_with_schema(_deepcopy_with_exceptions(component_inputs))
407+
except Exception as error:
408+
logger.warning(
409+
"Failed to serialize the agent's {component_name} inputs. "
410+
"Haystack will omit only the non-serializable fields when possible. Error: {e}",
411+
component_name=component_name,
412+
e=error,
413+
)
414+
415+
serialized_properties: dict[str, Any] = {}
416+
serialized_data: dict[str, Any] = {}
417+
418+
for field_name, value in component_inputs.items():
419+
try:
420+
serialized_value = _serialize_value_with_schema(_deepcopy_with_exceptions(value))
421+
except Exception as field_error:
422+
logger.warning(
423+
"Failed to serialize the agent's {component_name}.{field_name} input. "
424+
"The field will be omitted from the snapshot. Error: {e}",
425+
component_name=component_name,
426+
field_name=field_name,
427+
e=field_error,
428+
)
429+
continue
430+
431+
serialized_properties[field_name] = serialized_value["serialization_schema"]
432+
serialized_data[field_name] = serialized_value["serialized_data"]
433+
434+
if not serialized_properties:
435+
return {}
436+
437+
return {
438+
"serialization_schema": {"type": "object", "properties": serialized_properties},
439+
"serialized_data": serialized_data,
440+
}
441+
442+
410443
def _validate_tool_breakpoint_is_valid(agent_breakpoint: AgentBreakpoint, tools: "ToolsType") -> None:
411444
"""
412445
Validates the AgentBreakpoint passed to the agent.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
fixes:
3+
- |
4+
Preserve resumable agent snapshots when some ``chat_generator`` or ``tool_invoker`` inputs are
5+
non-serializable. Haystack now omits only the failing runtime-only fields (for example
6+
non-serializable callbacks) instead of replacing the whole payload with an empty dictionary.

test/components/agents/test_agent_breakpoints.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,40 @@ def test_resume_from_tool_invoker(self, agent, tmp_path, monkeypatch):
472472
assert "last_message" in result
473473
assert len(result["messages"]) > 0
474474

475+
def test_resume_from_tool_invoker_omits_non_serializable_runtime_callback(self, agent, tmp_path, monkeypatch):
476+
monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")
477+
debug_path = str(tmp_path / "debug_snapshots")
478+
tool_bp = ToolBreakpoint(component_name="tool_invoker", tool_name="weather_tool", snapshot_file_path=debug_path)
479+
agent_breakpoint = AgentBreakpoint(break_point=tool_bp, agent_name="test_agent")
480+
481+
try:
482+
agent.run(
483+
messages=[ChatMessage.from_user("What's the weather in Berlin?")],
484+
break_point=agent_breakpoint,
485+
streaming_callback=lambda chunk: None,
486+
)
487+
except BreakpointException:
488+
pass
489+
490+
snapshot_files = list(Path(debug_path).glob("test_agent_tool_invoker_*.json"))
491+
assert len(snapshot_files) > 0
492+
latest_snapshot_file = str(max(snapshot_files, key=os.path.getctime))
493+
agent_snapshot = load_pipeline_snapshot(latest_snapshot_file).agent_snapshot
494+
495+
assert agent_snapshot is not None
496+
assert "streaming_callback" not in agent_snapshot.component_inputs["chat_generator"]["serialized_data"]
497+
assert "streaming_callback" not in agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]
498+
assert "state" in agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]
499+
500+
result = agent.run(
501+
messages=[ChatMessage.from_user("This is actually ignored when resuming from snapshot.")],
502+
snapshot=agent_snapshot,
503+
)
504+
505+
assert "messages" in result
506+
assert "last_message" in result
507+
assert len(result["messages"]) == 4
508+
475509
def test_resume_from_tool_invoker_and_new_breakpoint(self, weather_tool, tmp_path, monkeypatch):
476510
monkeypatch.setenv(HAYSTACK_PIPELINE_SNAPSHOT_SAVE_ENABLED, "true")
477511
agent = Agent(

0 commit comments

Comments
 (0)