Skip to content

Commit 8527079

Browse files
authored
refactor: Remove DEFER_LAST as a priority (#11343)
1 parent f4b76a1 commit 8527079

8 files changed

Lines changed: 30 additions & 87 deletions

File tree

haystack/core/pipeline/async_pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,12 +435,12 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[dict[str, Any]]:
435435
# keep adding while concurrency is not locked
436436
continue
437437

438-
# The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY"
438+
# The next is DEFER => we only schedule it if it "becomes READY"
439439
# We'll handle it in the next iteration or with incremental waiting
440440
break
441441

442-
# We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running
443-
elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks:
442+
# We only schedule components with priority DEFER when no other tasks are running
443+
elif priority == ComponentPriority.DEFER and not running_tasks:
444444
if len(priority_queue) > 0:
445445
comp_name, topological_sort = self._tiebreak_waiting_components(
446446
component_name=comp_name,

haystack/core/pipeline/base.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from haystack.core.pipeline.component_checks import (
3030
_NO_OUTPUT_PRODUCED,
3131
all_predecessors_executed,
32-
are_all_lazy_variadic_sockets_resolved,
3332
are_all_sockets_ready,
3433
can_component_run,
3534
is_any_greedy_socket_ready,
@@ -76,8 +75,7 @@ class ComponentPriority(IntEnum):
7675
HIGHEST = 1
7776
READY = 2
7877
DEFER = 3
79-
DEFER_LAST = 4
80-
BLOCKED = 5
78+
BLOCKED = 4
8179

8280

8381
class PipelineBase: # noqa: PLW1641
@@ -1200,9 +1198,9 @@ def _calculate_priority(comp: dict, comp_inputs: dict[str, list[dict[str, Any]]]
12001198
if all_predecessors_executed(comp, comp_inputs):
12011199
# This priority is explicitly used in AsyncPipeline + in _is_queue_stale
12021200
return ComponentPriority.READY
1203-
if are_all_lazy_variadic_sockets_resolved(comp, comp_inputs):
1204-
return ComponentPriority.DEFER
1205-
return ComponentPriority.DEFER_LAST
1201+
# If we make it here it means the component can run but is waiting for more inputs, so we give it the lowest
1202+
# priority. This way, components that are ready to run will be prioritized over ones assigned with this prio.
1203+
return ComponentPriority.DEFER
12061204

12071205
def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> dict[str, Any]:
12081206
"""
@@ -1306,8 +1304,8 @@ def _tiebreak_waiting_components(
13061304
"""
13071305
Decides which component to run when multiple components are waiting for inputs with the same priority.
13081306
1309-
NOTE: This was designed to only tie-break for priorities DEFER and DEFER_LAST. Since this function also removes
1310-
these components from the priority queue we rely on _is_queue_stale to then refill the priority queue.
1307+
NOTE: This was designed to only tie-break for components with the priority DEFER. Since this function also
1308+
removes these components from the priority queue we rely on _is_queue_stale to then refill the priority queue.
13111309
And _is_queue_stale only triggers when all remaining components have BLOCKED priority.
13121310
13131311
:param component_name: The name of the component.
@@ -1320,16 +1318,10 @@ def _tiebreak_waiting_components(
13201318
"""
13211319
# Create a list of all components that have the same priority as the current component, including the
13221320
# current component itself and remove them from the priority queue.
1323-
has_deferred_priority = priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
13241321
components_with_same_priority = [component_name]
13251322
while len(priority_queue) > 0:
13261323
next_priority, next_component_name = priority_queue.peek()
1327-
# For tiebreaking purposes we treat DEFER and DEFER_LAST as the same priority.
1328-
if (
1329-
has_deferred_priority
1330-
and next_priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
1331-
or next_priority == priority
1332-
):
1324+
if priority == ComponentPriority.DEFER and next_priority == ComponentPriority.DEFER:
13331325
priority_queue.pop() # actually remove the component
13341326
components_with_same_priority.append(next_component_name)
13351327
else:

haystack/core/pipeline/component_checks.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -201,31 +201,6 @@ def all_predecessors_executed(component: dict, inputs: dict) -> bool:
201201
)
202202

203203

204-
def are_all_lazy_variadic_sockets_resolved(component: dict, inputs: dict) -> bool:
205-
"""
206-
Checks if the final state for all lazy variadic sockets of a component is resolved.
207-
208-
:param component: Component metadata and the component instance.
209-
:param inputs: Inputs for the component.
210-
"""
211-
for socket_name, socket in component["input_sockets"].items():
212-
if socket.is_lazy_variadic:
213-
socket_inputs = inputs.get(socket_name, [])
214-
215-
# Checks if a lazy variadic socket is ready to run.
216-
# A socket is ready if either:
217-
# - it has received all expected inputs, or
218-
# - all its predecessors have executed
219-
# If none of the conditions are met, the socket is not ready to run and we defer the component.
220-
if not (
221-
has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
222-
or all_socket_predecessors_executed(socket, socket_inputs)
223-
):
224-
return False
225-
226-
return True
227-
228-
229204
def is_any_greedy_socket_ready(component: dict, inputs: dict) -> bool:
230205
"""
231206
Checks if the component has any greedy socket that is ready to run.

haystack/core/pipeline/pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def run( # noqa: PLR0915, PLR0912, C901
324324
# We always exit the loop since we cannot run the next component.
325325
break
326326

327-
if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
327+
if len(priority_queue) > 0 and priority == ComponentPriority.DEFER:
328328
component_name, topological_sort = self._tiebreak_waiting_components(
329329
component_name=component_name,
330330
priority=priority,
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
enhancements:
3+
- |
4+
Simplified the pipeline scheduler by removing the ``DEFER_LAST`` component priority.
5+
Components that previously got ``DEFER_LAST`` (those with unresolved lazy-variadic sockets)
6+
now share the ``DEFER`` priority and are tie-broken via topological order, matching the
7+
behavior of all other deferred components. This should not materially change how
8+
components are scheduled in a pipeline.

test/components/agents/test_agent.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1722,13 +1722,13 @@ class TestAgentWaitsForBlockedPredecessor:
17221722
1. history_parser runs → sends messages to messages_joiner.
17231723
2. files_processor runs with files=[] → returns {} (no output).
17241724
3. attachments_builder is BLOCKED — its mandatory processed_files input never arrives.
1725-
4. messages_joiner gets DEFER_LAST (priority=4): it has a lazy-variadic socket and attachments_builder hasn't
1726-
executed yet, so the joiner doesn't know if more data might still come. It keeps waiting.
1727-
5. agent gets DEFER (priority=3): retrieval_filters arrives with sender=None (static pipeline input), which
1725+
4. messages_joiner gets DEFER: it has a lazy-variadic socket and attachments_builder hasn't executed yet,
1726+
so the joiner doesn't know if more data might still come. It keeps waiting.
1727+
5. agent also gets DEFER: retrieval_filters arrives with sender=None (static pipeline input), which
17281728
satisfies has_any_trigger() on the first visit. The Agent has no mandatory sockets, so can_component_run()
1729-
returns True. It also has no unresolved lazy-variadic sockets, so it gets DEFER rather than DEFER_LAST.
1730-
6. Since DEFER (3) < DEFER_LAST (4), the scheduler picks the Agent before the joiner runs.
1731-
The Agent executes without messages and raises:
1729+
returns True.
1730+
6. The scheduler tie-breaks DEFER components by topological order, so the joiner should run before the Agent.
1731+
Before the fix the Agent was picked first and executed without messages, raising:
17321732
17331733
ValueError("No messages provided to the Agent and neither user_prompt nor system_prompt is set.")
17341734
"""
@@ -1780,8 +1780,8 @@ def run(self, processed_files: list[str]) -> dict:
17801780
pipeline.connect("messages_joiner.values", "agent.messages")
17811781

17821782
# files=[] → files_processor produces no output → attachments_builder BLOCKED
1783-
# → messages_joiner stays DEFER_LAST
1784-
# → agent (DEFER) runs first without messages → ValueError
1783+
# → messages_joiner stays DEFER waiting for the blocked branch
1784+
# → agent (DEFER) must wait for the joiner via topological tie-break
17851785
result = pipeline.run(
17861786
data={
17871787
"history_parser": {"query": "What case law applies?"},

test/core/pipeline/test_component_checks.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
any_predecessors_provided_input,
1616
any_socket_input_received,
1717
any_socket_value_from_predecessor_received,
18-
are_all_lazy_variadic_sockets_resolved,
1918
are_all_sockets_ready,
2019
can_component_run,
2120
can_not_receive_inputs_from_pipeline,
@@ -618,37 +617,6 @@ def test_all_predecessors_executed_with_user_input(self, complex_component):
618617
assert all_predecessors_executed(complex_component, inputs) is False
619618

620619

621-
class TestLazyVariadicResolution:
622-
def test_lazy_variadic_sockets_all_resolved(self, complex_component):
623-
"""Checks that lazy variadic sockets are resolved when all inputs have arrived."""
624-
inputs = {"lazy_var": [{"sender": "component1", "value": 42}, {"sender": "component2", "value": 43}]}
625-
assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is True
626-
627-
def test_lazy_variadic_sockets_partially_resolved(self, complex_component):
628-
"""Missing some sender outputs means lazy variadic sockets are not resolved."""
629-
inputs = {
630-
"lazy_var": [{"sender": "component1", "value": 42}] # Missing component2
631-
}
632-
assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is False
633-
634-
def test_lazy_variadic_sockets_with_no_inputs(self, complex_component):
635-
"""No inputs: lazy variadic socket is not resolved."""
636-
assert are_all_lazy_variadic_sockets_resolved(complex_component, {}) is False
637-
638-
def test_lazy_variadic_sockets_with_predecessors_executed(self, complex_component):
639-
"""
640-
Ensures that if all predecessors have executed (but produced no output),
641-
the lazy variadic socket is still considered resolved.
642-
"""
643-
inputs = {
644-
"lazy_var": [
645-
{"sender": "component1", "value": _NO_OUTPUT_PRODUCED},
646-
{"sender": "component2", "value": _NO_OUTPUT_PRODUCED},
647-
]
648-
}
649-
assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is True
650-
651-
652620
class TestGreedySocketReadiness:
653621
def test_greedy_socket_ready(self, complex_component):
654622
"""A single valid input is enough for a greedy variadic socket to be considered ready."""

test/core/pipeline/test_pipeline_base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,7 @@ def test__find_receivers_from(self):
11051105
ComponentPriority.DEFER,
11061106
"Component should DEFER when all lazy variadic sockets are resolved",
11071107
),
1108-
# Test case 6: DEFER_LAST - Incomplete variadic inputs
1108+
# Test case 6: DEFER - Incomplete variadic inputs
11091109
(
11101110
{
11111111
"instance": "mock_instance",
@@ -1121,8 +1121,8 @@ def test__find_receivers_from(self):
11211121
"variadic_input": [{"sender": "component1", "value": 42}], # Missing component2
11221122
"normal_input": [{"sender": "component3", "value": "test"}],
11231123
},
1124-
ComponentPriority.DEFER_LAST,
1125-
"Component should be DEFER_LAST when not all variadic senders have produced output",
1124+
ComponentPriority.DEFER,
1125+
"Component should be DEFER when not all variadic senders have produced output",
11261126
),
11271127
# Test case 7: READY - No input sockets, first visit
11281128
(

0 commit comments

Comments
 (0)