Skip to content

Commit 0034789

Browse files
marc-mrtbogdankostic
authored and committed
fix: require messages input parameter in Agent component (#10734)
* fix: require messages in Agent component * fix: default to empty messages list in LLM component * docs: add release notes * test: add case to prove agentic pipeline execution failure * docs: update agent docstring to accurately represent requirement of messages input * tests: fix agent messages param * docs: add explicit bug reference in new test case
1 parent 9be588e commit 0034789

4 files changed

Lines changed: 166 additions & 25 deletions

File tree

haystack/components/agents/agent.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def translate(
212212
213213
# The template variables 'language' and 'document' become inputs to the run method
214214
result = agent.run(
215+
messages=[],
215216
language="French",
216217
document="The weather is lovely today and the sun is shining.",
217218
)
@@ -467,7 +468,7 @@ def _create_agent_span(self) -> Any:
467468

468469
def _initialize_fresh_execution(
469470
self,
470-
messages: list[ChatMessage] | None,
471+
messages: list[ChatMessage],
471472
streaming_callback: StreamingCallbackT | None,
472473
requires_async: bool,
473474
*,
@@ -497,12 +498,6 @@ def _initialize_fresh_execution(
497498
"""
498499
user_prompt = user_prompt or self.user_prompt
499500
system_prompt = system_prompt or self.system_prompt
500-
if messages is None and user_prompt is None and system_prompt is None:
501-
raise ValueError(
502-
"No messages provided to the Agent and neither user_prompt nor system_prompt is set. "
503-
"Please provide at least one of these inputs."
504-
)
505-
506501
messages = messages or []
507502

508503
if user_prompt is not None:
@@ -675,7 +670,7 @@ def _runtime_checks(self, break_point: AgentBreakpoint | None) -> None:
675670

676671
def run( # noqa: PLR0915
677672
self,
678-
messages: list[ChatMessage] | None = None,
673+
messages: list[ChatMessage],
679674
streaming_callback: StreamingCallbackT | None = None,
680675
*,
681676
generation_kwargs: dict[str, Any] | None = None,
@@ -905,7 +900,7 @@ def run( # noqa: PLR0915
905900

906901
async def run_async( # noqa: PLR0915
907902
self,
908-
messages: list[ChatMessage] | None = None,
903+
messages: list[ChatMessage],
909904
streaming_callback: StreamingCallbackT | None = None,
910905
*,
911906
generation_kwargs: dict[str, Any] | None = None,

haystack/components/generators/chat/llm.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def run(
133133
- "last_message": The last message exchanged during the LLM's run.
134134
"""
135135
return super(LLM, self).run( # noqa: UP008
136-
messages=messages,
136+
messages=messages or [],
137137
streaming_callback=streaming_callback,
138138
generation_kwargs=generation_kwargs,
139139
system_prompt=system_prompt,
@@ -170,7 +170,7 @@ async def run_async(
170170
- "last_message": The last message exchanged during the LLM's run.
171171
"""
172172
return await super(LLM, self).run_async( # noqa: UP008
173-
messages=messages,
173+
messages=messages or [],
174174
streaming_callback=streaming_callback,
175175
generation_kwargs=generation_kwargs,
176176
system_prompt=system_prompt,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fixes:
2+
- |
3+
Reverts the change that made Agent messages optional as it caused issues with pipeline execution.
4+
As a consequence, the LLM component now defaults to an empty messages list unless provided at runtime.

test/components/agents/test_agent.py

Lines changed: 156 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,13 +1380,6 @@ def _make_agent_with_user_prompt(
13801380

13811381

13821382
class TestUserPromptInitialization:
1383-
def test_user_prompt_raises_when_no_messages_and_no_prompt(self, weather_tool):
1384-
agent = Agent(chat_generator=MockChatGenerator(), tools=[weather_tool])
1385-
with pytest.raises(
1386-
ValueError, match="No messages provided to the Agent and neither user_prompt nor system_prompt is set"
1387-
):
1388-
agent.run()
1389-
13901383
def test_user_prompt_conflict_with_state_schema_raises(self, weather_tool):
13911384
with pytest.raises(ValueError, match="already defined in the state schema"):
13921385
_make_agent_with_user_prompt(
@@ -1400,14 +1393,14 @@ def test_user_prompt_conflict_with_run_param_raises(self, weather_tool):
14001393
def test_user_prompt_only_variables_forwarded_to_builder(self, weather_tool):
14011394
agent = _make_agent_with_user_prompt(_user_msg("Question: {{question}}"), tools=[weather_tool])
14021395
# 'irrelevant_kwarg' is not a template variable — must not raise
1403-
result = agent.run(question="Will it snow?", irrelevant_kwarg="unused")
1396+
result = agent.run(messages=[], question="Will it snow?", irrelevant_kwarg="unused")
14041397
assert "messages" in result
14051398

14061399

14071400
class TestUserPromptOnly:
14081401
def test_simple_literal_user_prompt(self, weather_tool):
14091402
agent = _make_agent_with_user_prompt(_user_msg("Tell me the weather."), tools=[weather_tool])
1410-
result = agent.run()
1403+
result = agent.run(messages=[])
14111404
messages = result["messages"]
14121405
# The rendered user_prompt should be the first (and only) non-system message
14131406
user_messages = [m for m in messages if m.is_from(ChatRole.USER)]
@@ -1423,7 +1416,7 @@ def test_user_prompt_with_template_variables(self, weather_tool):
14231416
),
14241417
tools=[weather_tool],
14251418
)
1426-
result = agent.run(name="Alice", cities=["Berlin", "Paris", "Rome"], date="2024-01-15")
1419+
result = agent.run(messages=[], name="Alice", cities=["Berlin", "Paris", "Rome"], date="2024-01-15")
14271420
user_messages = [m for m in result["messages"] if m.is_from(ChatRole.USER)]
14281421
assert user_messages[0].text == "Hello ALICE, check weather for: Berlin, Paris, Rome on 2024-01-15?"
14291422

@@ -1438,7 +1431,7 @@ def test_user_prompt_with_system_prompt(self, weather_tool):
14381431
tools=[weather_tool],
14391432
system_prompt="You are a helpful weather assistant.",
14401433
)
1441-
result = agent.run(city="Berlin")
1434+
result = agent.run(messages=[], city="Berlin")
14421435
messages = result["messages"]
14431436
assert messages[0].is_from(ChatRole.SYSTEM)
14441437
assert messages[0].text == "You are a helpful weather assistant."
@@ -1455,15 +1448,15 @@ def test_user_prompt_with_documents_variable(self, weather_tool):
14551448
tools=[weather_tool],
14561449
)
14571450
docs = [Document(content="Doc A"), Document(content="Doc B")]
1458-
result = agent.run(documents=docs, question="What is in the docs?")
1451+
result = agent.run(messages=[], documents=docs, question="What is in the docs?")
14591452
user_messages = [m for m in result["messages"] if m.is_from(ChatRole.USER)]
14601453
assert "Doc A" in user_messages[0].text
14611454
assert "Doc B" in user_messages[0].text
14621455
assert "What is in the docs?" in user_messages[0].text
14631456

14641457
def test_runtime_user_prompt_overrides_init_prompt(self, weather_tool):
14651458
agent = _make_agent_with_user_prompt(_user_msg("Default prompt for {{city}}."), tools=[weather_tool])
1466-
result = agent.run(user_prompt=_user_msg("Runtime prompt for {{city}}."), city="Berlin")
1459+
result = agent.run(messages=[], user_prompt=_user_msg("Runtime prompt for {{city}}."), city="Berlin")
14671460
user_messages = [m for m in result["messages"] if m.is_from(ChatRole.USER)]
14681461
assert user_messages[0].text == "Runtime prompt for Berlin."
14691462

@@ -1558,7 +1551,7 @@ def document_store_with_docs(self):
15581551
def test_rag_pipeline_user_prompt_init_only(self, document_store_with_docs, weather_tool):
15591552
pipeline = _make_rag_pipeline(document_store_with_docs, weather_tool)
15601553
query = "Where is the Colosseum?"
1561-
result = pipeline.run(data={"retriever": {"query": query}, "agent": {"query": query}})
1554+
result = pipeline.run(data={"retriever": {"query": query}, "agent": {"messages": [], "query": query}})
15621555
assert "agent" in result
15631556
agent_output = result["agent"]
15641557
assert "messages" in agent_output
@@ -1585,6 +1578,7 @@ def test_rag_pipeline_user_prompt_runtime_override(self, document_store_with_doc
15851578
data={
15861579
"retriever": {"query": query},
15871580
"agent": {
1581+
"messages": [],
15881582
"user_prompt": _user_msg(
15891583
"OVERRIDE: Using docs:\n"
15901584
"{% for doc in documents %}{{doc.content}}\n{% endfor %}"
@@ -1633,3 +1627,151 @@ def test_rag_pipeline_messages_plus_user_prompt(self, document_store_with_docs,
16331627
assert "History:" in user_messages[0].text
16341628
rendered = user_messages[1].text
16351629
assert "Relevant docs:" in rendered
1630+
1631+
1632+
class TestAgentPipelineStaticToolInput:
1633+
"""
1634+
Regression test for the scheduling bug introduced by making the 'messages'
1635+
run parameter non-required in https://github.com/deepset-ai/haystack/pull/10638.
1636+
1637+
pipeline inputs:
1638+
query → history_parser # feeds the messages chain
1639+
filters → agent.retrieval_filters # static, sender=None ← the trigger
1640+
(files is optional / absent)
1641+
1642+
pipeline connections:
1643+
history_parser.messages → messages_joiner.values
1644+
files_processor.prompt → messages_joiner.values # needs 'files' (mandatory)
1645+
messages_joiner.values → system_concat.messages
1646+
system_concat.output → agent.messages
1647+
1648+
agent.tools = [ComponentTool(inputs_from_state={"documents": "docs"})]
1649+
1650+
The bug
1651+
-------
1652+
When the optional 'files' pipeline input is NOT provided:
1653+
1. files_processor is BLOCKED (its mandatory 'files' input is absent).
1654+
2. messages_joiner stays DEFER_LAST.
1655+
3. system_concat is BLOCKED – cannot receive 'messages'.
1656+
4. agent.messages is therefore never delivered.
1657+
1658+
Meanwhile, 'filters' → agent.retrieval_filters (sender=None) fires the pipeline's
1659+
"user trigger" gate on the Agent's first visit. Because none of the Agent's
1660+
sockets are mandatory, can_component_run() returns True and the Agent gets
1661+
DEFER priority instead of BLOCKED.
1662+
1663+
The scheduler eventually pops the Agent (DEFER) from the queue — the only
1664+
non-BLOCKED component left — and runs it. _add_missing_input_defaults fills
1665+
messages=None, and Agent._initialize_fresh_execution raises:
1666+
1667+
ValueError("No messages provided to the Agent and neither
1668+
user_prompt nor system_prompt is set.")
1669+
"""
1670+
1671+
@pytest.fixture()
1672+
def search_tool(self):
1673+
return ComponentTool(
1674+
name="search",
1675+
description="Searches documents.",
1676+
component=PromptBuilder(template="{% for d in docs %}{{ d.content }}{% endfor %}"),
1677+
inputs_from_state={"documents": "docs"},
1678+
)
1679+
1680+
def _make_agent(self, search_tool):
1681+
chat_generator = MockChatGenerator()
1682+
agent = Agent(
1683+
chat_generator=chat_generator,
1684+
tools=[search_tool],
1685+
state_schema={"retrieval_filters": {"type": dict[str, Any]}, "documents": {"type": list[Document]}},
1686+
)
1687+
# Mock after __init__ so Agent sees the real 'tools' param in the signature.
1688+
chat_generator.run = MagicMock(return_value={"replies": [ChatMessage.from_assistant("done")]})
1689+
return agent
1690+
1691+
def test_agent_runs_prematurely_when_messages_predecessor_is_blocked(self, search_tool):
1692+
"""
1693+
Demonstrates the bug: the Agent executes without 'messages' when its
1694+
messages-providing predecessor chain is permanently BLOCKED.
1695+
1696+
Pipeline shape:
1697+
query → history_parser → messages_joiner.values
1698+
files=[]→ files_processor → attachments_builder → messages_joiner.values
1699+
messages_joiner → system_concat → agent.messages
1700+
filters → agent.retrieval_filters (static, triggers the user gate)
1701+
1702+
Scheduling sequence that exposes the bug:
1703+
1. history_parser runs (query provided) → sends to messages_joiner.
1704+
2. files_processor runs with files=[] → returns {} (_NO_OUTPUT_PRODUCED).
1705+
3. attachments_builder receives _NO_OUTPUT_PRODUCED → BLOCKED (mandatory
1706+
processed_files socket never filled).
1707+
4. messages_joiner is DEFER_LAST (lazy-variadic; attachments_builder
1708+
has not executed yet → are_all_lazy_variadic_sockets_resolved=False).
1709+
5. system_concat is BLOCKED (mandatory messages from messages_joiner
1710+
never received).
1711+
6. agent is DEFER (static retrieval_filters triggered the user gate;
1712+
no mandatory sockets → can_component_run=True).
1713+
1714+
DEFER (priority=3) < DEFER_LAST (priority=4) → the scheduler picks the
1715+
Agent before messages_joiner gets a chance to run. _add_missing_input_defaults
1716+
fills messages=None, and Agent._initialize_fresh_execution raises:
1717+
ValueError("No messages provided …")
1718+
"""
1719+
1720+
@component
1721+
class HistoryParser:
1722+
@component.output_types(messages=list[ChatMessage])
1723+
def run(self, query: str) -> dict:
1724+
return {"messages": [ChatMessage.from_user(query)]}
1725+
1726+
@component
1727+
class FilesProcessor:
1728+
"""Produces no output when given an empty file list."""
1729+
1730+
@component.output_types(processed_files=list[str])
1731+
def run(self, files: list[str]) -> dict:
1732+
if not files:
1733+
return {} # _NO_OUTPUT_PRODUCED → blocks AttachmentsBuilder
1734+
return {"processed_files": files}
1735+
1736+
@component
1737+
class AttachmentsBuilder:
1738+
"""Builds attachment messages; mandatory processed_files from FilesProcessor."""
1739+
1740+
@component.output_types(prompt=list[ChatMessage])
1741+
def run(self, processed_files: list[str]) -> dict:
1742+
return {"prompt": [ChatMessage.from_user(f"Files: {processed_files}")]}
1743+
1744+
@component
1745+
class SystemConcat:
1746+
@component.output_types(output=list[ChatMessage])
1747+
def run(self, messages: list[ChatMessage]) -> dict:
1748+
return {"output": messages}
1749+
1750+
from haystack.components.joiners.list_joiner import ListJoiner
1751+
1752+
agent = self._make_agent(search_tool)
1753+
1754+
pipeline = Pipeline()
1755+
pipeline.add_component("history_parser", HistoryParser())
1756+
pipeline.add_component("files_processor", FilesProcessor())
1757+
pipeline.add_component("attachments_builder", AttachmentsBuilder())
1758+
pipeline.add_component("messages_joiner", ListJoiner(list[ChatMessage]))
1759+
pipeline.add_component("system_concat", SystemConcat())
1760+
pipeline.add_component("agent", agent)
1761+
1762+
pipeline.connect("history_parser.messages", "messages_joiner.values")
1763+
pipeline.connect("files_processor.processed_files", "attachments_builder.processed_files")
1764+
pipeline.connect("attachments_builder.prompt", "messages_joiner.values")
1765+
pipeline.connect("messages_joiner.values", "system_concat.messages")
1766+
pipeline.connect("system_concat.output", "agent.messages")
1767+
1768+
# files=[] → files_processor produces no output → attachments_builder BLOCKED
1769+
# → messages_joiner stays DEFER_LAST → system_concat BLOCKED
1770+
# → agent (DEFER) runs first without messages → ValueError
1771+
pipeline.run(
1772+
data={
1773+
"history_parser": {"query": "What case law applies?"},
1774+
"files_processor": {"files": []}, # empty → no output
1775+
"agent": {"retrieval_filters": {"field": "date", "value": "2024-01-01"}},
1776+
}
1777+
)

0 commit comments

Comments
 (0)