diff --git a/README.md b/README.md index d659e722..2a5d2b2c 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,7 @@ The `storage` list in `system.yaml` selects where results go. You can use **seve |--------|---------| | **File backend** | CSV, JSON, and TXT under `output_dir`; tune formats with `enabled_outputs` and columns with `csv_columns`. | | **Database (optional)** | SQLite, PostgreSQL, or MySQL for querying and analysis; writes are **incremental** per conversation; storage errors are logged as warnings and **do not** abort the evaluation run. | +| **Langfuse (optional)** | Export evaluation scores to [Langfuse](https://langfuse.com) for observability and analytics. Creates one trace per run with per-metric numeric scores. Requires `pip install 'lightspeed-evaluation[langfuse]'`. | For field tables, full YAML examples (file-only, file + SQLite, file + Postgres), CSV column reference, and notes on API token columns, see **[Storage](docs/configuration.md#storage)** in the configuration guide. diff --git a/config/system.yaml b/config/system.yaml index 56e7b3a8..a2fca551 100644 --- a/config/system.yaml +++ b/config/system.yaml @@ -295,6 +295,13 @@ storage: # database: "./eval_results.db" # table_name: "evaluation_results" + # Langfuse backend (optional) - export scores to Langfuse observability platform + # Requires: pip install 'lightspeed-evaluation[langfuse]' + # Credentials via env vars: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST + # Or provide them inline below: + # - type: "langfuse" + # host: "https://cloud.langfuse.com" + # Visualization settings visualization: figsize: [12, 8] # Graph size (width, height) diff --git a/docs/configuration.md b/docs/configuration.md index 2b1e8fa7..8630a972 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -400,6 +400,53 @@ Save results to a database for querying and analysis. 
Supports SQLite, PostgreSQ > **Note:** Database storage is incremental - results are saved as each conversation completes. Storage failures are logged as warnings but don't stop the evaluation. +### Langfuse Backend (Optional) +Export evaluation scores to [Langfuse](https://langfuse.com) for observability, analytics, and score tracking. Creates one trace per evaluation run with one numeric score per metric result. + +Requires the Langfuse SDK v4: +```bash +# Using pip +pip install 'lightspeed-evaluation[langfuse]' + +# Using uv +uv sync --extra langfuse +``` + +| Setting (storage[type="langfuse"].) | Default | Description | +|-------------------------------------|---------|-------------| +| type | `"langfuse"` | Backend type (required) | +| host | `null` | Langfuse API host URL (falls back to `LANGFUSE_HOST` env var) | +| public_key | `null` | Langfuse public key (falls back to `LANGFUSE_PUBLIC_KEY` env var) | +| secret_key | `null` | Langfuse secret key (falls back to `LANGFUSE_SECRET_KEY` env var) | + +> **Credentials:** Configure credentials via environment variables (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, `LANGFUSE_HOST`) or inline in the YAML config. Environment variables are the recommended approach — inline config fields take precedence when set. + +> **Score handling:** Results with a numeric score (PASS/FAIL) are exported as `NUMERIC` scores. Results without a score (`score=None`, e.g. ERROR/SKIPPED) are skipped. All Langfuse errors are logged but never abort the evaluation. + +### Example: Langfuse via Environment Variables +```yaml +storage: + - type: "file" + output_dir: "./eval_output" + - type: "langfuse" +``` +```bash +export LANGFUSE_PUBLIC_KEY="pk-lf-..." +export LANGFUSE_SECRET_KEY="sk-lf-..." 
+export LANGFUSE_HOST="https://cloud.langfuse.com" +``` + +### Example: Langfuse with Inline Credentials +```yaml +storage: + - type: "file" + output_dir: "./eval_output" + - type: "langfuse" + host: "https://cloud.langfuse.com" + public_key: "pk-lf-..." + secret_key: "sk-lf-..." +``` + ### Output types | Output type (in `enabled_outputs`) | Description | diff --git a/pyproject.toml b/pyproject.toml index 79531cd8..75b5356e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,15 @@ nlp-metrics = [ "rapidfuzz>=3.0.0,<=3.14.3", # Required for semantic_similarity_distance ] +# Langfuse observability - export evaluation scores to Langfuse +# Install with: +# pip install 'lightspeed-evaluation[langfuse]' +# or +# uv sync --extra langfuse +langfuse = [ + "langfuse>=4.0.0,<5.0.0", +] + [dependency-groups] dev = [ "bandit>=1.7.0,<=1.9.2", diff --git a/src/lightspeed_evaluation/core/storage/__init__.py b/src/lightspeed_evaluation/core/storage/__init__.py index 596a893d..efd7efb1 100644 --- a/src/lightspeed_evaluation/core/storage/__init__.py +++ b/src/lightspeed_evaluation/core/storage/__init__.py @@ -32,6 +32,7 @@ from lightspeed_evaluation.core.storage.config import ( DatabaseBackendConfig, FileBackendConfig, + LangfuseBackendConfig, StorageBackendConfig, ) from lightspeed_evaluation.core.storage.factory import ( @@ -56,6 +57,7 @@ "StorageError", "FileBackendConfig", "DatabaseBackendConfig", + "LangfuseBackendConfig", "StorageBackendConfig", "CompositeStorageBackend", "NoOpStorageBackend", diff --git a/src/lightspeed_evaluation/core/storage/config.py b/src/lightspeed_evaluation/core/storage/config.py index c005b98b..4497d85b 100644 --- a/src/lightspeed_evaluation/core/storage/config.py +++ b/src/lightspeed_evaluation/core/storage/config.py @@ -1,6 +1,6 @@ """Configuration models for storage backends. -Defines Pydantic models for file and database storage configuration. +Defines Pydantic models for file, database, and Langfuse storage configuration. 
class LangfuseBackendConfig(BaseModel):
    """Configuration for the Langfuse observability storage backend.

    Selects the backend that exports evaluation scores to Langfuse as a
    trace with per-metric scores.  Requires the ``langfuse`` optional extra:
    ``pip install 'lightspeed-evaluation[langfuse]'``

    Every credential field is optional.  A field left unset (``None``) is
    documented to fall back to the matching environment variable:
    ``LANGFUSE_HOST``, ``LANGFUSE_PUBLIC_KEY`` and ``LANGFUSE_SECRET_KEY``.
    Values given here take precedence over the environment.

    Unknown keys are rejected (``extra="forbid"``) so that a misspelled
    option fails at config-load time instead of being silently ignored.

    Example:
        - type: "langfuse"
          host: "https://cloud.langfuse.com"
    """

    model_config = ConfigDict(extra="forbid")

    # Discriminator used by the ``StorageBackendConfig`` union.
    type: Literal["langfuse"] = "langfuse"
    host: Optional[str] = Field(
        default=None,
        description="Langfuse API host URL (falls back to LANGFUSE_HOST env var)",
    )
    public_key: Optional[str] = Field(
        default=None,
        description="Langfuse public key (falls back to LANGFUSE_PUBLIC_KEY env var)",
    )
    # repr=False keeps the secret out of repr()/str() of the model, so that
    # logging or printing a parsed configuration does not leak the credential.
    # The attribute itself is still a plain ``str`` for existing callers.
    secret_key: Optional[str] = Field(
        default=None,
        repr=False,
        description="Langfuse secret key (falls back to LANGFUSE_SECRET_KEY env var)",
    )
from lightspeed_evaluation.core.storage.protocol import BaseStorageBackend from lightspeed_evaluation.core.storage.sql_storage import SQLStorageBackend from lightspeed_evaluation.core.system.exceptions import ConfigurationError @@ -127,6 +129,9 @@ def create_pipeline_storage_backend( "File storage entries in ``storage`` require ``system_config`` " "when building the pipeline storage backend." ) + elif isinstance(config, LangfuseBackendConfig): + logger.info("Pipeline storage: langfuse backend") + backends.append(LangfuseStorageBackend(config)) else: raise ConfigurationError( f"Unknown storage backend type: {type(config).__name__!r}" diff --git a/src/lightspeed_evaluation/core/storage/langfuse_storage.py b/src/lightspeed_evaluation/core/storage/langfuse_storage.py new file mode 100644 index 00000000..08a02568 --- /dev/null +++ b/src/lightspeed_evaluation/core/storage/langfuse_storage.py @@ -0,0 +1,258 @@ +"""Langfuse storage backend for evaluation results. + +Implements :class:`~lightspeed_evaluation.core.storage.protocol.BaseStorageBackend` +so Langfuse plugs into the standard pipeline storage lifecycle without any +changes to the runner, API, or pipeline modules. + +Install with: ``pip install 'lightspeed-evaluation[langfuse]'`` + +Requires **Langfuse Python SDK v4** (``langfuse>=4.0.0,<5.0.0``). + +Credentials are resolved from :class:`LangfuseBackendConfig` fields first, +then from ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY``, and +``LANGFUSE_HOST`` environment variables as fallback (standard Langfuse SDK +behavior). + +Lifecycle: + 1. ``initialize(run_info)`` — creates the Langfuse client. + 2. ``save_run(results)`` — accumulates all results (called per conversation). + 3. ``finalize()`` — creates a trace span, writes scores, and flushes. + 4. ``close()`` — shuts down the client. 
class LangfuseStorageBackend:
    """Storage backend that exports evaluation results to Langfuse.

    Results are accumulated in memory by ``save_result()`` / ``save_run()``
    and exported in one batch by ``finalize()``: one observation span for the
    run plus one ``NUMERIC`` score per result that carries a score.  Results
    with ``score=None`` are not exported; they are only mentioned in a debug
    log line.

    Uses the Langfuse Python SDK v4 calls
    ``start_as_current_observation()``, ``create_score()``, ``flush()`` and
    ``shutdown()``.

    The export is best-effort: any exception raised while creating the
    client, writing the trace and scores, or shutting down is logged and
    swallowed here, so a Langfuse problem never propagates to the caller.
    """

    def __init__(self, config: LangfuseBackendConfig) -> None:
        """Store the configuration; no client is created yet.

        Args:
            config: Langfuse backend configuration with optional host,
                public_key, and secret_key fields.
        """
        self._config = config
        self._client: Any = None
        self._run_info: Optional[RunInfo] = None
        self._results: list[EvaluationResult] = []

    @property
    def backend_name(self) -> str:
        """Return the name of this storage backend."""
        return "langfuse"

    def initialize(self, run_info: RunInfo) -> None:
        """Start a new run: reset accumulated results and create the client.

        If the ``langfuse`` package is not installed, or the client cannot be
        imported or constructed, the problem is logged and the backend stays
        without a client, which turns ``finalize()`` into a no-op.

        Args:
            run_info: Metadata of the run; its ``name`` is used for the trace.
        """
        self._run_info = run_info
        self._results = []

        if not _HAS_LANGFUSE:
            logger.error(
                "langfuse is not installed. "
                "Add: pip install 'lightspeed-evaluation[langfuse]'"
            )
            return

        # Import and construction share one guarded region: a broken install
        # (import failure) must be handled exactly like a constructor failure.
        try:
            langfuse_mod = importlib.import_module("langfuse")
            self._client = langfuse_mod.Langfuse(**self._build_client_kwargs())
        except Exception:  # pylint: disable=broad-exception-caught
            # Best-effort boundary: the SDK may raise types we cannot
            # enumerate here; none of them may abort the evaluation.
            logger.exception("langfuse: failed to initialize client")
            self._client = None

    def save_result(self, result: EvaluationResult) -> None:
        """Accumulate a single result for batch export at finalize."""
        self._results.append(result)

    def save_run(self, results: list[EvaluationResult]) -> None:
        """Accumulate conversation results for batch export at finalize."""
        self._results.extend(results)

    def set_evaluation_context(
        self, evaluation_data: Optional[list[EvaluationData]] = None
    ) -> None:
        """No-op — Langfuse export does not need the full evaluation dataset."""
        _ = evaluation_data

    def finalize(self) -> None:
        """Export the accumulated results as one trace with scores.

        Does nothing when no client is available or nothing was accumulated.
        Any export failure is logged and swallowed.
        """
        if self._client is None:
            return

        if not self._results:
            logger.info("langfuse: no results to report; skipping")
            return

        try:
            self._write_trace_and_scores()
        except Exception:  # pylint: disable=broad-exception-caught
            logger.exception("langfuse: failed to write trace and scores")

    def close(self) -> None:
        """Shut down the Langfuse client, if any, and forget it.

        The client reference is dropped before ``shutdown()`` is attempted so
        the backend ends up without a client even when shutdown raises.
        """
        client, self._client = self._client, None
        if client is None:
            return
        try:
            client.shutdown()
        except Exception:  # pylint: disable=broad-exception-caught
            logger.debug("langfuse: shutdown raised; ignoring", exc_info=True)

    def _build_client_kwargs(self) -> dict[str, Any]:
        """Build keyword arguments for the Langfuse constructor.

        Only non-blank config values are forwarded, with surrounding
        whitespace removed.  Unset or blank values are omitted so they do not
        override whatever defaults the SDK applies on its own.

        Returns:
            A dict containing a subset of ``public_key``, ``secret_key`` and
            ``host``.
        """
        configured = (
            ("public_key", self._config.public_key),
            ("secret_key", self._config.secret_key),
            ("host", self._config.host),
        )
        kwargs: dict[str, Any] = {}
        for key, raw in configured:
            value = raw.strip() if raw else ""
            if value:
                kwargs[key] = value
        return kwargs

    def _write_trace_and_scores(self) -> None:
        """Create one trace span and emit one score per scored result.

        Uses the v4 observation-centric API:
        - ``start_as_current_observation()`` to create the trace span
        - ``create_score()`` to attach scores by trace_id
        - ``flush()`` to ensure all events are sent

        A failure to create one score is logged and does not stop the
        remaining scores.  ``flush()`` is always attempted, even when the
        export fails part-way, so already queued events are still sent.
        """
        run_name = self._run_info.name if self._run_info else "evaluation"
        trace_name = _truncate(f"lightspeed_eval__{run_name}", 256)

        trace_meta: dict[str, Any] = {
            "run_name": run_name,
            "result_count": len(self._results),
            "rows_preview": self._build_rows_preview(),
        }

        failed = 0
        try:
            with self._client.start_as_current_observation(
                name=trace_name,
                as_type="span",
                metadata=trace_meta,
            ) as span:
                trace_id = span.trace_id

                for r in self._results:
                    if r.score is None:
                        logger.debug(
                            "langfuse: skipping score for %s "
                            "(status=%s, no numeric score)",
                            r.metric_identifier,
                            r.result,
                        )
                        continue
                    if not self._export_score(trace_id, r):
                        failed += 1
        finally:
            self._client.flush()

        if failed:
            logger.warning("langfuse: %d score(s) could not be exported", failed)

    def _export_score(self, trace_id: str, r: EvaluationResult) -> bool:
        """Send one numeric score; return ``False`` (after logging) on failure."""
        try:
            self._client.create_score(
                trace_id=trace_id,
                name=_truncate(r.metric_identifier, 200),
                value=float(r.score),
                data_type="NUMERIC",
                comment=_format_comment(r),
                metadata=_build_score_metadata(r),
            )
        except Exception:  # pylint: disable=broad-exception-caught
            logger.exception(
                "langfuse: failed to create score for %s", r.metric_identifier
            )
            return False
        return True

    def _build_rows_preview(self) -> list[dict[str, Any]]:
        """Build a compact preview of the first 50 rows for trace metadata."""
        return [
            {
                "idx": i,
                "conversation_group_id": r.conversation_group_id,
                "turn_id": r.turn_id or "",
                "metric": r.metric_identifier,
                "result": r.result,
                "score": r.score,
            }
            for i, r in enumerate(self._results[:50])
        ]
def _format_comment(r: EvaluationResult) -> str:
    """Build a human-readable comment for a Langfuse score entry.

    The comment is ``" | "``-joined ``key=value`` parts: result,
    conversation_group_id, turn_id (empty when unset) and, when a reason is
    present, the reason truncated to 1200 characters.
    """
    parts: list[str] = [
        f"result={r.result}",
        f"conversation_group_id={r.conversation_group_id}",
        f"turn_id={r.turn_id or ''}",
    ]
    if r.reason:
        # Same cut-off rule as every other field: delegate to _truncate.
        parts.append(f"reason={_truncate(r.reason, 1200)}")
    return " | ".join(parts)


def _build_score_metadata(r: EvaluationResult) -> dict[str, Any]:
    """Build per-score metadata mirroring evaluation CSV fields.

    Every text value is limited to 8000 characters; missing values become
    empty strings.
    """
    max_text = 8000
    return {
        "query": _truncate(r.query, max_text) if r.query else "",
        "response": _truncate(r.response, max_text) if r.response else "",
        "conversation_group_id": r.conversation_group_id,
        "turn_id": r.turn_id or "",
        "tool_calls": _safe_truncate(r.tool_calls, max_text),
        "contexts": _safe_truncate(r.contexts, max_text),
        "expected_response": _format_expected_response(r.expected_response, max_text),
        "expected_intent": _safe_truncate(r.expected_intent, max_text),
        "expected_tool_calls": _safe_truncate(r.expected_tool_calls, max_text),
        "expected_keywords": _safe_truncate(r.expected_keywords, max_text),
    }


def _safe_truncate(value: object, max_len: int) -> str:
    """Stringify and truncate a nullable value.

    Returns an empty string for ``None`` and for values whose string form is
    blank; otherwise the string form truncated to ``max_len``.
    """
    if value is None:
        return ""
    text = str(value)
    if not text.strip():
        return ""
    return _truncate(text, max_len)


def _format_expected_response(value: str | list[str] | None, max_len: int) -> str:
    """Format expected_response which can be a string or list of strings.

    List items are joined with a ``---`` separator line before truncation.
    """
    if value is None:
        return ""
    if isinstance(value, list):
        text = "\n---\n".join(str(x) for x in value)
    else:
        text = str(value)
    return _truncate(text, max_len)


def _truncate(s: str, max_len: int) -> str:
    """Return ``s`` limited to ``max_len`` characters.

    Strings that fit are returned unchanged.  Longer strings are cut and end
    with ``"..."``.  When ``max_len`` is below 3 there is no room for the
    ellipsis, so the string is hard-cut instead (a negative ``max_len``
    yields an empty string); this keeps the result within ``max_len``.
    """
    if len(s) <= max_len:
        return s
    if max_len < 3:
        # A negative slice bound would count from the end and return far
        # more than max_len characters.
        return s[: max(max_len, 0)]
    return s[: max_len - 3] + "..."
diff --git a/src/lightspeed_evaluation/core/system/loader.py b/src/lightspeed_evaluation/core/system/loader.py index 7dbb4c4f..ae544c28 100644 --- a/src/lightspeed_evaluation/core/system/loader.py +++ b/src/lightspeed_evaluation/core/system/loader.py @@ -23,6 +23,7 @@ from lightspeed_evaluation.core.storage.config import ( DatabaseBackendConfig, FileBackendConfig, + LangfuseBackendConfig, StorageBackendConfig, ) from lightspeed_evaluation.core.system.exceptions import ConfigurationError @@ -34,7 +35,13 @@ logger = logging.getLogger(__name__) # Supported storage backend types -SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ("file", "sqlite", "postgres", "mysql") +SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ( + "file", + "sqlite", + "postgres", + "mysql", + "langfuse", +) DATABASE_STORAGE_TYPES: tuple[str, ...] = ("sqlite", "postgres", "mysql") @@ -261,6 +268,8 @@ def _parse_storage_config( backends.append(FileBackendConfig(**item)) elif backend_type in DATABASE_STORAGE_TYPES: backends.append(DatabaseBackendConfig(**item)) + elif backend_type == "langfuse": + backends.append(LangfuseBackendConfig(**item)) else: raise ConfigurationError( f"Unknown storage backend type {backend_type!r}. 
class TestLangfuseStorageBackend:
    """Unit tests for LangfuseStorageBackend."""

    @staticmethod
    def _client_with_span(mocker: MockerFixture, trace_id: str) -> Any:
        """Return a mock client whose observation context yields a span.

        ``start_as_current_observation()`` is used as a context manager by
        the backend; the mock's ``__enter__`` hands back a span carrying
        ``trace_id`` and ``__exit__`` does not suppress exceptions.
        """
        client = mocker.MagicMock()
        span = mocker.MagicMock()
        span.trace_id = trace_id
        observation = client.start_as_current_observation.return_value
        observation.__enter__.return_value = span
        observation.__exit__.return_value = False
        return client

    def test_backend_name(self) -> None:
        """Backend name is 'langfuse'."""
        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        assert backend.backend_name == "langfuse"

    def test_save_run_accumulates_results(self) -> None:
        """save_run extends internal results list."""
        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend.save_run([_make_result(), _make_result()])
        assert len(backend._results) == 2

    def test_initialize_creates_client_with_config(self, mocker: MockerFixture) -> None:
        """initialize() creates a Langfuse client with explicit credentials."""
        mock_langfuse_cls = mocker.MagicMock()
        mock_module = mocker.MagicMock()
        mock_module.Langfuse = mock_langfuse_cls

        mocker.patch(
            "lightspeed_evaluation.core.storage.langfuse_storage._HAS_LANGFUSE",
            True,
        )
        mocker.patch(
            "lightspeed_evaluation.core.storage.langfuse_storage.importlib.import_module",
            return_value=mock_module,
        )

        config = LangfuseBackendConfig(
            host="https://cloud.langfuse.com",
            public_key="pk-test",
            secret_key="sk-test",
        )
        backend = LangfuseStorageBackend(config)
        backend.initialize(RunInfo(name="test_run"))

        mock_langfuse_cls.assert_called_once_with(
            public_key="pk-test",
            secret_key="sk-test",
            host="https://cloud.langfuse.com",
        )
        assert backend._client is not None

    def test_initialize_logs_error_when_sdk_missing(
        self, mocker: MockerFixture, caplog: pytest.LogCaptureFixture
    ) -> None:
        """initialize() logs error and sets client=None when langfuse not installed."""
        mocker.patch(
            "lightspeed_evaluation.core.storage.langfuse_storage._HAS_LANGFUSE",
            False,
        )

        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        with caplog.at_level("ERROR"):
            backend.initialize(RunInfo(name="test"))

        assert "langfuse is not installed" in caplog.text
        assert backend._client is None

    def test_initialize_catches_client_error(self, mocker: MockerFixture) -> None:
        """initialize() catches client construction errors gracefully."""
        mock_module = mocker.MagicMock()
        mock_module.Langfuse.side_effect = ConnectionError("refused")

        mocker.patch(
            "lightspeed_evaluation.core.storage.langfuse_storage._HAS_LANGFUSE",
            True,
        )
        mocker.patch(
            "lightspeed_evaluation.core.storage.langfuse_storage.importlib.import_module",
            return_value=mock_module,
        )

        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend.initialize(RunInfo(name="test"))
        assert backend._client is None

    def test_finalize_creates_trace_and_scores(self, mocker: MockerFixture) -> None:
        """finalize() creates a trace span and scores via v4 create_score API."""
        mock_client = self._client_with_span(mocker, "trace-abc-123")

        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend._client = mock_client
        backend._run_info = RunInfo(name="eval_run")
        backend._results = [
            _make_result(metric_identifier="ragas:relevancy", score=0.9),
            _make_result(metric_identifier="custom:accuracy", score=0.3, result="FAIL"),
        ]

        backend.finalize()

        call_kwargs = mock_client.start_as_current_observation.call_args.kwargs
        assert call_kwargs["as_type"] == "span"
        assert "eval_run" in call_kwargs["name"]

        assert mock_client.create_score.call_count == 2
        first_score = mock_client.create_score.call_args_list[0].kwargs
        assert first_score["trace_id"] == "trace-abc-123"
        assert first_score["name"] == "ragas:relevancy"
        assert first_score["value"] == pytest.approx(0.9)
        assert first_score["data_type"] == "NUMERIC"

        mock_client.flush.assert_called_once()

    def test_finalize_skips_none_scores(self, mocker: MockerFixture) -> None:
        """finalize() skips results with score=None (ERROR/SKIPPED)."""
        mock_client = self._client_with_span(mocker, "trace-xyz")

        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend._client = mock_client
        backend._run_info = RunInfo(name="test")
        backend._results = [
            _make_result(score=None, result="ERROR"),
            _make_result(score=0.8),
        ]

        backend.finalize()

        assert mock_client.create_score.call_count == 1

    def test_finalize_noop_when_no_client(self) -> None:
        """finalize() is a no-op when client failed to initialize."""
        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend._client = None
        backend._results = [_make_result()]

        backend.finalize()

        # A no-op must neither create a client nor touch the pending results.
        assert backend._client is None
        assert len(backend._results) == 1

    def test_close_shuts_down_client(self, mocker: MockerFixture) -> None:
        """close() calls shutdown and sets client to None."""
        mock_client = mocker.MagicMock()
        backend = LangfuseStorageBackend(LangfuseBackendConfig())
        backend._client = mock_client

        backend.close()

        mock_client.shutdown.assert_called_once()
        assert backend._client is None