Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions haystack/core/pipeline/component_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,65 @@
_NO_OUTPUT_PRODUCED = _empty


def is_any_connected_socket_blocked_by_routing(component: dict, inputs: dict) -> bool:
"""
Returns True if any non-variadic connected socket's path has been cut off by routing.

A socket is cut off when all its senders have executed but none produced an actual value
(i.e. all sent ``_NO_OUTPUT_PRODUCED``).

This happens when a ConditionalRouter (or any component that returns ``{}``
for a particular output) permanently cuts off the data path into an input
socket. Even when that socket is optional (has a default value), allowing
the component to run would be wrong: it would use the default instead of
the intended value, leading to errors such as an Agent being called with an
empty messages list when the router took a different path.
Comment on lines +21 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part I'm not sure about. I'm not sure the current behavior is "wrong". It would defeat the purpose of indicating an input is optional and would be a big shift in how we treat connections in Pipelines.

Basically we would be saying that if a connection is made to the input that input becomes mandatory no matter if its optional or not.

I'm not necessarily against this change, we just need more time than a patch fix to think about this and whether this would cause other types of pipelines to break.


Variadic sockets are excluded because they can legitimately receive
``_NO_OUTPUT_PRODUCED`` from some senders while still getting real values
from others.

:param component: Component metadata including input sockets.
:param inputs: Current inputs state for the component.
:returns: True if at least one socket's path was cut off by routing.
"""
for socket_name, socket in component["input_sockets"].items():
if socket.is_variadic:
continue
if not socket.senders:
continue

socket_inputs = inputs.get(socket_name, [])

# Wait until all senders have executed before concluding the path is cut off.
if not all_socket_predecessors_executed(socket, socket_inputs):
continue

# All senders ran — check whether any produced a real value.
# any_socket_input_received also counts user-provided values (sender=None),
# so an explicit pipeline.run(messages=[...]) is never treated as blocked.
if not any_socket_input_received(socket_inputs):
return True

return False


def can_component_run(component: dict, inputs: dict) -> bool:
"""
Checks if the component can run, given the current state of its inputs.

A component needs to pass two gates so that it is ready to run:
A component needs to pass three gates so that it is ready to run:
1. It has received all mandatory inputs.
2. It has received a trigger.
3. No connected non-variadic socket has been permanently cut off by routing.
:param component: Component metadata and the component instance.
:param inputs: Inputs for the component.
"""
received_all_mandatory_inputs = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
received_trigger = has_any_trigger(component, inputs)
not_blocked_by_routing = not is_any_connected_socket_blocked_by_routing(component, inputs)

return received_all_mandatory_inputs and received_trigger
return received_all_mandatory_inputs and received_trigger and not_blocked_by_routing


def has_any_trigger(component: dict, inputs: dict) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
fixes:
- |
Fixed a regression (introduced in 2.26.0 by PR #10793) where a component
downstream of a ConditionalRouter could run even when the router had taken
a different path and produced no output for that component's input socket.

The root cause was that the pipeline scheduler only blocked a component when
a *mandatory* (no-default) input socket was unfilled. Making
``Agent.messages`` optional (``list[ChatMessage] | None = None``) therefore
allowed the Agent to be scheduled even when the message path had been cut
off by routing, leading to calls like ``AmazonBedrockChatGenerator`` failing
with "A conversation must start with a user message."

The fix adds ``is_any_connected_socket_blocked_by_routing()`` to the
scheduler (``component_checks.py``). After all senders of a non-variadic
connected socket have executed, if none of them produced an actual value,
the downstream component is marked BLOCKED regardless of whether the socket
has a default value. User-provided values (``pipeline.run(messages=[…])``)
are still respected and never trigger the block.
100 changes: 100 additions & 0 deletions test/core/pipeline/test_component_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
has_lazy_variadic_socket_received_all_inputs,
has_socket_received_all_inputs,
has_user_input,
is_any_connected_socket_blocked_by_routing,
is_any_greedy_socket_ready,
)

Expand Down Expand Up @@ -673,3 +674,102 @@ def test_greedy_socket_with_user_input(self, complex_component):
"""User input can also trigger readiness for a greedy variadic socket."""
inputs = {"greedy_var": [{"sender": None, "value": 42}]}
assert is_any_greedy_socket_ready(complex_component, inputs) is True


@pytest.fixture
def optional_socket_component():
"""
Component with one optional (non-mandatory) connected socket and one socket
that receives a trigger. Mirrors the Agent in PIP-219: system_prompt
arrives unconditionally, while messages is optional but connected from a
predecessor that may produce _NO_OUTPUT_PRODUCED.
"""
return {
"instance": "mock_instance",
"visits": 0,
"input_sockets": {
"trigger_input": InputSocket("trigger_input", str, senders=["always_runs"]),
"optional_connected": InputSocket(
"optional_connected", list, default_value=None, senders=["conditional_sender"]
),
},
"output_sockets": {},
}


class TestIsAnyConnectedSocketBlockedByRouting:
def test_not_blocked_when_sender_has_not_run(self, optional_socket_component):
"""If the sender hasn't executed yet we cannot say routing cut it off."""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}]
# conditional_sender has not sent anything yet
}
assert is_any_connected_socket_blocked_by_routing(optional_socket_component, inputs) is False

def test_blocked_when_all_senders_produced_no_output(self, optional_socket_component):
"""All senders ran and all produced _NO_OUTPUT_PRODUCED → path is cut off."""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}],
"optional_connected": [{"sender": "conditional_sender", "value": _NO_OUTPUT_PRODUCED}],
}
assert is_any_connected_socket_blocked_by_routing(optional_socket_component, inputs) is True

def test_not_blocked_when_sender_produced_real_value(self, optional_socket_component):
"""Sender ran and produced a real value → not cut off."""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}],
"optional_connected": [{"sender": "conditional_sender", "value": ["msg1"]}],
}
assert is_any_connected_socket_blocked_by_routing(optional_socket_component, inputs) is False

def test_not_blocked_with_user_provided_value(self, optional_socket_component):
"""
User-provided values (sender=None) count as real values, so an explicit
pipeline.run(optional_connected=[...]) should never trigger the block.
"""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}],
"optional_connected": [
{"sender": "conditional_sender", "value": _NO_OUTPUT_PRODUCED},
{"sender": None, "value": ["user_msg"]},
],
}
assert is_any_connected_socket_blocked_by_routing(optional_socket_component, inputs) is False

def test_not_blocked_for_variadic_socket(self, variadic_component):
"""Variadic sockets are excluded — they can legitimately have mixed outputs."""
inputs = {
"variadic_input": [{"sender": "previous_component", "value": _NO_OUTPUT_PRODUCED}],
"normal_input": [{"sender": "another_component", "value": "hello"}],
}
assert is_any_connected_socket_blocked_by_routing(variadic_component, inputs) is False

def test_not_blocked_for_socket_with_no_senders(self):
"""Sockets not connected to any predecessor are never considered blocked."""
component = {
"instance": "mock",
"visits": 0,
"input_sockets": {"standalone": InputSocket("standalone", int, default_value=0)},
}
assert is_any_connected_socket_blocked_by_routing(component, {}) is False

def test_can_component_run_blocked_by_routing(self, optional_socket_component):
"""
can_component_run returns False when routing has cut off a connected socket,
even though the socket is optional and a trigger was received.

This is the core regression guard for PIP-219 / PR #10793.
"""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}],
"optional_connected": [{"sender": "conditional_sender", "value": _NO_OUTPUT_PRODUCED}],
}
assert can_component_run(optional_socket_component, inputs) is False

def test_can_component_run_not_blocked_when_sender_provides_value(self, optional_socket_component):
"""Happy-path: sender provides a real value, component is allowed to run."""
inputs = {
"trigger_input": [{"sender": "always_runs", "value": "go"}],
"optional_connected": [{"sender": "conditional_sender", "value": ["msg"]}],
}
assert can_component_run(optional_socket_component, inputs) is True
110 changes: 110 additions & 0 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,113 @@ def test_run_auto_variadic_chat_message_and_list_chat_message_to_list_chat_messa
p.connect("list_producer.messages", "receiver.messages")
result = p.run({})
assert [m.text for m in result["receiver"]["result"]] == ["hello", "world", "!"]


class TestRoutingBlocksDownstream:
"""
Regression tests for PIP-219 / PR #10793.

When a ConditionalRouter (or any component) produces _NO_OUTPUT_PRODUCED for
a particular output, downstream components whose *only* source for that input
socket was that component should be BLOCKED — even if the socket is optional.

The root cause: making Agent.messages optional removed the scheduler's ability
to detect that the message path had been cut off by routing. The fix lives in
is_any_connected_socket_blocked_by_routing() inside component_checks.py.
"""

def test_optional_connected_socket_blocks_component_when_sender_produces_no_output(self):
"""
A component with an optional but connected input socket must be BLOCKED
when its sender produces _NO_OUTPUT_PRODUCED, even though another socket
(trigger_input) did receive a value.

Topology
--------
always_fires ──► downstream.trigger_input (always provides a value)
conditional ──► downstream.optional_input (returns {} → _NO_OUTPUT_PRODUCED)
"""

@component
class AlwaysFires:
@component.output_types(value=str)
def run(self) -> dict:
return {"value": "triggered"}

@component
class ConditionalSender:
"""Simulates a component that is routed away from and produces no output."""

@component.output_types(data=list[ChatMessage])
def run(self, text: str | None = None) -> dict:
if text is None:
return {}
return {"data": [ChatMessage.from_user(text)]}

@component
class Downstream:
called: bool = False

@component.output_types(result=str)
def run(self, trigger_input: str, optional_input: list[ChatMessage] | None = None) -> dict:
Downstream.called = True
return {"result": "should not reach here"}

Downstream.called = False

p = Pipeline()
p.add_component("always_fires", AlwaysFires())
p.add_component("conditional", ConditionalSender())
p.add_component("downstream", Downstream())
p.connect("always_fires.value", "downstream.trigger_input")
p.connect("conditional.data", "downstream.optional_input")

# conditional receives no input → returns {} → downstream.optional_input is cut off.
p.run({})

assert not Downstream.called, "Downstream should have been BLOCKED by routing, not executed"

def test_agent_not_called_when_message_path_is_cut_off(self):
"""
Minimal reproducer for PIP-219: an Agent with optional messages connected
to a component that produces no output must NOT be executed.

agent_prompt ──► agent.system_prompt (always fires)
msg_producer ──► agent.messages (returns {} → no messages)

In broken versions (2.26.0–2.27.0) the scheduler treated messages as
non-mandatory and ran the Agent anyway, causing the LLM to receive only
a system message and raise "A conversation must start with a user message."
"""

@component
class FailingLLM:
"""Raises if called — the Agent must never reach it."""

@component.output_types(replies=list[ChatMessage])
def run(self, messages: list[ChatMessage]) -> dict:
raise AssertionError(
f"LLM must not be called; got {len(messages)} message(s): {[m.role.value for m in messages]}"
)

@component
class MessageProducer:
@component.output_types(messages=list[ChatMessage])
def run(self, text: str | None = None) -> dict:
if text is None:
return {}
return {"messages": [ChatMessage.from_user(text)]}

from haystack.components.builders import PromptBuilder

p = Pipeline()
p.add_component("agent_prompt", PromptBuilder(template="You are a helpful assistant."))
p.add_component("msg_producer", MessageProducer())
p.add_component("agent", Agent(chat_generator=FailingLLM()))

p.connect("agent_prompt.prompt", "agent.system_prompt")
p.connect("msg_producer.messages", "agent.messages")

# msg_producer receives no input → returns {} → agent.messages is cut off.
# The Agent must be BLOCKED (not executed), so FailingLLM is never called.
p.run({})
Loading