
Spurious component execution via pipeline_run_args injection #11109

@deep-rloebbert

Description

Describe the bug

Any value injected into pipeline.run(data={"ComponentB": {"some_input": value}}) for a component that has no mandatory inputs and at least one fully optional connected socket causes that component to run spuriously — even when its upstream pipeline path was never triggered.

The concrete case: Hayhooks injects streaming_callback into pipeline_run_args for all components in streaming_components before calling pipeline.run(). In a two-agent pipeline where a ConditionalRouter routes to either a planning path or an execution path, the downstream Execution_Agent runs in the planning path with messages=None even though its upstream BranchJoiner never fired.

This is related to, but distinct from, the bug addressed in PR #11057 (PIP-219). That fix adds is_any_connected_socket_blocked_by_routing(), which requires all senders to have executed before concluding that a path is cut off. In our case the intermediate BranchJoiner (greedy variadic) never runs at all (no real value arrives at its input), so all_socket_predecessors_executed() returns False, the new block check never fires, and the spurious run is not prevented.

Error message

No exception is raised. The component silently executes with messages=None, making an unnecessary LLM call with only a system prompt. Depending on the LLM provider this manifests as:

  • AmazonBedrockChatGenerator: "A conversation must start with a user message."
  • Other providers: empty or malformed response

Expected behavior

Execution_Agent should not run when its only data path (BranchJoiner → Execution_Agent.messages) was never triggered. A sender=None entry on an unconnected auxiliary socket (streaming_callback) should not be sufficient to schedule a component that has connected upstream pipeline sockets.
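The rule stated above can be written as a predicate. This is a minimal, hypothetical sketch: has_legitimate_trigger and its connected_sockets argument are illustrative names, not Haystack APIs, and the function encodes only this one rule rather than Haystack's full scheduling logic.

```python
# Hypothetical helper, not a Haystack API. Inputs use the internal format
# described in this issue: {socket_name: [{"sender": str | None, "value": ...}, ...]}.


def has_legitimate_trigger(inputs: dict[str, list[dict]], connected_sockets: set[str]) -> bool:
    """Literal encoding of the expected rule from this issue."""
    # Values delivered by an upstream component always count as a trigger.
    if any(e["sender"] is not None for entries in inputs.values() for e in entries):
        return True
    # Sockets that received a value directly from pipeline.run(data=...).
    user_sockets = {name for name, entries in inputs.items() if any(e["sender"] is None for e in entries)}
    if not user_sockets:
        return False
    # A component with no connected sockets is an entry point: user input triggers it as today.
    if not connected_sockets:
        return True
    # Otherwise user input only counts if it lands on a connected socket;
    # a value on an unconnected auxiliary socket (e.g. streaming_callback) does not.
    return bool(user_sockets & connected_sockets)


cb = lambda chunk: None
# exec_agent: only the injected callback is present, messages is connected -> not scheduled.
print(has_legitimate_trigger({"streaming_callback": [{"sender": None, "value": cb}]}, {"messages"}))
# planning_agent: entry point with a user query -> scheduled as today.
print(has_legitimate_trigger({"query": [{"sender": None, "value": "q"}]}, set()))
```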

Additional context

The mechanism is two interacting facts:

  1. _prepare_component_input_data and _convert_to_internal_format in base.py (lines 999 and 1081), called in sequence inside pipeline.run(), together store every key uniformly as {"sender": None, "value": ...}. They make no distinction between user data and infrastructure configuration such as streaming_callback.

  2. has_user_input() in component_checks.py returns True if any socket entry has sender=None. Since Agent.messages has default=None in both run() and run_async() (agent.py lines 747, 978), are_all_sockets_ready(component, inputs, only_check_mandatory=True) passes trivially. The combination means can_component_run() returns True even though no upstream component produced output.
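The interaction of these two facts can be reproduced without Haystack. The following is a simplified, hypothetical model: convert_to_internal_format, has_user_input, mandatory_sockets_ready and can_run are stand-ins written for illustration, modeled on the behaviour described above rather than copied from base.py or component_checks.py. Only the user trigger is modeled; predecessor triggers are omitted.

```python
from typing import Any


def convert_to_internal_format(data: dict[str, dict[str, Any]]) -> dict[str, dict[str, list[dict]]]:
    # Every key becomes a sender=None entry: user data and configuration look identical.
    return {
        comp: {socket: [{"sender": None, "value": value}] for socket, value in sockets.items()}
        for comp, sockets in data.items()
    }


def has_user_input(inputs: dict[str, list[dict]]) -> bool:
    # True as soon as any socket holds an entry that no upstream component sent.
    return any(e["sender"] is None for entries in inputs.values() for e in entries)


def mandatory_sockets_ready(inputs: dict[str, list[dict]], mandatory: set[str]) -> bool:
    # Simplified stand-in for are_all_sockets_ready(..., only_check_mandatory=True).
    return all(inputs.get(name) for name in mandatory)


def can_run(inputs: dict[str, list[dict]], mandatory: set[str]) -> bool:
    # Simplified stand-in for can_component_run(): mandatory inputs present and a user trigger.
    return mandatory_sockets_ready(inputs, mandatory) and has_user_input(inputs)


cb = lambda chunk: None
exec_inputs = convert_to_internal_format({"exec_agent": {"streaming_callback": cb}})["exec_agent"]

# messages defaults to None, so exec_agent has no mandatory sockets.
print(can_run(exec_inputs, mandatory=set()))  # True: the injected callback alone schedules the run
```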

The trigger_from_user semantic is correct and should be preserved. The root issue is that there is currently no way to inject infrastructure/configuration inputs (like streaming_callback) into pipeline_run_args without those inputs being treated as user-data triggers.

Note: this is the same root vulnerability as PIP-219 but exposed via a different trigger mechanism (injection rather than router path). The fix in PR #11057 does not cover this case.

Workaround

No workaround available.

To Reproduce

Reproduction files:

  • README.md
  • requirements.txt
  • test_h1_user_trigger.py
  • test_pr11057_does_not_fix_our_bug.py
  • test_hayhooks_injection.py
  • test_h4_pipeline_routing.py
  • test_h3_injection_side_effect.py
  • test_h2_agent_sockets.py

```python
import os
os.environ.setdefault("OPENAI_API_KEY", "dummy")

from haystack import Pipeline, component
from haystack.components.joiners.branch import BranchJoiner
from haystack.components.routers.conditional_router import ConditionalRouter
from haystack.core.pipeline.base import PipelineBase
from haystack.core.pipeline.component_checks import has_user_input


# Stand-in for the planning Agent: always ends on an assistant turn,
# so the router below takes the "planning" route.
@component
class PlanningAgentMock:
    @component.output_types(messages=list, last_role=str)
    def run(self, query: str, streaming_callback: object = None):
        return {
            "messages": [{"role": "assistant", "text": "clarifying question"}],
            "last_role": "assistant",
        }


# Stand-in for Execution_Agent: messages is optional (default None), like Agent.messages.
@component
class ExecutionAgentMock:
    def __init__(self):
        self.call_count = 0

    @component.output_types(result=str)
    def run(self, messages: list | None = None, streaming_callback: object = None):
        self.call_count += 1
        return {"result": "done"}


exec_agent = ExecutionAgentMock()
bj = BranchJoiner(type_=list)
# "processing" feeds the execution path; "planning" is left unconnected.
router = ConditionalRouter(
    routes=[
        {
            "condition": "{{ last_role == 'tool' }}",
            "output": "{{ messages }}",
            "output_name": "processing",
            "output_type": list,
        },
        {
            "condition": "{{ True }}",
            "output": "{{ messages }}",
            "output_name": "planning",
            "output_type": list,
        },
    ],
    unsafe=True,
)

p = Pipeline()
p.add_component("planning_agent", PlanningAgentMock())
p.add_component("router", router)
p.add_component("bj", bj)
p.add_component("exec_agent", exec_agent)
p.connect("planning_agent.messages", "router.messages")
p.connect("planning_agent.last_role", "router.last_role")
p.connect("router.processing", "bj.value")
p.connect("bj.value", "exec_agent.messages")

# Simulate what streaming infrastructure does before pipeline.run()
pipeline_run_args = {
    "planning_agent": {"query": "some question"},
    "exec_agent": {"streaming_callback": lambda chunk: None},  # <-- injected by Hayhooks
}
p.run(data=pipeline_run_args)

assert exec_agent.call_count == 0, f"BUG: exec_agent ran {exec_agent.call_count} time(s) in the planning path"
```

The assert fails — exec_agent.call_count == 1.

Static proof that both injection paths are equivalent:

```python
from haystack.core.pipeline.base import PipelineBase
from haystack.core.pipeline.component_checks import has_user_input

# _convert_to_internal_format gives every key sender=None, with no exceptions
dummy_cb = lambda chunk: None
result = PipelineBase._convert_to_internal_format({"exec_agent": {"streaming_callback": dummy_cb}})  # @staticmethod, no instance needed
assert result["exec_agent"]["streaming_callback"][0] == {"sender": None, "value": dummy_cb}

# has_user_input fires on that result
assert has_user_input(result["exec_agent"]) is True
```

Why PR #11057 does not fix this

PR #11057 adds is_any_connected_socket_blocked_by_routing() as a third gate in can_component_run. That gate iterates connected non-variadic sockets and, once all their senders have executed, checks whether any produced a real value. If not, the component is marked BLOCKED.

In our pipeline the path is ConditionalRouter → BranchJoiner → Execution_Agent.messages. In the planning path, ConditionalRouter routes away from BranchJoiner's input. BranchJoiner (greedy variadic) therefore never executes at all — no real value ever arrives at its input socket. Because BranchJoiner never ran:

```python
all_socket_predecessors_executed(messages_socket, socket_inputs=[])
# expected_senders={"bj"}, executed_senders=set() → False
```

The gate therefore skips the messages socket and returns False, so not_blocked_by_routing = True. The spurious trigger from the streaming_callback injection is unaffected.

This was verified by implementing is_any_connected_socket_blocked_by_routing verbatim from the PR diff, patching it into can_component_run via unittest.mock.patch, and running the pipeline end-to-end. exec_agent.call_count remains 1 with the patch applied.
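The skip can be modeled in a few lines. This sketch is reconstructed from the description of the gate above, not from the PR diff; the data shapes (a mapping from each connected non-variadic socket to its expected senders) and the NO_OUTPUT sentinel standing in for _NO_OUTPUT_PRODUCED are assumptions made for illustration.

```python
NO_OUTPUT = object()  # stand-in for Haystack's _NO_OUTPUT_PRODUCED sentinel


def all_socket_predecessors_executed(expected_senders: set[str], socket_inputs: list[dict]) -> bool:
    # True only when every expected sender has left an entry on this socket.
    executed = {e["sender"] for e in socket_inputs if e["sender"] is not None}
    return expected_senders == executed


def is_any_connected_socket_blocked(connected: dict[str, set[str]], inputs: dict[str, list[dict]]) -> bool:
    for socket_name, senders in connected.items():
        socket_inputs = inputs.get(socket_name, [])
        if not all_socket_predecessors_executed(senders, socket_inputs):
            continue  # cannot conclude anything yet, so the socket is skipped
        # All senders ran: the socket is blocked if none of them produced a real value.
        if all(e["value"] is NO_OUTPUT for e in socket_inputs if e["sender"] is not None):
            return True
    return False


connected = {"messages": {"bj"}}
cb = lambda chunk: None

# This issue: bj never ran; only the injected streaming_callback is present.
our_bug = {"streaming_callback": [{"sender": None, "value": cb}]}
# PIP-219: bj ran but produced no output.
pip_219 = {"messages": [{"sender": "bj", "value": NO_OUTPUT}]}

print(is_any_connected_socket_blocked(connected, our_bug))  # False: gate skipped
print(is_any_connected_socket_blocked(connected, pip_219))  # True: path cut off
```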

Unit-level summary:

| Scenario | all_socket_predecessors_executed | is_any_connected_socket_blocked_by_routing | Outcome |
|---|---|---|---|
| Our bug: bj never ran | False (bj not in executed senders) | False (gate skipped) | Spurious run persists |
| PIP-219 scenario: bj ran, _NO_OUTPUT_PRODUCED | True | True (path cut off) | Correctly blocked |

Suggested fix directions (neither introduces breaking changes)

Option A — new pipeline.run(component_config=...) kwarg:
Add an optional component_config: dict | None = None to pipeline.run(). Entries there are merged into component inputs in _consume_component_inputs just before a component executes, but are never passed through _convert_to_internal_format, so they produce no sender=None trigger. Streaming infrastructure switches from injecting via data to injecting via component_config. No existing call sites change.
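A minimal sketch of Option A, assuming the proposed component_config kwarg. Nothing here exists in Haystack today: prepare_run_kwargs and its merge point are hypothetical, and input consumption is reduced to taking the first value per socket. The point is that config values are added after the scheduling decision, so they never appear in the internal-format inputs that has_user_input inspects.

```python
from typing import Any, Optional


def has_user_input(inputs: dict[str, list[dict]]) -> bool:
    # Same check as described in this issue: any entry with sender=None.
    return any(e["sender"] is None for entries in inputs.values() for e in entries)


def prepare_run_kwargs(
    name: str,
    inputs: dict[str, list[dict]],
    component_config: Optional[dict[str, dict[str, Any]]] = None,
) -> dict[str, Any]:
    # Simplified: take the first value per socket (variadic handling omitted).
    kwargs = {socket: entries[0]["value"] for socket, entries in inputs.items() if entries}
    # Config is merged only here, just before execution, so it never adds a sender=None entry.
    kwargs.update((component_config or {}).get(name, {}))
    return kwargs


cb = lambda chunk: None
config = {"exec_agent": {"streaming_callback": cb}}

# Planning path: nothing reaches exec_agent through data, so there is no user trigger.
planning_inputs: dict[str, list[dict]] = {}
print(has_user_input(planning_inputs))  # False

# Execution path: bj delivers messages, and the callback is attached at run time.
exec_inputs = {"messages": [{"sender": "bj", "value": ["hi"]}]}
kwargs = prepare_run_kwargs("exec_agent", exec_inputs, config)
print(sorted(kwargs))  # ['messages', 'streaming_callback']
```

In this sketch config values overwrite same-named data values; the opposite precedence, or raising on a collision, would be equally reasonable choices.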

Option B — PassiveInput annotation on InputSocket (consistent with existing Greedy/Variadic pattern):
Add a PassiveInput annotation that a component can use to mark a socket as non-triggering: streaming_callback: Annotated[cb_type | None, PassiveInput] = None. has_user_input() (or the trigger check inside has_any_trigger) skips sockets marked this way. Agent opts in once; all other sockets behave exactly as today.
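Option B can be prototyped with the standard typing.Annotated machinery. In this sketch PassiveInput, passive_sockets, ExecutionAgentSketch and the two-argument has_user_input are hypothetical (Haystack has no such marker today), and Optional[Callable[[Any], None]] stands in for the issue's cb_type | None. Raw annotations are read through inspect.signature so the Annotated wrapper is seen exactly as written.

```python
import inspect
from typing import Annotated, Any, Callable, Optional, get_args, get_origin


class PassiveInput:
    """Hypothetical marker (not a Haystack API): a socket that never triggers a run."""


def passive_sockets(run_method: Callable[..., Any]) -> set[str]:
    # Collect parameters annotated as Annotated[..., PassiveInput].
    names = set()
    for name, param in inspect.signature(run_method).parameters.items():
        ann = param.annotation
        if get_origin(ann) is Annotated and PassiveInput in get_args(ann)[1:]:
            names.add(name)
    return names


def has_user_input(inputs: dict[str, list[dict]], passive: set[str]) -> bool:
    # Like has_user_input() described in this issue, but passive sockets are ignored.
    return any(
        entry["sender"] is None
        for name, entries in inputs.items()
        if name not in passive
        for entry in entries
    )


class ExecutionAgentSketch:
    # The callback opts out of triggering; messages behaves exactly as today.
    def run(
        self,
        messages: Optional[list] = None,
        streaming_callback: Annotated[Optional[Callable[[Any], None]], PassiveInput] = None,
    ) -> dict:
        return {"result": "done"}


passive = passive_sockets(ExecutionAgentSketch().run)
injected = {"streaming_callback": [{"sender": None, "value": print}]}
print(passive)  # {'streaming_callback'}
print(has_user_input(injected, passive))  # False
```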

System:

  • OS: macOS / Linux (Docker)
  • GPU/CPU: CPU
  • Haystack version: 2.27.0
  • DocumentStore: n/a
  • Reader: n/a
  • Retriever: n/a

Metadata

  • Assignees: No one assigned
  • Labels: P1 (High priority, add to the next sprint)
  • Projects: No projects
  • Relationships: None yet
  • Development: No branches or pull requests