diff --git a/haystack/core/pipeline/__init__.py b/haystack/core/pipeline/__init__.py
index c398e22223..c530491ed3 100644
--- a/haystack/core/pipeline/__init__.py
+++ b/haystack/core/pipeline/__init__.py
@@ -3,6 +3,14 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from .async_pipeline import AsyncPipeline
+from .benchmark import AsyncPipelineBenchmark, BenchmarkConfig, PipelineBenchmark, PipelineBenchmarkResult
 from .pipeline import Pipeline
 
-__all__ = ["AsyncPipeline", "Pipeline"]
+__all__ = [
+    "AsyncPipeline",
+    "AsyncPipelineBenchmark",
+    "BenchmarkConfig",
+    "Pipeline",
+    "PipelineBenchmark",
+    "PipelineBenchmarkResult",
+]
diff --git a/haystack/core/pipeline/benchmark.py b/haystack/core/pipeline/benchmark.py
new file mode 100644
index 0000000000..51b0e57580
--- /dev/null
+++ b/haystack/core/pipeline/benchmark.py
@@ -0,0 +1,326 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import dataclasses
+import json
+import statistics
+import time
+from collections.abc import Coroutine, Iterator
+from contextlib import contextmanager
+from typing import Any
+
+from haystack import logging
+from haystack.core.pipeline.async_pipeline import AsyncPipeline
+from haystack.core.pipeline.pipeline import Pipeline
+from haystack.lazy_imports import LazyImport
+from haystack.tracing import enable_tracing
+from haystack.tracing.tracer import Span, Tracer
+from haystack.tracing.tracer import tracer as global_tracer
+
+logger = logging.getLogger(__name__)
+
+with LazyImport("Run 'pip install tabulate'") as tabulate_import:
+    from tabulate import tabulate
+
+
+@dataclasses.dataclass
+class TimingSpan(Span):
+    operation_name: str
+    tags: dict = dataclasses.field(default_factory=dict)
+    duration_ms: float = 0.0
+
+    def set_tag(self, key: str, value: Any) -> None:
+        """Set a tag on the span."""
+        self.tags[key] = value
+
+
+class TimingTracer(Tracer):
+    def __init__(self) -> None:
+        self.spans: list[TimingSpan] = []
+
+    def reset(self) -> None:
+        """Reset collected spans."""
+        self.spans = []
+
+    @contextmanager
+    def trace(
+        self,
+        operation_name: str,
+        tags: dict | None = None,
+        parent_span: Span | None = None,  # noqa: ARG002
+    ) -> Iterator[TimingSpan]:
+        """Trace execution of a code block."""
+
+        start = time.perf_counter()
+        span = TimingSpan(operation_name=operation_name, tags=dict(tags or {}))
+        try:
+            yield span
+        finally:
+            span.duration_ms = (time.perf_counter() - start) * 1000
+            self.spans.append(span)
+
+    def current_span(self) -> Span | None:
+        """Return the current active span."""
+        return None
+
+    def component_spans(self) -> list[TimingSpan]:
+        """Return spans for component executions."""
+        return [s for s in self.spans if s.operation_name == "haystack.component.run"]
+
+    def pipeline_span(self) -> TimingSpan | None:
+        """Return span for the pipeline execution."""
+        return next(
+            (s for s in self.spans if s.operation_name in ("haystack.pipeline.run", "haystack.async_pipeline.run")),
+            None,
+        )
+
+
+@dataclasses.dataclass
+class BenchmarkConfig:
+    """Configuration for pipeline benchmarking."""
+
+    runs: int
+    warmup_runs: int = 0
+
+    def __post_init__(self) -> None:
+        if self.runs <= 0:
+            raise ValueError("BenchmarkConfig.runs must be > 0")
+        if self.warmup_runs < 0:
+            raise ValueError("BenchmarkConfig.warmup_runs must be >= 0")
+
+
+@dataclasses.dataclass
+class PipelineBenchmarkMetrics:
+    """Performance metrics for a pipeline or component."""
+
+    p50: float
+    p90: float
+    p99: float
+    avg: float
+    total: float
+
+
+@dataclasses.dataclass
+class PipelineBenchmarkResult:
+    """Result of a pipeline benchmark."""
+
+    pipeline: PipelineBenchmarkMetrics
+    components: dict[str, PipelineBenchmarkMetrics]
+    slowest_component: str
+    fastest_run: float
+    slowest_run: float
+    num_runs: int
+    pipeline_name: str = "Pipeline"
+
+    def _metrics_rows(self) -> list[dict[str, str]]:
+        """Convert pipeline and component metrics into tabulate-ready rows."""
+
+        def _fmt(m: PipelineBenchmarkMetrics, name: str) -> dict[str, str]:
+            return {
+                "name": name,
+                "p50": f"{m.p50:.3f} ms",
+                "p90": f"{m.p90:.3f} ms",
+                "p99": f"{m.p99:.3f} ms",
+                "avg": f"{m.avg:.3f} ms",
+                "total": f"{m.total:.3f} ms",
+            }
+
+        return [_fmt(self.pipeline, "pipeline"), *(_fmt(m, n) for n, m in self.components.items())]
+
+    def report(self) -> str:
+        """Generate a human-readable report of the benchmark results using tabulate."""
+        tabulate_import.check()
+
+        rows = self._metrics_rows()
+        pipeline_row, *component_rows = rows
+
+        table = tabulate(
+            [pipeline_row, {}] + component_rows, headers={k: k for k in pipeline_row}, tablefmt="simple", missingval=""
+        )
+
+        summary = "\n".join(
+            [
+                f"  Runs              : {self.num_runs}",
+                f"  Fastest run       : {self.fastest_run:.3f} ms",
+                f"  Slowest run       : {self.slowest_run:.3f} ms",
+                f"  Slowest component : {self.slowest_component}",
+            ]
+        )
+
+        sep = "=" * len(table.splitlines()[0])
+        return "\n".join([sep, f" {self.pipeline_name} Benchmark Results", sep, "", table, "", summary, sep])
+
+    def to_json(self) -> str:
+        """Serialize the benchmark result to JSON format."""
+        return json.dumps(dataclasses.asdict(self), indent=2)
+
+
+def _compute_metrics(durations: list[float]) -> PipelineBenchmarkMetrics:
+    if not durations:
+        return PipelineBenchmarkMetrics(p50=0.0, p90=0.0, p99=0.0, avg=0.0, total=0.0)
+
+    if len(durations) >= 4:
+        q = statistics.quantiles(durations, n=100)
+        p50, p90, p99 = q[49], q[89], q[98]
+    else:
+        p50 = statistics.median(durations)
+        p90 = p99 = max(durations)
+
+    return PipelineBenchmarkMetrics(p50=p50, p90=p90, p99=p99, avg=statistics.mean(durations), total=sum(durations))
+
+
+class Benchmark:
+    """
+    Abstract base class for benchmarking Haystack pipelines.
+
+    Subclasses implement `run()` for sync or async pipelines.
+    Shared logic for span collection, result building, and tracing lives here.
+    """
+
+    def __init__(self, pipeline: Pipeline | AsyncPipeline, input_data: dict[str, Any], config: BenchmarkConfig) -> None:
+        """
+        Initialize the benchmark.
+
+        :param pipeline: The pipeline to benchmark.
+        :param input_data: The data to use for the benchmark.
+        :param config: The benchmark configuration.
+        """
+        self._pipeline = pipeline
+        self.input_data = input_data
+        self._config = config
+        self._tracer = TimingTracer()
+
+    def run(self) -> PipelineBenchmarkResult | Coroutine[Any, Any, PipelineBenchmarkResult]:
+        """Run the benchmark. Subclasses return either a result or a coroutine."""
+        raise NotImplementedError
+
+    def _init_tracking(self) -> tuple[list[str], dict[str, list[float]], list[float]]:
+        component_names = list(self._pipeline.graph.nodes.keys())
+        component_durations: dict[str, list[float]] = {n: [] for n in component_names}
+        pipeline_durations: list[float] = []
+
+        return component_names, component_durations, pipeline_durations
+
+    @contextmanager
+    def _benchmark_tracing(self) -> Iterator[None]:
+        original_tracer = global_tracer.actual_tracer
+        enable_tracing(self._tracer)
+        try:
+            yield
+        finally:
+            global_tracer.actual_tracer = original_tracer
+
+    def _collect_spans(self, component_durations: dict[str, list[float]], pipeline_durations: list[float]) -> None:
+        for span in self._tracer.component_spans():
+            name = span.tags.get("haystack.component.name")
+            if name and name in component_durations:
+                component_durations[name].append(span.duration_ms)
+
+        ps = self._tracer.pipeline_span()
+        if ps:
+            pipeline_durations.append(ps.duration_ms)
+
+    def _build_result(
+        self, component_names: list[str], component_durations: dict[str, list[float]], pipeline_durations: list[float]
+    ) -> PipelineBenchmarkResult:
+        pipeline_metrics = _compute_metrics(pipeline_durations)
+        components_metrics: dict[str, PipelineBenchmarkMetrics] = {}
+        slowest_component = ""
+        max_avg = 0.0
+
+        for name in component_names:
+            metrics = _compute_metrics(component_durations[name])
+            components_metrics[name] = metrics
+            if metrics.avg > max_avg:
+                max_avg = metrics.avg
+                slowest_component = name
+
+        fastest_run = min(pipeline_durations) if pipeline_durations else 0.0
+        slowest_run = max(pipeline_durations) if pipeline_durations else 0.0
+
+        pipeline_name = "AsyncPipeline" if isinstance(self._pipeline, AsyncPipeline) else "Pipeline"
+
+        return PipelineBenchmarkResult(
+            pipeline=pipeline_metrics,
+            components=components_metrics,
+            slowest_component=slowest_component,
+            fastest_run=fastest_run,
+            slowest_run=slowest_run,
+            num_runs=self._config.runs,
+            pipeline_name=pipeline_name,
+        )
+
+
+class PipelineBenchmark(Benchmark):
+    """
+    Benchmark a synchronous Haystack Pipeline.
+
+    ```python
+    pipeline = Pipeline()
+    input_data = {"input": 1}
+    benchmark_config = BenchmarkConfig(runs=20, warmup_runs=2)
+
+    benchmark = PipelineBenchmark(pipeline, input_data, benchmark_config)
+    result = benchmark.run()
+
+    print(result.report())
+    ```
+    """
+
+    def __init__(self, pipeline: Pipeline, input_data: dict[str, Any], config: BenchmarkConfig) -> None:
+        super().__init__(pipeline, input_data, config)
+        self._pipeline: Pipeline
+
+    def run(self) -> PipelineBenchmarkResult:
+        """Run the sync benchmark and return the results."""
+        component_names, component_durations, pipeline_durations = self._init_tracking()
+
+        self._pipeline.warm_up()
+        for _ in range(self._config.warmup_runs):
+            self._pipeline.run(self.input_data)
+
+        with self._benchmark_tracing():
+            for _ in range(self._config.runs):
+                self._tracer.reset()
+                self._pipeline.run(self.input_data)
+                self._collect_spans(component_durations, pipeline_durations)
+
+        return self._build_result(component_names, component_durations, pipeline_durations)
+
+
+class AsyncPipelineBenchmark(Benchmark):
+    """
+    Benchmark an asynchronous Haystack AsyncPipeline.
+
+    ```python
+    pipeline = AsyncPipeline()
+    input_data = {"input": 1}
+    benchmark_config = BenchmarkConfig(runs=20)
+
+    benchmark = AsyncPipelineBenchmark(pipeline, input_data, benchmark_config)
+    result = await benchmark.run()
+
+    print(result.report())
+    ```
+    """
+
+    def __init__(self, pipeline: AsyncPipeline, input_data: dict[str, Any], config: BenchmarkConfig) -> None:
+        super().__init__(pipeline, input_data, config)
+        self._pipeline: AsyncPipeline
+
+    async def run(self) -> PipelineBenchmarkResult:
+        """Run the async benchmark and return the results."""
+        component_names, component_durations, pipeline_durations = self._init_tracking()
+
+        self._pipeline.warm_up()
+        for _ in range(self._config.warmup_runs):
+            await self._pipeline.run_async(self.input_data)
+
+        with self._benchmark_tracing():
+            for _ in range(self._config.runs):
+                self._tracer.reset()
+                await self._pipeline.run_async(self.input_data)
+                self._collect_spans(component_durations, pipeline_durations)
+
+        return self._build_result(component_names, component_durations, pipeline_durations)
diff --git a/releasenotes/notes/feat-benchmark-pipeline-c62cd3f6a99806f8.yaml b/releasenotes/notes/feat-benchmark-pipeline-c62cd3f6a99806f8.yaml
new file mode 100644
index 0000000000..8e9f386f93
--- /dev/null
+++ b/releasenotes/notes/feat-benchmark-pipeline-c62cd3f6a99806f8.yaml
@@ -0,0 +1,20 @@
+---
+features:
+  - |
+    Add a new pipeline benchmarking utility for Haystack Pipelines. This supports both overall pipeline as well as individual component benchmarking.
+    The benchmarking configuration can be reused across different pipelines. We display the benchmark results in both report format (viewer friendly) as well as JSON format.
+    We calculate the p50, p90, p99, avg, and total runtime in ms at both the pipeline and per-component level. Also, we display the slowest component, fastest/slowest pipeline execution time, number of runs, and other metadata.
+
+    Sample usage:
+    .. code:: python
+      from haystack.core.pipeline.benchmark import BenchmarkConfig, PipelineBenchmark
+      from haystack.pipelines import Pipeline
+
+      pipeline = Pipeline()
+      input_data = {"value": 1}
+      config = BenchmarkConfig(runs=3, warmup_runs=1)
+
+      benchmark = PipelineBenchmark(pipeline, input_data, config)
+      benchmark_result = benchmark.run()
+
+      print(benchmark_result.report())
diff --git a/test/core/pipeline/test_pipeline_benchmark.py b/test/core/pipeline/test_pipeline_benchmark.py
new file mode 100644
index 0000000000..220dcf7e7a
--- /dev/null
+++ b/test/core/pipeline/test_pipeline_benchmark.py
@@ -0,0 +1,183 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import dataclasses
+
+import pytest
+
+from haystack.core.pipeline import AsyncPipeline, AsyncPipelineBenchmark, BenchmarkConfig, Pipeline, PipelineBenchmark
+from haystack.core.pipeline.benchmark import TimingTracer
+from haystack.testing.sample_components import AddFixedValue, Double
+from haystack.tracing import disable_tracing, enable_tracing, tracer
+
+
+@pytest.fixture
+def sample_pipeline() -> Pipeline:
+    pipeline = Pipeline()
+    pipeline.add_component("add_two", AddFixedValue(add=2))
+    pipeline.add_component("add_default", AddFixedValue())
+    pipeline.add_component("double", Double())
+
+    pipeline.connect("add_two", "double")
+    pipeline.connect("double", "add_default")
+
+    return pipeline
+
+
+@pytest.fixture
+def sample_async_pipeline() -> AsyncPipeline:
+    pipeline = AsyncPipeline()
+    pipeline.add_component("add_two", AddFixedValue(add=2))
+    pipeline.add_component("add_default", AddFixedValue())
+    pipeline.add_component("double", Double())
+
+    pipeline.connect("add_two", "double")
+    pipeline.connect("double", "add_default")
+
+    return pipeline
+
+
+@pytest.fixture
+def tracing_enabled():
+    tracer = TimingTracer()
+    enable_tracing(tracer)
+    yield
+    disable_tracing()
+
+
+class TestPipelineBenchmark:
+    """Test the PipelineBenchmark class."""
+
+    def test_init_pipeline_benchmark(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2, warmup_runs=2)
+
+        benchmark = PipelineBenchmark(sample_pipeline, input_data, config)
+
+        assert isinstance(benchmark, PipelineBenchmark)
+        assert benchmark._pipeline == sample_pipeline
+        assert benchmark.input_data == input_data
+        assert benchmark._config == config
+
+    def test_pipeline_benchmark_result(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=3, warmup_runs=1)
+
+        result = PipelineBenchmark(sample_pipeline, input_data, config).run()
+
+        assert result.num_runs == 3
+        assert result.pipeline.total > 0
+        assert result.fastest_run <= result.slowest_run
+        assert set(result.components.keys()) == set(sample_pipeline.graph.nodes.keys())
+        assert result.slowest_component in result.components
+
+    def test_component_metrics_structure(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=3)
+
+        result = PipelineBenchmark(sample_pipeline, input_data, config).run()
+
+        for metrics in result.components.values():
+            assert metrics.avg >= 0
+            assert metrics.total >= metrics.avg
+            assert metrics.p50 <= metrics.p99
+
+    def test_zero_benchmark_runs_config(self):
+        with pytest.raises(ValueError, match="BenchmarkConfig.runs must be > 0"):
+            BenchmarkConfig(runs=0)
+
+    def test_zero_benchmark_warmup_runs_config(self):
+        with pytest.raises(ValueError, match="BenchmarkConfig.warmup_runs must be >= 0"):
+            BenchmarkConfig(runs=1, warmup_runs=-1)
+
+    def test_warmup_runs_not_affect_count(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2, warmup_runs=3)
+
+        result = PipelineBenchmark(sample_pipeline, input_data, config).run()
+
+        assert result.num_runs == 2
+
+    def test_pipeline_benchmark_report(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2)
+
+        result = PipelineBenchmark(sample_pipeline, input_data, config).run()
+        report = result.report()
+
+        assert "Benchmark Results" in report
+        assert "pipeline" in report
+        assert "name" in report
+
+        for name in sample_pipeline.graph.nodes.keys():
+            assert name in report
+
+    def test_pipeline_benchmark_to_json(self, sample_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2)
+
+        result = PipelineBenchmark(sample_pipeline, input_data, config).run()
+
+        data = dataclasses.asdict(result)
+
+        assert "pipeline" in data
+        assert "components" in data
+        assert data["num_runs"] == 2
+
+    def test_benchmark_multiple_tracer(self, sample_pipeline):
+        user_tracer = TimingTracer()
+        enable_tracing(user_tracer)
+
+        original_tracer = tracer.actual_tracer
+        assert tracer.actual_tracer is user_tracer
+
+        benchmark = PipelineBenchmark(sample_pipeline, {"value": 1}, BenchmarkConfig(runs=1))
+        benchmark.run()
+
+        assert tracer.actual_tracer is original_tracer, (
+            "Benchmark should restore the original tracer, but it overwrote it permanently."
+        )
+
+
+@pytest.mark.asyncio
+class TestAsyncPipelineBenchmark:
+    """Test the AsyncPipelineBenchmark class."""
+
+    async def test_async_benchmark_result(self, sample_async_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=3)
+
+        result = await AsyncPipelineBenchmark(sample_async_pipeline, input_data, config).run()
+
+        assert result.num_runs == 3
+        assert result.pipeline.total > 0
+        assert result.fastest_run <= result.slowest_run
+
+    async def test_async_components(self, sample_async_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2)
+
+        result = await AsyncPipelineBenchmark(sample_async_pipeline, input_data, config).run()
+
+        assert set(result.components.keys()) == set(sample_async_pipeline.graph.nodes.keys())
+
+    async def test_async_report(self, sample_async_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2)
+
+        result = await AsyncPipelineBenchmark(sample_async_pipeline, input_data, config).run()
+
+        report = result.report()
+
+        assert "Benchmark Results" in report
+
+    async def test_async_to_json(self, sample_async_pipeline):
+        input_data = {"value": 1}
+        config = BenchmarkConfig(runs=2)
+
+        result = await AsyncPipelineBenchmark(sample_async_pipeline, input_data, config).run()
+
+        data = dataclasses.asdict(result)
+
+        assert "pipeline" in data