Describe the bug
Any value injected into pipeline.run(data={"ComponentB": {"some_input": value}}) for a component that has no mandatory inputs and at least one fully optional connected socket causes that component to run spuriously — even when its upstream pipeline path was never triggered.
The concrete case: Hayhooks injects streaming_callback into pipeline_run_args for all components in streaming_components before calling pipeline.run(). In a two-agent pipeline where a ConditionalRouter routes to either a planning path or an execution path, the downstream Execution_Agent runs in the planning path with messages=None even though its upstream BranchJoiner never fired.
This is related to but distinct from the bug addressed in PR #11057 (PIP-219). That fix adds is_any_connected_socket_blocked_by_routing() which requires all senders to have executed before concluding a path is cut off. In our case the intermediate BranchJoiner (greedy variadic) never runs at all (no real value arrives at its input), so all_socket_predecessors_executed() returns False, the new block check never fires, and the spurious run is not prevented.
Error message
No exception is raised. The component silently executes with messages=None, making an unnecessary LLM call with only a system prompt. Depending on the LLM provider this manifests as:
- AmazonBedrockChatGenerator: "A conversation must start with a user message."
- Other providers: empty or malformed response
Expected behavior
Execution_Agent should not run when its only data path (BranchJoiner → Execution_Agent.messages) was never triggered. A sender=None entry on an unconnected auxiliary socket (streaming_callback) should not be sufficient to schedule a component that has connected upstream pipeline sockets.
Additional context
The mechanism is two interacting facts:
- _prepare_component_input_data and _convert_to_internal_format in base.py (lines 999 and 1081), called in sequence inside pipeline.run(), store every key uniformly as {"sender": None, "value": ...}. They make no distinction between user data and infrastructure configuration like streaming_callback.
- has_user_input() in component_checks.py returns True if any socket entry has sender=None. Since Agent.messages has default=None in both run() and run_async() (agent.py lines 747, 978), are_all_sockets_ready(component, inputs, only_check_mandatory=True) passes trivially. The combination means can_component_run() returns True even though no upstream component produced output.
The trigger_from_user semantic is correct and should be preserved. The root issue is that there is currently no way to inject infrastructure/configuration inputs (like streaming_callback) into pipeline_run_args without those inputs being treated as user-data triggers.
Note: this is the same root vulnerability as PIP-219 but exposed via a different trigger mechanism (injection rather than router path). The fix in PR #11057 does not cover this case.
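As a rough illustration of how these two facts combine, here is a minimal pure-Python model. The functions are simplified stand-ins written for this issue, not Haystack's actual implementations, and the trigger check is reduced to has_user_input only.

```python
# Simplified stand-ins for the behaviour described in this issue.
# These are NOT Haystack's real functions; they only model the two facts.


def convert_to_internal_format(data):
    """Wrap every value passed via data as {"sender": None, "value": ...}."""
    return {
        comp: {socket: [{"sender": None, "value": value}] for socket, value in sockets.items()}
        for comp, sockets in data.items()
    }


def has_user_input(inputs):
    """True if any socket holds an entry whose sender is None."""
    return any(entry["sender"] is None for entries in inputs.values() for entry in entries)


def mandatory_sockets_ready(mandatory, inputs):
    """Trivially True when the component has no mandatory sockets."""
    return all(inputs.get(name) for name in mandatory)


def can_component_run(mandatory, inputs):
    """Assumed shape of the decision: a trigger plus ready mandatory sockets."""
    return has_user_input(inputs) and mandatory_sockets_ready(mandatory, inputs)


# Only streaming_callback is injected; no upstream component has produced anything.
internal = convert_to_internal_format({"exec_agent": {"streaming_callback": print}})
runnable = can_component_run(mandatory=[], inputs=internal["exec_agent"])
print(runnable)
```

In this model the injected callback alone makes the component schedulable, which matches the spurious run observed in the reproduction.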
Workaround
No workaround available.
To Reproduce
README.md
requirements.txt
test_h1_user_trigger.py
test_pr11057_does_not_fix_our_bug.py
test_hayhooks_injection.py
test_h4_pipeline_routing.py
test_h3_injection_side_effect.py
test_h2_agent_sockets.py
import os

os.environ.setdefault("OPENAI_API_KEY", "dummy")

from haystack import Pipeline, component
from haystack.components.joiners.branch import BranchJoiner
from haystack.components.routers.conditional_router import ConditionalRouter
from haystack.core.pipeline.base import PipelineBase
from haystack.core.pipeline.component_checks import has_user_input


@component
class PlanningAgentMock:
    @component.output_types(messages=list, last_role=str)
    def run(self, query: str, streaming_callback: object = None):
        return {
            "messages": [{"role": "assistant", "text": "clarifying question"}],
            "last_role": "assistant",
        }


@component
class ExecutionAgentMock:
    def __init__(self):
        self.call_count = 0

    @component.output_types(result=str)
    def run(self, messages: list | None = None, streaming_callback: object = None):
        self.call_count += 1
        return {"result": "done"}


exec_agent = ExecutionAgentMock()
bj = BranchJoiner(type_=list)
router = ConditionalRouter(
    routes=[
        {
            "condition": "{{ last_role == 'tool' }}",
            "output": "{{ messages }}",
            "output_name": "processing",
            "output_type": list,
        },
        {
            "condition": "{{ True }}",
            "output": "{{ messages }}",
            "output_name": "planning",
            "output_type": list,
        },
    ],
    unsafe=True,
)

p = Pipeline()
p.add_component("planning_agent", PlanningAgentMock())
p.add_component("router", router)
p.add_component("bj", bj)
p.add_component("exec_agent", exec_agent)
p.connect("planning_agent.messages", "router.messages")
p.connect("planning_agent.last_role", "router.last_role")
p.connect("router.processing", "bj.value")
p.connect("bj.value", "exec_agent.messages")

# Simulate what streaming infrastructure does before pipeline.run()
pipeline_run_args = {
    "planning_agent": {"query": "some question"},
    "exec_agent": {"streaming_callback": lambda chunk: None},  # <-- injected by Hayhooks
}
p.run(data=pipeline_run_args)

assert exec_agent.call_count == 0, f"BUG: exec_agent ran {exec_agent.call_count} time(s) in the planning path"
The assert fails — exec_agent.call_count == 1.
Static proof that both injection paths are equivalent:
# _convert_to_internal_format gives every key sender=None — no exceptions
dummy_cb = lambda chunk: None
result = PipelineBase._convert_to_internal_format({"exec_agent": {"streaming_callback": dummy_cb}}) # @staticmethod — no instance needed
assert result["exec_agent"]["streaming_callback"][0] == {"sender": None, "value": dummy_cb}
# has_user_input fires on that result
assert has_user_input(result["exec_agent"]) is True
Why PR #11057 does not fix this
PR #11057 adds is_any_connected_socket_blocked_by_routing() as a third gate in can_component_run. That gate iterates connected non-variadic sockets and, once all their senders have executed, checks whether any produced a real value. If not, the component is marked BLOCKED.
In our pipeline the path is ConditionalRouter → BranchJoiner → Execution_Agent.messages. In the planning path, ConditionalRouter routes away from BranchJoiner's input. BranchJoiner (greedy variadic) therefore never executes at all — no real value ever arrives at its input socket. Because BranchJoiner never ran:
all_socket_predecessors_executed(messages_socket, socket_inputs=[])
# expected_senders={"bj"}, executed_senders={} → False
The gate skips the messages socket and returns False → not_blocked_by_routing = True. The spurious trigger from streaming_callback injection is unaffected.
This was verified by implementing is_any_connected_socket_blocked_by_routing verbatim from the PR diff, patching it into can_component_run via unittest.mock.patch, and running the pipeline end-to-end. exec_agent.call_count remains 1 with the patch applied.
Unit-level summary:
| Scenario | all_socket_predecessors_executed | is_any_connected_socket_blocked_by_routing | outcome |
|---|---|---|---|
| Our bug: bj never ran | False (bj not in executed senders) | False (gate skipped) | spurious run persists |
| PIP-219 scenario: bj ran, _NO_OUTPUT_PRODUCED | True | True (path cut off) | correctly blocked |
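The two scenarios in the summary can be reproduced with a small model of the gate. This is a sketch based only on the description of PR #11057 given in this issue. The function bodies, the NO_OUTPUT sentinel and the socket layout are assumptions made for illustration, not the code from the PR.

```python
# Simplified model of the routing gate described in this issue.
# NO_OUTPUT stands in for _NO_OUTPUT_PRODUCED; the data layout is hypothetical.
NO_OUTPUT = object()


def all_predecessors_executed(expected_senders, socket_inputs):
    """True only when every expected sender has left an entry on the socket."""
    executed = {entry["sender"] for entry in socket_inputs if entry["sender"] is not None}
    return executed == set(expected_senders)


def blocked_by_routing(sockets):
    """sockets maps a socket name to (expected_senders, socket_inputs)."""
    for expected_senders, socket_inputs in sockets.values():
        if not expected_senders:
            continue  # unconnected socket, e.g. streaming_callback
        if not all_predecessors_executed(expected_senders, socket_inputs):
            continue  # gate skipped: a sender has not executed yet
        if all(entry["value"] is NO_OUTPUT for entry in socket_inputs):
            return True  # every sender ran and none produced a real value
    return False


# Scenario in this issue: bj never ran, so the messages socket holds no entries.
ours = {"messages": ({"bj"}, []), "streaming_callback": (set(), [{"sender": None, "value": print}])}

# PIP-219 scenario: bj ran but produced no output for this socket.
pip219 = {"messages": ({"bj"}, [{"sender": "bj", "value": NO_OUTPUT}])}

ours_blocked = blocked_by_routing(ours)
pip219_blocked = blocked_by_routing(pip219)
print(ours_blocked, pip219_blocked)
```

Under these assumptions the gate only concludes that a path is cut off after the sender has executed, so a sender that never runs leaves the component unblocked.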
Suggested fix directions (both zero breaking changes)
Option A — new pipeline.run(component_config=...) kwarg:
Add an optional component_config: dict | None = None to pipeline.run(). Entries there are merged into component inputs in _consume_component_inputs just before a component executes, but are never passed through _convert_to_internal_format, so they produce no sender=None trigger. Streaming infrastructure switches from injecting via data to injecting via component_config. No existing call sites change.
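A minimal sketch of how Option A might behave, using pure-Python stand-ins rather than Haystack code. The component_config argument does not exist today, and consume_component_inputs below is a hypothetical simplification of the merge step. Letting values from data take precedence over component_config is also an assumption.

```python
# Hypothetical model of Option A; none of this is existing Haystack API.


def convert_to_internal_format(data):
    """Stand-in: every value passed via data becomes a sender=None entry."""
    return {
        comp: {socket: [{"sender": None, "value": value}] for socket, value in sockets.items()}
        for comp, sockets in data.items()
    }


def has_user_input(inputs):
    """Stand-in trigger check: any entry with sender=None counts."""
    return any(entry["sender"] is None for entries in inputs.values() for entry in entries)


def consume_component_inputs(name, internal_inputs, component_config):
    """Merge config into the kwargs just before the component executes."""
    kwargs = dict(component_config.get(name, {}))
    for socket, entries in internal_inputs.get(name, {}).items():
        kwargs[socket] = entries[0]["value"]  # values from data or upstream win
    return kwargs


def on_chunk(chunk):
    """Placeholder streaming callback."""


data = {"planning_agent": {"query": "some question"}}
component_config = {"exec_agent": {"streaming_callback": on_chunk}}

# The callback never enters the internal format, so it creates no trigger.
internal = convert_to_internal_format(data)
exec_triggered = has_user_input(internal.get("exec_agent", {}))

# If the execution path does fire, the callback is still delivered.
internal.setdefault("exec_agent", {})["messages"] = [{"sender": "bj", "value": ["hello"]}]
exec_kwargs = consume_component_inputs("exec_agent", internal, component_config)
print(exec_triggered, sorted(exec_kwargs))
```

The point of the design is that configuration reaches the component only once something else has scheduled it.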
Option B — PassiveInput annotation on InputSocket (consistent with existing Greedy/Variadic pattern):
Add a PassiveInput annotation that a component can use to mark a socket as non-triggering: streaming_callback: Annotated[cb_type | None, PassiveInput] = None. has_user_input() (or the trigger check inside has_any_trigger) skips sockets marked this way. Agent opts in once; all other sockets behave exactly as today.
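A minimal sketch of how Option B could be detected and applied, written in plain Python. PassiveInput, passive_sockets and the extra parameter on has_user_input are hypothetical names for illustration. The sketch reads annotations with inspect.signature, which is one possible approach and not necessarily how Haystack builds its InputSocket objects.

```python
import inspect
from typing import Annotated, Callable, Optional, get_args, get_origin


class PassiveInput:
    """Hypothetical marker: a socket annotated with this never acts as a trigger."""


def passive_sockets(run_method):
    """Collect parameter names whose annotation carries the PassiveInput marker."""
    names = set()
    for name, param in inspect.signature(run_method).parameters.items():
        hint = param.annotation
        if get_origin(hint) is Annotated and PassiveInput in get_args(hint)[1:]:
            names.add(name)
    return names


def has_user_input(inputs, passive=frozenset()):
    """Stand-in trigger check that ignores entries on passive sockets."""
    return any(
        entry["sender"] is None
        for socket, entries in inputs.items()
        if socket not in passive
        for entry in entries
    )


class ExecutionAgentSketch:
    def run(
        self,
        messages: Optional[list] = None,
        streaming_callback: Annotated[Optional[Callable], PassiveInput] = None,
    ):
        return {"result": "done"}


passive = passive_sockets(ExecutionAgentSketch.run)
injected = {"streaming_callback": [{"sender": None, "value": print}]}
with_messages = {**injected, "messages": [{"sender": None, "value": ["hi"]}]}

callback_triggers = has_user_input(injected, passive)
messages_trigger = has_user_input(with_messages, passive)
print(passive, callback_triggers, messages_trigger)
```

In this model a user-supplied messages value still triggers the component, so the trigger_from_user semantic is preserved for every socket that does not opt in.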
FAQ Check
System:
- OS: macOS / Linux (Docker)
- GPU/CPU: CPU
- Haystack version: 2.27.0
- DocumentStore: n/a
- Reader: n/a
- Retriever: n/a