Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions haystack/core/pipeline/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,12 +435,12 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[dict[str, Any]]:
# keep adding while concurrency is not locked
continue

# The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
# The next is DEFER => we only schedule it if it "becomes READY"
# We'll handle it in the next iteration or with incremental waiting
break

# We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
# We only schedule components with priority DEFER when no other tasks are running
elif priority == ComponentPriority.DEFER and not running_tasks:
if len(priority_queue) > 0:
comp_name, topological_sort = self._tiebreak_waiting_components(
component_name=comp_name,
Expand Down
22 changes: 7 additions & 15 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from haystack.core.pipeline.component_checks import (
_NO_OUTPUT_PRODUCED,
all_predecessors_executed,
are_all_lazy_variadic_sockets_resolved,
are_all_sockets_ready,
can_component_run,
is_any_greedy_socket_ready,
Expand Down Expand Up @@ -76,8 +75,7 @@ class ComponentPriority(IntEnum):
HIGHEST = 1
READY = 2
DEFER = 3
DEFER_LAST = 4
BLOCKED = 5
BLOCKED = 4


class PipelineBase: # noqa: PLW1641
Expand Down Expand Up @@ -1200,9 +1198,9 @@ def _calculate_priority(comp: dict, comp_inputs: dict[str, list[dict[str, Any]]]
if all_predecessors_executed(comp, comp_inputs):
# This priority is explicitly used in AsyncPipeline + in _is_queue_stale
return ComponentPriority.READY
if are_all_lazy_variadic_sockets_resolved(comp, comp_inputs):
return ComponentPriority.DEFER
return ComponentPriority.DEFER_LAST
# If we make it here, the component can run but is still waiting for more inputs, so we give it DEFER, the lowest
# priority among runnable components. This way, components that are ready to run are prioritized over deferred ones.
return ComponentPriority.DEFER

def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -1306,8 +1304,8 @@ def _tiebreak_waiting_components(
"""
Decides which component to run when multiple components are waiting for inputs with the same priority.

NOTE: This was designed to only tie-break for priorities DEFER and DEFER_LAST. Since this function also removes
these components from the priority queue we rely on _is_queue_stale to then refill the priority queue.
NOTE: This was designed to only tie-break for components with the priority DEFER. Since this function also
removes these components from the priority queue, we rely on _is_queue_stale to then refill the priority queue.
Note that _is_queue_stale only triggers when all remaining components have BLOCKED priority.

:param component_name: The name of the component.
Expand All @@ -1320,16 +1318,10 @@ def _tiebreak_waiting_components(
"""
# Create a list of all components that have the same priority as the current component, including the
# current component itself and remove them from the priority queue.
has_deferred_priority = priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
components_with_same_priority = [component_name]
while len(priority_queue) > 0:
next_priority, next_component_name = priority_queue.peek()
# For tiebreaking purposes we treat DEFER and DEFER_LAST as the same priority.
if (
has_deferred_priority
and next_priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
or next_priority == priority
):
if priority == ComponentPriority.DEFER and next_priority == ComponentPriority.DEFER:
priority_queue.pop() # actually remove the component
components_with_same_priority.append(next_component_name)
else:
Expand Down
25 changes: 0 additions & 25 deletions haystack/core/pipeline/component_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,31 +201,6 @@ def all_predecessors_executed(component: dict, inputs: dict) -> bool:
)


def are_all_lazy_variadic_sockets_resolved(component: dict, inputs: dict) -> bool:
    """
    Tells whether every lazy variadic socket of a component has reached its final state.

    A lazy variadic socket counts as resolved when it has received all expected inputs or when all of its
    predecessors have executed. Sockets that are not lazy variadic are not inspected.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: False as soon as one lazy variadic socket is unresolved, True otherwise.
    """
    for name, input_socket in component["input_sockets"].items():
        if not input_socket.is_lazy_variadic:
            continue

        # A socket that has no entry in the inputs is treated as having received nothing so far.
        received = inputs.get(name, [])

        # Either condition is enough to call the socket resolved.
        if has_lazy_variadic_socket_received_all_inputs(input_socket, received):
            continue
        if all_socket_predecessors_executed(input_socket, received):
            continue

        # Neither condition holds: the socket is not ready, so the component has to be deferred.
        return False

    return True


def is_any_greedy_socket_ready(component: dict, inputs: dict) -> bool:
"""
Checks if the component has any greedy socket that is ready to run.
Expand Down
2 changes: 1 addition & 1 deletion haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def run( # noqa: PLR0915, PLR0912, C901
# We always exit the loop since we cannot run the next component.
break

if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
if len(priority_queue) > 0 and priority == ComponentPriority.DEFER:
component_name, topological_sort = self._tiebreak_waiting_components(
component_name=component_name,
priority=priority,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
enhancements:
- |
Simplified the pipeline scheduler by removing the ``DEFER_LAST`` component priority.
Components that previously got ``DEFER_LAST`` (those with unresolved lazy-variadic sockets)
now share the ``DEFER`` priority and are tie-broken via topological order, matching the
behavior of all other deferred components. This should not materially change how
components are scheduled in a pipeline.
16 changes: 8 additions & 8 deletions test/components/agents/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1722,13 +1722,13 @@ class TestAgentWaitsForBlockedPredecessor:
1. history_parser runs → sends messages to messages_joiner.
2. files_processor runs with files=[] → returns {} (no output).
3. attachments_builder is BLOCKED — its mandatory processed_files input never arrives.
4. messages_joiner gets DEFER_LAST (priority=4): it has a lazy-variadic socket and attachments_builder hasn't
executed yet, so the joiner doesn't know if more data might still come. It keeps waiting.
5. agent gets DEFER (priority=3): retrieval_filters arrives with sender=None (static pipeline input), which
4. messages_joiner gets DEFER: it has a lazy-variadic socket and attachments_builder hasn't executed yet,
so the joiner doesn't know if more data might still come. It keeps waiting.
5. agent also gets DEFER: retrieval_filters arrives with sender=None (static pipeline input), which
satisfies has_any_trigger() on the first visit. The Agent has no mandatory sockets, so can_component_run()
returns True. It also has no unresolved lazy-variadic sockets, so it gets DEFER rather than DEFER_LAST.
6. Since DEFER (3) < DEFER_LAST (4), the scheduler picks the Agent before the joiner runs.
The Agent executes without messages and raises:
returns True.
6. The scheduler tie-breaks DEFER components by topological order, so the joiner should run before the Agent.
Before the fix the Agent was picked first and executed without messages, raising:

ValueError("No messages provided to the Agent and neither user_prompt nor system_prompt is set.")
"""
Expand Down Expand Up @@ -1780,8 +1780,8 @@ def run(self, processed_files: list[str]) -> dict:
pipeline.connect("messages_joiner.values", "agent.messages")

# files=[] → files_processor produces no output → attachments_builder BLOCKED
# → messages_joiner stays DEFER_LAST
# → agent (DEFER) runs first without messages → ValueError
# → messages_joiner stays DEFER waiting for the blocked branch
# → agent (DEFER) must wait for the joiner via topological tie-break
result = pipeline.run(
data={
"history_parser": {"query": "What case law applies?"},
Expand Down
32 changes: 0 additions & 32 deletions test/core/pipeline/test_component_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
any_predecessors_provided_input,
any_socket_input_received,
any_socket_value_from_predecessor_received,
are_all_lazy_variadic_sockets_resolved,
are_all_sockets_ready,
can_component_run,
can_not_receive_inputs_from_pipeline,
Expand Down Expand Up @@ -618,37 +617,6 @@ def test_all_predecessors_executed_with_user_input(self, complex_component):
assert all_predecessors_executed(complex_component, inputs) is False


class TestLazyVariadicResolution:
    """Exercises are_all_lazy_variadic_sockets_resolved with complete, partial, absent and no-output inputs."""

    def test_lazy_variadic_sockets_all_resolved(self, complex_component):
        """Once all inputs have arrived, the lazy variadic sockets count as resolved."""
        received = [{"sender": "component1", "value": 42}, {"sender": "component2", "value": 43}]
        resolved = are_all_lazy_variadic_sockets_resolved(complex_component, {"lazy_var": received})
        assert resolved is True

    def test_lazy_variadic_sockets_partially_resolved(self, complex_component):
        """With some sender outputs missing, the lazy variadic sockets are not resolved."""
        # Only component1 has delivered; component2 is missing.
        received = [{"sender": "component1", "value": 42}]
        resolved = are_all_lazy_variadic_sockets_resolved(complex_component, {"lazy_var": received})
        assert resolved is False

    def test_lazy_variadic_sockets_with_no_inputs(self, complex_component):
        """Without any inputs the lazy variadic socket is not resolved."""
        resolved = are_all_lazy_variadic_sockets_resolved(complex_component, {})
        assert resolved is False

    def test_lazy_variadic_sockets_with_predecessors_executed(self, complex_component):
        """
        If all predecessors have executed but produced no output, the lazy variadic socket is still
        considered resolved.
        """
        received = [
            {"sender": "component1", "value": _NO_OUTPUT_PRODUCED},
            {"sender": "component2", "value": _NO_OUTPUT_PRODUCED},
        ]
        resolved = are_all_lazy_variadic_sockets_resolved(complex_component, {"lazy_var": received})
        assert resolved is True


class TestGreedySocketReadiness:
def test_greedy_socket_ready(self, complex_component):
"""A single valid input is enough for a greedy variadic socket to be considered ready."""
Expand Down
6 changes: 3 additions & 3 deletions test/core/pipeline/test_pipeline_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ def test__find_receivers_from(self):
ComponentPriority.DEFER,
"Component should DEFER when all lazy variadic sockets are resolved",
),
# Test case 6: DEFER_LAST - Incomplete variadic inputs
# Test case 6: DEFER - Incomplete variadic inputs
(
{
"instance": "mock_instance",
Expand All @@ -1121,8 +1121,8 @@ def test__find_receivers_from(self):
"variadic_input": [{"sender": "component1", "value": 42}], # Missing component2
"normal_input": [{"sender": "component3", "value": "test"}],
},
ComponentPriority.DEFER_LAST,
"Component should be DEFER_LAST when not all variadic senders have produced output",
ComponentPriority.DEFER,
"Component should be DEFER when not all variadic senders have produced output",
),
# Test case 7: READY - No input sockets, first visit
(
Expand Down
Loading