Skip to content

Commit 0b2078e

Browse files
authored
feat(batch-evaluation): add trace tags and roll up scores to trace (#1530)
1 parent 0241803 commit 0b2078e

File tree

3 files changed

+139
-8
lines changed

3 files changed

+139
-8
lines changed

langfuse/_client/client.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
PaginatedDatasetRuns,
8787
)
8888
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
89+
from langfuse.api.resources.ingestion.types.trace_body import TraceBody
8990
from langfuse.api.resources.prompts.types import (
9091
CreatePromptRequest_Chat,
9192
CreatePromptRequest_Text,
@@ -2098,6 +2099,39 @@ def create_score(
20982099
f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
20992100
)
21002101

2102+
def _create_trace_tags_via_ingestion(
2103+
self,
2104+
*,
2105+
trace_id: str,
2106+
tags: List[str],
2107+
) -> None:
2108+
"""Private helper to enqueue trace tag updates via ingestion API events."""
2109+
if not self._tracing_enabled:
2110+
return
2111+
2112+
if len(tags) == 0:
2113+
return
2114+
2115+
try:
2116+
new_body = TraceBody(
2117+
id=trace_id,
2118+
tags=tags,
2119+
)
2120+
2121+
event = {
2122+
"id": self.create_trace_id(),
2123+
"type": "trace-create",
2124+
"timestamp": _get_timestamp(),
2125+
"body": new_body,
2126+
}
2127+
2128+
if self._resources is not None:
2129+
self._resources.add_trace_task(event)
2130+
except Exception as e:
2131+
langfuse_logger.exception(
2132+
f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}"
2133+
)
2134+
21012135
@overload
21022136
def score_current_span(
21032137
self,
@@ -3115,8 +3149,10 @@ def run_batched_evaluation(
31153149
max_retries: int = 3,
31163150
evaluators: List[EvaluatorFunction],
31173151
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
3118-
max_concurrency: int = 50,
3152+
max_concurrency: int = 5,
31193153
metadata: Optional[Dict[str, Any]] = None,
3154+
_add_observation_scores_to_trace: bool = False,
3155+
_additional_trace_tags: Optional[List[str]] = None,
31203156
resume_from: Optional[BatchEvaluationResumeToken] = None,
31213157
verbose: bool = False,
31223158
) -> BatchEvaluationResult:
@@ -3158,7 +3194,7 @@ def run_batched_evaluation(
31583194
items matching the filter. Useful for testing or limiting evaluation runs.
31593195
Default: None (process all).
31603196
max_concurrency: Maximum number of items to evaluate concurrently. Controls
3161-
parallelism and resource usage. Default: 50.
3197+
parallelism and resource usage. Default: 5.
31623198
composite_evaluator: Optional function that creates a composite score from
31633199
item-level evaluations. Receives the original item and its evaluations,
31643200
returns a single Evaluation. Useful for weighted averages or combined metrics.
@@ -3327,6 +3363,8 @@ def composite_evaluator(*, item, evaluations):
33273363
max_concurrency=max_concurrency,
33283364
composite_evaluator=composite_evaluator,
33293365
metadata=metadata,
3366+
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
3367+
_additional_trace_tags=_additional_trace_tags,
33303368
max_retries=max_retries,
33313369
verbose=verbose,
33323370
resume_from=resume_from,

langfuse/_client/resource_manager.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,29 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:
351351

352352
return
353353

354+
def add_trace_task(
    self,
    event: dict,
) -> None:
    """Enqueue a trace ingestion event, best-effort and non-blocking.

    When the queue is full or any unexpected error occurs, the event is
    dropped and the problem is only logged — enqueueing must never
    propagate an exception into user code.

    Args:
        event: Ingestion event dict with at least ``type`` and ``body``
            keys, where ``body`` exposes an ``id`` attribute.
    """
    try:
        langfuse_logger.debug(
            f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}"
        )
        # Trace events reuse the score ingestion queue; block=False so the
        # caller is never stalled — overflow is handled below instead.
        self._score_ingestion_queue.put(event, block=False)

    except Full:
        # Queue at capacity: drop the update rather than block the app.
        langfuse_logger.warning(
            "System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume."
        )

        return
    except Exception as e:
        # Defensive catch-all (e.g. malformed event dict in the debug log
        # above): drop and log instead of raising.
        langfuse_logger.error(
            f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}"
        )

        return
return
376+
354377
@property
355378
def tracer(self) -> Optional[Tracer]:
356379
return self._otel_tracer

langfuse/batch_evaluation.py

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
List,
1919
Optional,
2020
Protocol,
21+
Set,
2122
Tuple,
2223
Union,
2324
cast,
@@ -849,9 +850,11 @@ async def run_async(
849850
fetch_batch_size: int = 50,
850851
fetch_trace_fields: Optional[str] = None,
851852
max_items: Optional[int] = None,
852-
max_concurrency: int = 50,
853+
max_concurrency: int = 5,
853854
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
854855
metadata: Optional[Dict[str, Any]] = None,
856+
_add_observation_scores_to_trace: bool = False,
857+
_additional_trace_tags: Optional[List[str]] = None,
855858
max_retries: int = 3,
856859
verbose: bool = False,
857860
resume_from: Optional[BatchEvaluationResumeToken] = None,
@@ -873,6 +876,10 @@ async def run_async(
873876
max_concurrency: Maximum number of concurrent evaluations.
874877
composite_evaluator: Optional function to create composite scores.
875878
metadata: Metadata to add to all created scores.
879+
_add_observation_scores_to_trace: Private option to duplicate
880+
observation-level scores onto the parent trace.
881+
_additional_trace_tags: Private option to add tags on traces via
882+
ingestion trace-create events.
876883
max_retries: Maximum retries for failed batch fetches.
877884
verbose: If True, log progress to console.
878885
resume_from: Resume token from a previous failed run.
@@ -903,6 +910,12 @@ async def run_async(
903910

904911
# Handle resume token by modifying filter
905912
effective_filter = self._build_timestamp_filter(filter, resume_from)
913+
normalized_additional_trace_tags = (
914+
self._dedupe_tags(_additional_trace_tags)
915+
if _additional_trace_tags is not None
916+
else []
917+
)
918+
updated_trace_ids: Set[str] = set()
906919

907920
# Create semaphore for concurrency control
908921
semaphore = asyncio.Semaphore(max_concurrency)
@@ -1011,6 +1024,7 @@ async def process_item(
10111024
evaluators=evaluators,
10121025
composite_evaluator=composite_evaluator,
10131026
metadata=metadata,
1027+
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
10141028
evaluator_stats_dict=evaluator_stats_dict,
10151029
)
10161030
return (item_id, result)
@@ -1043,6 +1057,20 @@ async def process_item(
10431057
# Store evaluations for this item
10441058
item_evaluations[item_id] = evaluations
10451059

1060+
if normalized_additional_trace_tags:
1061+
trace_id = (
1062+
item_id
1063+
if scope == "traces"
1064+
else cast(ObservationsView, item).trace_id
1065+
)
1066+
1067+
if trace_id and trace_id not in updated_trace_ids:
1068+
self.client._create_trace_tags_via_ingestion(
1069+
trace_id=trace_id,
1070+
tags=normalized_additional_trace_tags,
1071+
)
1072+
updated_trace_ids.add(trace_id)
1073+
10461074
# Update last processed tracking
10471075
last_item_timestamp = self._get_item_timestamp(item, scope)
10481076
last_item_id = item_id
@@ -1168,6 +1196,7 @@ async def _process_batch_evaluation_item(
11681196
evaluators: List[EvaluatorFunction],
11691197
composite_evaluator: Optional[CompositeEvaluatorFunction],
11701198
metadata: Optional[Dict[str, Any]],
1199+
_add_observation_scores_to_trace: bool,
11711200
evaluator_stats_dict: Dict[str, EvaluatorStats],
11721201
) -> Tuple[int, int, int, List[Evaluation]]:
11731202
"""Process a single item: map, evaluate, create scores.
@@ -1179,6 +1208,8 @@ async def _process_batch_evaluation_item(
11791208
evaluators: List of evaluator functions.
11801209
composite_evaluator: Optional composite evaluator function.
11811210
metadata: Additional metadata to add to scores.
1211+
_add_observation_scores_to_trace: Whether to duplicate
1212+
observation-level scores at trace level.
11821213
evaluator_stats_dict: Dictionary tracking evaluator statistics.
11831214
11841215
Returns:
@@ -1226,16 +1257,16 @@ async def _process_batch_evaluation_item(
12261257
# Create scores for item-level evaluations
12271258
item_id = self._get_item_id(item, scope)
12281259
for evaluation in evaluations:
1229-
self._create_score_for_scope(
1260+
scores_created += self._create_score_for_scope(
12301261
scope=scope,
12311262
item_id=item_id,
12321263
trace_id=cast(ObservationsView, item).trace_id
12331264
if scope == "observations"
12341265
else None,
12351266
evaluation=evaluation,
12361267
additional_metadata=metadata,
1268+
add_observation_score_to_trace=_add_observation_scores_to_trace,
12371269
)
1238-
scores_created += 1
12391270

12401271
# Run composite evaluator if provided and we have evaluations
12411272
if composite_evaluator and evaluations:
@@ -1251,16 +1282,16 @@ async def _process_batch_evaluation_item(
12511282

12521283
# Create scores for all composite evaluations
12531284
for composite_eval in composite_evals:
1254-
self._create_score_for_scope(
1285+
composite_scores_created += self._create_score_for_scope(
12551286
scope=scope,
12561287
item_id=item_id,
12571288
trace_id=cast(ObservationsView, item).trace_id
12581289
if scope == "observations"
12591290
else None,
12601291
evaluation=composite_eval,
12611292
additional_metadata=metadata,
1293+
add_observation_score_to_trace=_add_observation_scores_to_trace,
12621294
)
1263-
composite_scores_created += 1
12641295

12651296
# Add composite evaluations to the list
12661297
evaluations.extend(composite_evals)
@@ -1382,7 +1413,8 @@ def _create_score_for_scope(
13821413
trace_id: Optional[str] = None,
13831414
evaluation: Evaluation,
13841415
additional_metadata: Optional[Dict[str, Any]],
1385-
) -> None:
1416+
add_observation_score_to_trace: bool = False,
1417+
) -> int:
13861418
"""Create a score linked to the appropriate entity based on scope.
13871419
13881420
Args:
@@ -1391,6 +1423,11 @@ def _create_score_for_scope(
13911423
trace_id: The trace ID of the entity; required if scope=observations
13921424
evaluation: The evaluation result to create a score from.
13931425
additional_metadata: Additional metadata to merge with evaluation metadata.
1426+
add_observation_score_to_trace: Whether to duplicate observation
1427+
score on parent trace as well.
1428+
1429+
Returns:
1430+
Number of score events created.
13941431
"""
13951432
# Merge metadata
13961433
score_metadata = {
@@ -1408,6 +1445,7 @@ def _create_score_for_scope(
14081445
data_type=evaluation.data_type, # type: ignore[arg-type]
14091446
config_id=evaluation.config_id,
14101447
)
1448+
return 1
14111449
elif scope == "observations":
14121450
self.client.create_score(
14131451
observation_id=item_id,
@@ -1419,6 +1457,23 @@ def _create_score_for_scope(
14191457
data_type=evaluation.data_type, # type: ignore[arg-type]
14201458
config_id=evaluation.config_id,
14211459
)
1460+
score_count = 1
1461+
1462+
if add_observation_score_to_trace and trace_id:
1463+
self.client.create_score(
1464+
trace_id=trace_id,
1465+
name=evaluation.name,
1466+
value=evaluation.value, # type: ignore
1467+
comment=evaluation.comment,
1468+
metadata=score_metadata,
1469+
data_type=evaluation.data_type, # type: ignore[arg-type]
1470+
config_id=evaluation.config_id,
1471+
)
1472+
score_count += 1
1473+
1474+
return score_count
1475+
1476+
return 0
14221477

14231478
def _build_timestamp_filter(
14241479
self,
@@ -1519,6 +1574,21 @@ def _get_timestamp_field_for_scope(scope: str) -> str:
15191574
return "start_time"
15201575
return "timestamp" # Default
15211576

1577+
@staticmethod
1578+
def _dedupe_tags(tags: Optional[List[str]]) -> List[str]:
1579+
"""Deduplicate tags while preserving order."""
1580+
if tags is None:
1581+
return []
1582+
1583+
deduped: List[str] = []
1584+
seen = set()
1585+
for tag in tags:
1586+
if tag not in seen:
1587+
deduped.append(tag)
1588+
seen.add(tag)
1589+
1590+
return deduped
1591+
15221592
def _build_result(
15231593
self,
15241594
total_items_fetched: int,

0 commit comments

Comments
 (0)