Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions haystack/components/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ async def run_async(
component={"instance": self.chat_generator},
component_inputs={"messages": messages, **generator_inputs},
component_visits=component_visits,
max_runs_per_component=self.max_agent_steps,
parent_span=span,
)
llm_messages = result["replies"]
Comment thread
sjrl marked this conversation as resolved.
Expand All @@ -396,7 +395,6 @@ async def run_async(

# 3. Call the ToolInvoker
# We only send the messages from the LLM to the tool invoker
# Check if the ToolInvoker supports async execution. Currently, it doesn't.
tool_invoker_result = await AsyncPipeline._run_component_async(
component_name="tool_invoker",
component={"instance": self._tool_invoker},
Expand All @@ -406,7 +404,6 @@ async def run_async(
"streaming_callback": streaming_callback,
},
component_visits=component_visits,
max_runs_per_component=self.max_agent_steps,
parent_span=span,
)
tool_messages = tool_invoker_result["tool_messages"]
Expand Down
47 changes: 32 additions & 15 deletions haystack/core/pipeline/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

import asyncio
import contextvars
from typing import Any, AsyncIterator, Dict, List, Optional, Set
from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
from haystack.core.errors import PipelineRuntimeError
from haystack.core.pipeline.base import (
_COMPONENT_INPUT,
_COMPONENT_OUTPUT,
Expand All @@ -31,14 +31,13 @@ class AsyncPipeline(PipelineBase):
"""

@staticmethod
async def _run_component_async( # pylint: disable=too-many-positional-arguments
async def _run_component_async(
component_name: str,
component: Dict[str, Any],
component_inputs: Dict[str, Any],
component_visits: Dict[str, int],
max_runs_per_component: int = 100,
parent_span: Optional[tracing.Span] = None,
) -> Dict[str, Any]:
) -> Mapping[str, Any]:
Comment thread
sjrl marked this conversation as resolved.
"""
Executes a single component asynchronously.

Expand All @@ -52,13 +51,13 @@ async def _run_component_async( # pylint: disable=too-many-positional-arguments
:param component_inputs: Inputs for the component.
:returns: Outputs from the component that can be yielded from run_async_generator.
"""
if component_visits[component_name] > max_runs_per_component:
raise PipelineMaxComponentRuns(f"Max runs for '{component_name}' reached.")

Comment thread
Amnah199 marked this conversation as resolved.
instance: Component = component["instance"]

with PipelineBase._create_component_span(
component_name=component_name, instance=instance, inputs=component_inputs, parent_span=parent_span
) as span:
# We deepcopy the inputs otherwise we might lose that information
# when we delete them in case they're sent to other Components
span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(component_inputs))
logger.info("Running component {component_name}", component_name=component_name)

Expand All @@ -76,15 +75,15 @@ async def _run_component_async( # pylint: disable=too-many-positional-arguments

component_visits[component_name] += 1

if not isinstance(outputs, dict):
if not isinstance(outputs, Mapping):
raise PipelineRuntimeError.from_invalid_output(component_name, instance.__class__, outputs)

span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
span.set_content_tag(_COMPONENT_OUTPUT, _deepcopy_with_exceptions(outputs))

return outputs

async def run_async_generator( # noqa: PLR0915,C901
async def run_async_generator( # noqa: PLR0915,C901 # pylint: disable=too-many-statements
self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None, concurrency_limit: int = 4
) -> AsyncIterator[Dict[str, Any]]:
"""
Expand Down Expand Up @@ -229,7 +228,8 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[s
Runs a component with HIGHEST priority in isolation.

We need to run components with HIGHEST priority (i.e. components with GreedyVariadic input socket)
because otherwise, downstream components could produce additional inputs for the GreedyVariadic socket.
by themselves, without any other components running concurrently. Otherwise, downstream components
could produce additional inputs for the GreedyVariadic socket.

:param component_name: The name of the component.
:return: An async iterator of partial outputs.
Expand Down Expand Up @@ -261,7 +261,6 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[Dict[s
component=comp_dict,
component_inputs=component_inputs,
component_visits=component_visits,
max_runs_per_component=self._max_runs_per_component,
parent_span=parent_span,
)

Expand Down Expand Up @@ -307,7 +306,6 @@ async def _runner():
component=comp_dict,
component_inputs=component_inputs,
component_visits=component_visits,
max_runs_per_component=self._max_runs_per_component,
parent_span=parent_span,
)

Expand Down Expand Up @@ -363,7 +361,8 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
# 2) Build the priority queue of candidates
priority_queue = self._fill_queue(ordered_names, inputs_state, component_visits)
candidate = self._get_next_runnable_component(priority_queue, component_visits)
if candidate is None and running_tasks:

if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
# We need to wait for one task to finish to make progress and potentially unblock the priority_queue
async for partial_res in _wait_for_one_task_to_complete():
yield partial_res
Expand All @@ -373,7 +372,25 @@ async def _wait_for_all_tasks_to_complete() -> AsyncIterator[Dict[str, Any]]:
# done
break

priority, comp_name, _ = candidate # type: ignore
priority, comp_name, comp = candidate # type: ignore

# If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
# a warning if it is.
if priority == ComponentPriority.BLOCKED and not running_tasks:
if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
# Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
logger.warning(
"Cannot run pipeline - the next component that is meant to run is blocked.\n"
"Component name: '{component_name}'\n"
"Component type: '{component_type}'\n"
"This typically happens when the component is unable to receive all of its required "
"inputs.\nCheck the connections to this component and ensure all required inputs are "
"provided.",
component_name=comp_name,
component_type=comp["instance"].__class__.__name__,
)
# We always exit the loop since we cannot run the next component.
break

if comp_name in scheduled_components:
# We need to wait for one task to finish to make progress
Expand Down
61 changes: 47 additions & 14 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=too-many-lines
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
Expand All @@ -7,7 +8,21 @@
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from typing import Any, ContextManager, Dict, Iterator, List, Optional, Set, TextIO, Tuple, Type, TypeVar, Union
from typing import (
Any,
ContextManager,
Dict,
Iterator,
List,
Mapping,
Optional,
Set,
TextIO,
Tuple,
Type,
TypeVar,
Union,
)

import networkx # type:ignore

Expand Down Expand Up @@ -1177,18 +1192,15 @@ def _get_next_runnable_component(
None if (item := priority_queue.get()) is None else (ComponentPriority(item[0]), str(item[1]))
)

if priority_and_component_name is not None and priority_and_component_name[0] != ComponentPriority.BLOCKED:
priority, component_name = priority_and_component_name
component = self._get_component_with_graph_metadata_and_visits(
component_name, component_visits[component_name]
)
if component["visits"] > self._max_runs_per_component:
msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
raise PipelineMaxComponentRuns(msg)
if priority_and_component_name is None:
return None

return priority, component_name, component

return None
priority, component_name = priority_and_component_name
comp = self._get_component_with_graph_metadata_and_visits(component_name, component_visits[component_name])
if comp["visits"] > self._max_runs_per_component:
msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
raise PipelineMaxComponentRuns(msg)
return priority, component_name, comp

@staticmethod
def _add_missing_input_defaults(
Expand Down Expand Up @@ -1258,11 +1270,11 @@ def _tiebreak_waiting_components(
@staticmethod
def _write_component_outputs(
component_name: str,
component_outputs: Dict[str, Any],
component_outputs: Mapping[str, Any],
inputs: Dict[str, Any],
receivers: List[Tuple],
include_outputs_from: Set[str],
) -> Dict[str, Any]:
) -> Mapping[str, Any]:
"""
Distributes the outputs of a component to the input sockets that it is connected to.

Expand Down Expand Up @@ -1432,6 +1444,27 @@ def _merge_super_component_pipelines(self) -> Tuple["networkx.MultiDiGraph", Dic

return merged_graph, super_component_mapping

def _is_pipeline_possibly_blocked(self, current_pipeline_outputs: Dict[str, Any]) -> bool:
"""
Heuristically determines whether the pipeline is possibly blocked based on its current outputs.

This method checks if the pipeline has produced any of the expected outputs.
- If no outputs are expected (i.e., `self.outputs()` returns an empty list), the method assumes the pipeline
is not blocked.
- If at least one expected output is present in `current_pipeline_outputs`, the pipeline is also assumed to not
be blocked.
- If none of the expected outputs are present, the pipeline is considered to be possibly blocked.

Note: This check is not definitive—it is intended as a best-effort guess to detect a stalled or misconfigured
pipeline when there are no more runnable components.

:param current_pipeline_outputs: A dictionary of outputs currently produced by the pipeline.
:returns:
bool: True if the pipeline is possibly blocked (i.e., expected outputs are missing), False otherwise.
"""
expected_outputs = self.outputs()
return bool(expected_outputs) and not any(k in current_pipeline_outputs for k in expected_outputs)


def _connections_status(
sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
Expand Down
29 changes: 26 additions & 3 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, Mapping, Optional, Set, cast
from typing import Any, Dict, Mapping, Optional, Set

from haystack import logging, tracing
from haystack.core.component import Component
Expand Down Expand Up @@ -34,7 +34,7 @@ def _run_component(
inputs: Dict[str, Any],
component_visits: Dict[str, int],
parent_span: Optional[tracing.Span] = None,
) -> Dict[str, Any]:
) -> Mapping[str, Any]:
"""
Runs a Component with the given inputs.

Expand All @@ -56,10 +56,12 @@ def _run_component(
# when we delete them in case they're sent to other Components
span.set_content_tag(_COMPONENT_INPUT, _deepcopy_with_exceptions(inputs))
logger.info("Running component {component_name}", component_name=component_name)

try:
component_output = instance.run(**inputs)
except Exception as error:
raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error

component_visits[component_name] += 1

if not isinstance(component_output, Mapping):
Expand All @@ -68,7 +70,7 @@ def _run_component(
span.set_tag(_COMPONENT_VISITS, component_visits[component_name])
span.set_content_tag(_COMPONENT_OUTPUT, component_output)

return cast(Dict[Any, Any], component_output)
return component_output

def run( # noqa: PLR0915, PLR0912
self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None
Expand Down Expand Up @@ -208,10 +210,31 @@ def run( # noqa: PLR0915, PLR0912

while True:
candidate = self._get_next_runnable_component(priority_queue, component_visits)

# If there are no runnable components left, we can exit the loop
if candidate is None:
break

priority, component_name, component = candidate

# If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
# a warning if it is.
if priority == ComponentPriority.BLOCKED:
if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
# Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
logger.warning(
"Cannot run pipeline - the next component that is meant to run is blocked.\n"
"Component name: '{component_name}'\n"
"Component type: '{component_type}'\n"
"This typically happens when the component is unable to receive all of its required "
"inputs.\nCheck the connections to this component and ensure all required inputs are "
"provided.",
component_name=component_name,
component_type=component["instance"].__class__.__name__,
)
# We always exit the loop since we cannot run the next component.
break

if len(priority_queue) > 0 and priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]:
component_name, topological_sort = self._tiebreak_waiting_components(
component_name=component_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Raise a warning when a pipeline can no longer proceed because **all** remaining components are blocked from running **and** no expected pipeline outputs have been produced.
This scenario can occur legitimately — for example, in pipelines with mutually exclusive branches where some components are intentionally blocked. To help avoid false positives, the check ensures that none of the expected outputs (as defined by `Pipeline().outputs()`) have been generated during the current run.
1 change: 1 addition & 0 deletions test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Feature: Pipeline running
| that has components returning dataframes |
| where a single component connects multiple sockets to the same receiver socket |
| where a component in a cycle provides inputs for a component outside the cycle in one iteration and no input in another iteration |
| that is blocked because not enough component inputs |

Scenario Outline: Running a bad Pipeline
Given a pipeline <kind>
Expand Down
Loading
Loading