From a7b13c8bf922516844baf908c9ab0956c4e65e62 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 20 May 2026 08:48:19 +0200 Subject: [PATCH 1/2] Remove DEFER_LAST as a priority --- haystack/core/pipeline/async_pipeline.py | 6 ++-- haystack/core/pipeline/base.py | 22 ++++--------- haystack/core/pipeline/component_checks.py | 25 --------------- haystack/core/pipeline/pipeline.py | 2 +- ...-defer-last-priority-94bd32cf1ee83bf5.yaml | 8 +++++ test/components/agents/test_agent.py | 16 +++++----- test/core/pipeline/test_component_checks.py | 32 ------------------- test/core/pipeline/test_pipeline_base.py | 6 ++-- 8 files changed, 30 insertions(+), 87 deletions(-) create mode 100644 releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 4d8b93526a..f814070f10 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -435,12 +435,12 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[dict[str, Any]]: # keep adding while concurrency is not locked continue - # The next is DEFER/DEFER_LAST => we only schedule it if it "becomes READY" + # The next is DEFER => we only schedule it if it "becomes READY" # We'll handle it in the next iteration or with incremental waiting break - # We only schedule components with priority DEFER or DEFER_LAST when no other tasks are running - elif priority in (ComponentPriority.DEFER, ComponentPriority.DEFER_LAST) and not running_tasks: + # We only schedule components with priority DEFER when no other tasks are running + elif priority == ComponentPriority.DEFER and not running_tasks: if len(priority_queue) > 0: comp_name, topological_sort = self._tiebreak_waiting_components( component_name=comp_name, diff --git a/haystack/core/pipeline/base.py b/haystack/core/pipeline/base.py index 80e601ae4e..dccb4da406 100644 --- a/haystack/core/pipeline/base.py +++ 
b/haystack/core/pipeline/base.py @@ -29,7 +29,6 @@ from haystack.core.pipeline.component_checks import ( _NO_OUTPUT_PRODUCED, all_predecessors_executed, - are_all_lazy_variadic_sockets_resolved, are_all_sockets_ready, can_component_run, is_any_greedy_socket_ready, @@ -76,8 +75,7 @@ class ComponentPriority(IntEnum): HIGHEST = 1 READY = 2 DEFER = 3 - DEFER_LAST = 4 - BLOCKED = 5 + BLOCKED = 4 class PipelineBase: # noqa: PLW1641 @@ -1200,9 +1198,9 @@ def _calculate_priority(comp: dict, comp_inputs: dict[str, list[dict[str, Any]]] if all_predecessors_executed(comp, comp_inputs): # This priority is explicitly used in AsyncPipeline + in _is_queue_stale return ComponentPriority.READY - if are_all_lazy_variadic_sockets_resolved(comp, comp_inputs): - return ComponentPriority.DEFER - return ComponentPriority.DEFER_LAST + # If we make it here the component can run but is waiting for more inputs, so we give it DEFER, the lowest + # runnable priority (only BLOCKED ranks below). Components that are ready to run are prioritized over it. + return ComponentPriority.DEFER def _get_component_with_graph_metadata_and_visits(self, component_name: str, visits: int) -> dict[str, Any]: """ @@ -1306,8 +1304,8 @@ def _tiebreak_waiting_components( """ Decides which component to run when multiple components are waiting for inputs with the same priority. - NOTE: This was designed to only tie-break for priorities DEFER and DEFER_LAST. Since this function also removes - these components from the priority queue we rely on _is_queue_stale to then refill the priority queue. + NOTE: This was designed to only tie-break for components with the priority DEFER. Since this function also + removes these components from the priority queue we rely on _is_queue_stale to then refill the priority queue. And _is_queue_stale only triggers when all remaining components have BLOCKED priority. :param component_name: The name of the component. 
@@ -1320,16 +1318,10 @@ def _tiebreak_waiting_components( """ # Create a list of all components that have the same priority as the current component, including the # current component itself and remove them from the priority queue. - has_deferred_priority = priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] components_with_same_priority = [component_name] while len(priority_queue) > 0: next_priority, next_component_name = priority_queue.peek() - # For tiebreaking purposes we treat DEFER and DEFER_LAST as the same priority. - if ( - has_deferred_priority - and next_priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] - or next_priority == priority - ): + if priority == ComponentPriority.DEFER and next_priority == ComponentPriority.DEFER: priority_queue.pop() # actually remove the component components_with_same_priority.append(next_component_name) else: diff --git a/haystack/core/pipeline/component_checks.py b/haystack/core/pipeline/component_checks.py index 54a59dc28f..d05d373338 100644 --- a/haystack/core/pipeline/component_checks.py +++ b/haystack/core/pipeline/component_checks.py @@ -201,31 +201,6 @@ def all_predecessors_executed(component: dict, inputs: dict) -> bool: ) -def are_all_lazy_variadic_sockets_resolved(component: dict, inputs: dict) -> bool: - """ - Checks if the final state for all lazy variadic sockets of a component is resolved. - - :param component: Component metadata and the component instance. - :param inputs: Inputs for the component. - """ - for socket_name, socket in component["input_sockets"].items(): - if socket.is_lazy_variadic: - socket_inputs = inputs.get(socket_name, []) - - # Checks if a lazy variadic socket is ready to run. - # A socket is ready if either: - # - it has received all expected inputs, or - # - all its predecessors have executed - # If none of the conditions are met, the socket is not ready to run and we defer the component. 
- if not ( - has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs) - or all_socket_predecessors_executed(socket, socket_inputs) - ): - return False - - return True - - def is_any_greedy_socket_ready(component: dict, inputs: dict) -> bool: """ Checks if the component has any greedy socket that is ready to run. diff --git a/haystack/core/pipeline/pipeline.py b/haystack/core/pipeline/pipeline.py index add3977e40..e957c94fce 100644 --- a/haystack/core/pipeline/pipeline.py +++ b/haystack/core/pipeline/pipeline.py @@ -324,7 +324,7 @@ def run( # noqa: PLR0915, PLR0912, C901 # We always exit the loop since we cannot run the next component. break - if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]: + if len(priority_queue) > 0 and priority == ComponentPriority.DEFER: component_name, topological_sort = self._tiebreak_waiting_components( component_name=component_name, priority=priority, diff --git a/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml b/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml new file mode 100644 index 0000000000..1cc637b9a5 --- /dev/null +++ b/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml @@ -0,0 +1,8 @@ +--- +enhancements: + - | + Simplified the pipeline scheduler by removing the `DEFER_LAST` component priority. + Components that previously got `DEFER_LAST` (those with unresolved lazy-variadic sockets) + now share the `DEFER` priority and are tie-broken via topological order, matching the + behavior of all other deferred components. This should not materially change how + components are scheduled in a pipeline. diff --git a/test/components/agents/test_agent.py b/test/components/agents/test_agent.py index 3153a8b6ef..c155f3610d 100644 --- a/test/components/agents/test_agent.py +++ b/test/components/agents/test_agent.py @@ -1722,13 +1722,13 @@ class TestAgentWaitsForBlockedPredecessor: 1. 
history_parser runs → sends messages to messages_joiner. 2. files_processor runs with files=[] → returns {} (no output). 3. attachments_builder is BLOCKED — its mandatory processed_files input never arrives. - 4. messages_joiner gets DEFER_LAST (priority=4): it has a lazy-variadic socket and attachments_builder hasn't - executed yet, so the joiner doesn't know if more data might still come. It keeps waiting. - 5. agent gets DEFER (priority=3): retrieval_filters arrives with sender=None (static pipeline input), which + 4. messages_joiner gets DEFER: it has a lazy-variadic socket and attachments_builder hasn't executed yet, + so the joiner doesn't know if more data might still come. It keeps waiting. + 5. agent also gets DEFER: retrieval_filters arrives with sender=None (static pipeline input), which satisfies has_any_trigger() on the first visit. The Agent has no mandatory sockets, so can_component_run() - returns True. It also has no unresolved lazy-variadic sockets, so it gets DEFER rather than DEFER_LAST. - 6. Since DEFER (3) < DEFER_LAST (4), the scheduler picks the Agent before the joiner runs. - The Agent executes without messages and raises: + returns True. + 6. The scheduler tie-breaks DEFER components by topological order, so the joiner should run before the Agent. 
+ Before the fix the Agent was picked first and executed without messages, raising: ValueError("No messages provided to the Agent and neither user_prompt nor system_prompt is set.") """ @@ -1780,8 +1780,8 @@ def run(self, processed_files: list[str]) -> dict: pipeline.connect("messages_joiner.values", "agent.messages") # files=[] → files_processor produces no output → attachments_builder BLOCKED - # → messages_joiner stays DEFER_LAST - # → agent (DEFER) runs first without messages → ValueError + # → messages_joiner stays DEFER waiting for the blocked branch + # → agent (DEFER) must wait for the joiner via topological tie-break result = pipeline.run( data={ "history_parser": {"query": "What case law applies?"}, diff --git a/test/core/pipeline/test_component_checks.py b/test/core/pipeline/test_component_checks.py index 9b423a4188..e60e7c2bd1 100644 --- a/test/core/pipeline/test_component_checks.py +++ b/test/core/pipeline/test_component_checks.py @@ -15,7 +15,6 @@ any_predecessors_provided_input, any_socket_input_received, any_socket_value_from_predecessor_received, - are_all_lazy_variadic_sockets_resolved, are_all_sockets_ready, can_component_run, can_not_receive_inputs_from_pipeline, @@ -618,37 +617,6 @@ def test_all_predecessors_executed_with_user_input(self, complex_component): assert all_predecessors_executed(complex_component, inputs) is False -class TestLazyVariadicResolution: - def test_lazy_variadic_sockets_all_resolved(self, complex_component): - """Checks that lazy variadic sockets are resolved when all inputs have arrived.""" - inputs = {"lazy_var": [{"sender": "component1", "value": 42}, {"sender": "component2", "value": 43}]} - assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is True - - def test_lazy_variadic_sockets_partially_resolved(self, complex_component): - """Missing some sender outputs means lazy variadic sockets are not resolved.""" - inputs = { - "lazy_var": [{"sender": "component1", "value": 42}] # Missing component2 - 
} - assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is False - - def test_lazy_variadic_sockets_with_no_inputs(self, complex_component): - """No inputs: lazy variadic socket is not resolved.""" - assert are_all_lazy_variadic_sockets_resolved(complex_component, {}) is False - - def test_lazy_variadic_sockets_with_predecessors_executed(self, complex_component): - """ - Ensures that if all predecessors have executed (but produced no output), - the lazy variadic socket is still considered resolved. - """ - inputs = { - "lazy_var": [ - {"sender": "component1", "value": _NO_OUTPUT_PRODUCED}, - {"sender": "component2", "value": _NO_OUTPUT_PRODUCED}, - ] - } - assert are_all_lazy_variadic_sockets_resolved(complex_component, inputs) is True - - class TestGreedySocketReadiness: def test_greedy_socket_ready(self, complex_component): """A single valid input is enough for a greedy variadic socket to be considered ready.""" diff --git a/test/core/pipeline/test_pipeline_base.py b/test/core/pipeline/test_pipeline_base.py index fc03738508..47c241929a 100644 --- a/test/core/pipeline/test_pipeline_base.py +++ b/test/core/pipeline/test_pipeline_base.py @@ -1105,7 +1105,7 @@ def test__find_receivers_from(self): ComponentPriority.DEFER, "Component should DEFER when all lazy variadic sockets are resolved", ), - # Test case 6: DEFER_LAST - Incomplete variadic inputs + # Test case 6: DEFER - Incomplete variadic inputs ( { "instance": "mock_instance", @@ -1121,8 +1121,8 @@ def test__find_receivers_from(self): "variadic_input": [{"sender": "component1", "value": 42}], # Missing component2 "normal_input": [{"sender": "component3", "value": "test"}], }, - ComponentPriority.DEFER_LAST, - "Component should be DEFER_LAST when not all variadic senders have produced output", + ComponentPriority.DEFER, + "Component should be DEFER when not all variadic senders have produced output", ), # Test case 7: READY - No input sockets, first visit ( From 
7fa208b3e3e769e8f4111303fc42291bce295aaa Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 20 May 2026 08:53:31 +0200 Subject: [PATCH 2/2] fix reno --- .../notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml b/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml index 1cc637b9a5..116451ba78 100644 --- a/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml +++ b/releasenotes/notes/remove-defer-last-priority-94bd32cf1ee83bf5.yaml @@ -1,8 +1,8 @@ --- enhancements: - | - Simplified the pipeline scheduler by removing the `DEFER_LAST` component priority. - Components that previously got `DEFER_LAST` (those with unresolved lazy-variadic sockets) - now share the `DEFER` priority and are tie-broken via topological order, matching the + Simplified the pipeline scheduler by removing the ``DEFER_LAST`` component priority. + Components that previously got ``DEFER_LAST`` (those with unresolved lazy-variadic sockets) + now share the ``DEFER`` priority and are tie-broken via topological order, matching the behavior of all other deferred components. This should not materially change how components are scheduled in a pipeline.