Skip to content

Commit 126e6e2

Browse files
[LEADS-362] Add Langfuse as a storage backend
Integrate Langfuse as a pluggable storage backend using the existing Changes: - Add LangfuseBackendConfig to core/storage/config.py - Wire langfuse type in core/system/loader.py - Add LangfuseBackendConfig handling in core/storage/factory.py - Add LangfuseStorageBackend in core/storage/langfuse_storage.py - Add langfuse optional dependency to pyproject.toml - Add langfuse config example to config/system.yaml Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent e8b5f96 commit 126e6e2

7 files changed

Lines changed: 309 additions & 3 deletions

File tree

config/system.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,13 @@ storage:
295295
# database: "./eval_results.db"
296296
# table_name: "evaluation_results"
297297

298+
# Langfuse backend (optional) - export scores to Langfuse observability platform
299+
# Requires: pip install 'lightspeed-evaluation[langfuse]'
300+
# Credentials via env vars: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST
301+
# Or provide them inline below:
302+
# - type: "langfuse"
303+
# host: "https://cloud.langfuse.com"
304+
298305
# Visualization settings
299306
visualization:
300307
figsize: [12, 8] # Graph size (width, height)

pyproject.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ nlp-metrics = [
5353
"rapidfuzz>=3.0.0,<=3.14.3", # Required for semantic_similarity_distance
5454
]
5555

56+
# Langfuse observability - export evaluation scores to Langfuse
57+
# Install with:
58+
# pip install 'lightspeed-evaluation[langfuse]'
59+
# or
60+
# uv sync --extra langfuse
61+
langfuse = [
62+
"langfuse>=2.0.0,<3.0.0",
63+
]
64+
5665
[dependency-groups]
5766
dev = [
5867
"bandit>=1.7.0,<=1.9.2",

src/lightspeed_evaluation/core/storage/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from lightspeed_evaluation.core.storage.config import (
3333
DatabaseBackendConfig,
3434
FileBackendConfig,
35+
LangfuseBackendConfig,
3536
StorageBackendConfig,
3637
)
3738
from lightspeed_evaluation.core.storage.factory import (
@@ -56,6 +57,7 @@
5657
"StorageError",
5758
"FileBackendConfig",
5859
"DatabaseBackendConfig",
60+
"LangfuseBackendConfig",
5961
"StorageBackendConfig",
6062
"CompositeStorageBackend",
6163
"NoOpStorageBackend",

src/lightspeed_evaluation/core/storage/config.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Configuration models for storage backends.
22
3-
Defines Pydantic models for file and database storage configuration.
3+
Defines Pydantic models for file, database, and Langfuse storage configuration.
44
"""
55

66
from typing import Annotated, Literal, Optional
@@ -126,8 +126,39 @@ def validate_connection_fields(self) -> "DatabaseBackendConfig":
126126
return self
127127

128128

129+
class LangfuseBackendConfig(BaseModel):
130+
"""Configuration for Langfuse observability storage backend.
131+
132+
Exports evaluation scores to Langfuse as a trace with per-metric scores.
133+
Requires the ``langfuse`` optional extra: ``pip install 'lightspeed-evaluation[langfuse]'``
134+
135+
Credentials are resolved from config fields first, then ``LANGFUSE_PUBLIC_KEY``,
136+
``LANGFUSE_SECRET_KEY``, and ``LANGFUSE_HOST`` environment variables as fallback.
137+
138+
Example:
139+
- type: "langfuse"
140+
host: "https://cloud.langfuse.com"
141+
"""
142+
143+
model_config = ConfigDict(extra="forbid")
144+
145+
type: Literal["langfuse"] = "langfuse"
146+
host: Optional[str] = Field(
147+
default=None,
148+
description="Langfuse API host URL (falls back to LANGFUSE_HOST env var)",
149+
)
150+
public_key: Optional[str] = Field(
151+
default=None,
152+
description="Langfuse public key (falls back to LANGFUSE_PUBLIC_KEY env var)",
153+
)
154+
secret_key: Optional[str] = Field(
155+
default=None,
156+
description="Langfuse secret key (falls back to LANGFUSE_SECRET_KEY env var)",
157+
)
158+
159+
129160
# Discriminated union for polymorphic storage configuration
130161
StorageBackendConfig = Annotated[
131-
FileBackendConfig | DatabaseBackendConfig,
162+
FileBackendConfig | DatabaseBackendConfig | LangfuseBackendConfig,
132163
Field(discriminator="type"),
133164
]

src/lightspeed_evaluation/core/storage/factory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from lightspeed_evaluation.core.storage.config import (
1515
DatabaseBackendConfig,
1616
FileBackendConfig,
17+
LangfuseBackendConfig,
1718
StorageBackendConfig,
1819
)
1920
from lightspeed_evaluation.core.storage.file_storage import FileStorageBackend
21+
from lightspeed_evaluation.core.storage.langfuse_storage import LangfuseStorageBackend
2022
from lightspeed_evaluation.core.storage.protocol import BaseStorageBackend
2123
from lightspeed_evaluation.core.storage.sql_storage import SQLStorageBackend
2224
from lightspeed_evaluation.core.system.exceptions import ConfigurationError
@@ -127,6 +129,9 @@ def create_pipeline_storage_backend(
127129
"File storage entries in ``storage`` require ``system_config`` "
128130
"when building the pipeline storage backend."
129131
)
132+
elif isinstance(config, LangfuseBackendConfig):
133+
logger.info("Pipeline storage: langfuse backend")
134+
backends.append(LangfuseStorageBackend(config))
130135
else:
131136
raise ConfigurationError(
132137
f"Unknown storage backend type: {type(config).__name__!r}"
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
"""Langfuse storage backend for evaluation results.
2+
3+
Implements :class:`~lightspeed_evaluation.core.storage.protocol.BaseStorageBackend`
4+
so Langfuse plugs into the standard pipeline storage lifecycle without any
5+
changes to the runner, API, or pipeline modules.
6+
7+
Install with: ``pip install 'lightspeed-evaluation[langfuse]'``
8+
9+
Credentials are resolved from :class:`LangfuseBackendConfig` fields first,
10+
then from ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY``, and
11+
``LANGFUSE_HOST`` environment variables as fallback (standard Langfuse SDK
12+
behavior).
13+
14+
Lifecycle:
15+
1. ``initialize(run_info)`` — creates the Langfuse client and trace.
16+
2. ``save_run(results)`` — accumulates all results (called per conversation).
17+
3. ``finalize()`` — writes scores to the trace and flushes.
18+
4. ``close()`` — shuts down the client.
19+
"""
20+
21+
from __future__ import annotations
22+
23+
import importlib
24+
import logging
25+
from typing import Any, Optional
26+
27+
from lightspeed_evaluation.core.models.data import EvaluationData, EvaluationResult
28+
from lightspeed_evaluation.core.storage.config import LangfuseBackendConfig
29+
from lightspeed_evaluation.core.storage.protocol import RunInfo
30+
31+
logger = logging.getLogger(__name__)
32+
33+
_HAS_LANGFUSE = importlib.util.find_spec("langfuse") is not None
34+
35+
36+
class LangfuseStorageBackend:
37+
"""Storage backend that exports evaluation results to Langfuse.
38+
39+
Creates one Langfuse trace per evaluation run with one score per
40+
evaluation result. Results with ``score=None`` (ERROR/SKIPPED) are
41+
skipped from numeric scoring but their status is logged.
42+
43+
All Langfuse SDK errors are caught and logged — they never fail
44+
the evaluation pipeline.
45+
"""
46+
47+
def __init__(self, config: LangfuseBackendConfig) -> None:
48+
"""Initialize the Langfuse storage backend.
49+
50+
Args:
51+
config: Langfuse backend configuration with optional host,
52+
public_key, and secret_key fields.
53+
"""
54+
self._config = config
55+
self._client: Any = None
56+
self._trace: Any = None
57+
self._run_info: Optional[RunInfo] = None
58+
self._results: list[EvaluationResult] = []
59+
60+
@property
61+
def backend_name(self) -> str:
62+
"""Return the name of this storage backend."""
63+
return "langfuse"
64+
65+
def initialize(self, run_info: RunInfo) -> None:
66+
"""Create the Langfuse client and a trace for this run."""
67+
self._run_info = run_info
68+
self._results = []
69+
70+
if not _HAS_LANGFUSE:
71+
logger.error(
72+
"langfuse is not installed. "
73+
"Add: pip install 'lightspeed-evaluation[langfuse]'"
74+
)
75+
return
76+
77+
langfuse_mod = importlib.import_module("langfuse")
78+
79+
kwargs = self._build_client_kwargs()
80+
try:
81+
self._client = langfuse_mod.Langfuse(**kwargs)
82+
except (RuntimeError, ValueError, OSError, ConnectionError):
83+
logger.exception("langfuse: failed to initialize client")
84+
self._client = None
85+
86+
def save_result(self, result: EvaluationResult) -> None:
87+
"""Accumulate a single result for batch export at finalize."""
88+
self._results.append(result)
89+
90+
def save_run(self, results: list[EvaluationResult]) -> None:
91+
"""Accumulate conversation results for batch export at finalize."""
92+
self._results.extend(results)
93+
94+
def set_evaluation_context(
95+
self, evaluation_data: Optional[list[EvaluationData]] = None
96+
) -> None:
97+
"""No-op — Langfuse export does not need the full evaluation dataset."""
98+
_ = evaluation_data
99+
100+
def finalize(self) -> None:
101+
"""Create the trace, write all scores, and flush to Langfuse."""
102+
if self._client is None:
103+
return
104+
105+
if not self._results:
106+
logger.info("langfuse: no results to report; skipping")
107+
return
108+
109+
try:
110+
self._write_trace_and_scores()
111+
except (RuntimeError, ValueError, OSError, ConnectionError):
112+
logger.exception("langfuse: failed to write trace and scores")
113+
114+
def close(self) -> None:
115+
"""Shut down the Langfuse client."""
116+
if self._client is not None:
117+
try:
118+
self._client.shutdown()
119+
except (RuntimeError, OSError, ConnectionError):
120+
logger.debug("langfuse: shutdown raised; ignoring")
121+
self._client = None
122+
123+
def _build_client_kwargs(self) -> dict[str, Any]:
124+
"""Build keyword arguments for the Langfuse constructor."""
125+
kwargs: dict[str, Any] = {}
126+
if self._config.public_key:
127+
kwargs["public_key"] = self._config.public_key
128+
if self._config.secret_key:
129+
kwargs["secret_key"] = self._config.secret_key
130+
if self._config.host:
131+
kwargs["host"] = self._config.host.strip()
132+
return kwargs
133+
134+
def _write_trace_and_scores(self) -> None:
135+
"""Create one trace and emit one score per result row."""
136+
run_name = self._run_info.name if self._run_info else "evaluation"
137+
138+
trace_meta: dict[str, Any] = {
139+
"run_name": run_name,
140+
"result_count": len(self._results),
141+
"rows_preview": self._build_rows_preview(),
142+
}
143+
144+
self._trace = self._client.trace(
145+
name=_truncate(f"lightspeed_eval__{run_name}", 256),
146+
metadata=trace_meta,
147+
)
148+
149+
for r in self._results:
150+
if r.score is None:
151+
logger.debug(
152+
"langfuse: skipping score for %s (status=%s, no numeric score)",
153+
r.metric_identifier,
154+
r.result,
155+
)
156+
continue
157+
158+
self._trace.score(
159+
name=_truncate(r.metric_identifier, 200),
160+
value=float(r.score),
161+
comment=_format_comment(r),
162+
metadata=_build_score_metadata(r),
163+
)
164+
165+
self._client.flush()
166+
167+
def _build_rows_preview(self) -> list[dict[str, Any]]:
168+
"""Build a compact preview of the first 50 rows for trace metadata."""
169+
preview: list[dict[str, Any]] = []
170+
for i, r in enumerate(self._results[:50]):
171+
preview.append(
172+
{
173+
"idx": i,
174+
"conversation_group_id": r.conversation_group_id,
175+
"turn_id": r.turn_id or "",
176+
"metric": r.metric_identifier,
177+
"result": r.result,
178+
"score": r.score,
179+
}
180+
)
181+
return preview
182+
183+
184+
def _format_comment(r: EvaluationResult) -> str:
185+
"""Build a human-readable comment for a Langfuse score entry."""
186+
parts: list[str] = [
187+
f"result={r.result}",
188+
f"conversation_group_id={r.conversation_group_id}",
189+
f"turn_id={r.turn_id or ''}",
190+
]
191+
if r.reason:
192+
max_reason = 1200
193+
reason = (
194+
r.reason
195+
if len(r.reason) <= max_reason
196+
else r.reason[: max_reason - 3] + "..."
197+
)
198+
parts.append(f"reason={reason}")
199+
return " | ".join(parts)
200+
201+
202+
def _build_score_metadata(r: EvaluationResult) -> dict[str, Any]:
203+
"""Build per-score metadata mirroring evaluation CSV fields."""
204+
max_text = 8000
205+
return {
206+
"query": _truncate(r.query, max_text) if r.query else "",
207+
"response": _truncate(r.response, max_text) if r.response else "",
208+
"conversation_group_id": r.conversation_group_id,
209+
"turn_id": r.turn_id or "",
210+
"tool_calls": _safe_truncate(r.tool_calls, max_text),
211+
"contexts": _safe_truncate(r.contexts, max_text),
212+
"expected_response": _format_expected_response(r.expected_response, max_text),
213+
"expected_intent": _safe_truncate(r.expected_intent, max_text),
214+
"expected_tool_calls": _safe_truncate(r.expected_tool_calls, max_text),
215+
"expected_keywords": _safe_truncate(r.expected_keywords, max_text),
216+
}
217+
218+
219+
def _safe_truncate(value: Optional[str], max_len: int) -> str:
220+
"""Truncate a nullable string, returning empty string for None."""
221+
if value is None or not str(value).strip():
222+
return ""
223+
return _truncate(str(value), max_len)
224+
225+
226+
def _format_expected_response(
227+
value: str | list[str] | None, max_len: int
228+
) -> str:
229+
"""Format expected_response which can be a string or list of strings."""
230+
if value is None:
231+
return ""
232+
if isinstance(value, list):
233+
text = "\n---\n".join(str(x) for x in value)
234+
else:
235+
text = str(value)
236+
return _truncate(text, max_len)
237+
238+
239+
def _truncate(s: str, max_len: int) -> str:
240+
"""Truncate a string with ellipsis if it exceeds max_len."""
241+
if len(s) <= max_len:
242+
return s
243+
return s[: max_len - 3] + "..."

src/lightspeed_evaluation/core/system/loader.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from lightspeed_evaluation.core.storage.config import (
2424
DatabaseBackendConfig,
2525
FileBackendConfig,
26+
LangfuseBackendConfig,
2627
StorageBackendConfig,
2728
)
2829
from lightspeed_evaluation.core.system.exceptions import ConfigurationError
@@ -34,7 +35,13 @@
3435
logger = logging.getLogger(__name__)
3536

3637
# Supported storage backend types
37-
SUPPORTED_STORAGE_TYPES: tuple[str, ...] = ("file", "sqlite", "postgres", "mysql")
38+
SUPPORTED_STORAGE_TYPES: tuple[str, ...] = (
39+
"file",
40+
"sqlite",
41+
"postgres",
42+
"mysql",
43+
"langfuse",
44+
)
3845
DATABASE_STORAGE_TYPES: tuple[str, ...] = ("sqlite", "postgres", "mysql")
3946

4047

@@ -261,6 +268,8 @@ def _parse_storage_config(
261268
backends.append(FileBackendConfig(**item))
262269
elif backend_type in DATABASE_STORAGE_TYPES:
263270
backends.append(DatabaseBackendConfig(**item))
271+
elif backend_type == "langfuse":
272+
backends.append(LangfuseBackendConfig(**item))
264273
else:
265274
raise ConfigurationError(
266275
f"Unknown storage backend type {backend_type!r}. "

0 commit comments

Comments
 (0)