Skip to content

Commit 3539208

Browse files
committed
feat(batch-evaluation): add trace tags and roll up scores to trace
1 parent 0241803 commit 3539208

3 files changed

Lines changed: 191 additions & 36 deletions

File tree

langfuse/_client/client.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
PaginatedDatasetRuns,
8787
)
8888
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
89+
from langfuse.api.resources.ingestion.types.trace_body import TraceBody
8990
from langfuse.api.resources.prompts.types import (
9091
CreatePromptRequest_Chat,
9192
CreatePromptRequest_Text,
@@ -2098,6 +2099,43 @@ def create_score(
20982099
f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
20992100
)
21002101

2102+
def _create_trace_tags_via_ingestion(
    self,
    *,
    trace_id: str,
    tags: List[str],
) -> None:
    """Enqueue a trace-create ingestion event that attaches *tags* to an existing trace.

    No-op when tracing is disabled or *tags* is empty. Any failure is logged
    and swallowed so that tag rollup never breaks the caller.
    """
    if not self._tracing_enabled:
        return
    if not tags:
        # Nothing to attach; avoid emitting an empty trace update event.
        return

    try:
        trace_update = {
            "id": self.create_trace_id(),
            "type": "trace-create",
            "timestamp": _get_timestamp(),
            "body": TraceBody(id=trace_id, tags=tags),
        }

        resources = self._resources
        if resources is not None:
            # force_sample=True: tag updates must not be dropped by the
            # trace sampler, otherwise tags would silently go missing.
            resources.add_trace_task(
                trace_update,
                trace_id=trace_id,
                force_sample=True,
            )
    except Exception as e:
        langfuse_logger.exception(
            f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}"
        )
2138+
21012139
@overload
21022140
def score_current_span(
21032141
self,
@@ -3115,8 +3153,10 @@ def run_batched_evaluation(
31153153
max_retries: int = 3,
31163154
evaluators: List[EvaluatorFunction],
31173155
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
3118-
max_concurrency: int = 50,
3156+
max_concurrency: int = 5,
31193157
metadata: Optional[Dict[str, Any]] = None,
3158+
_add_observation_scores_to_trace: bool = False,
3159+
_additional_trace_tags: Optional[List[str]] = None,
31203160
resume_from: Optional[BatchEvaluationResumeToken] = None,
31213161
verbose: bool = False,
31223162
) -> BatchEvaluationResult:
@@ -3158,7 +3198,7 @@ def run_batched_evaluation(
31583198
items matching the filter. Useful for testing or limiting evaluation runs.
31593199
Default: None (process all).
31603200
max_concurrency: Maximum number of items to evaluate concurrently. Controls
3161-
parallelism and resource usage. Default: 50.
3201+
parallelism and resource usage. Default: 5.
31623202
composite_evaluator: Optional function that creates a composite score from
31633203
item-level evaluations. Receives the original item and its evaluations,
31643204
returns a single Evaluation. Useful for weighted averages or combined metrics.
@@ -3327,6 +3367,8 @@ def composite_evaluator(*, item, evaluations):
33273367
max_concurrency=max_concurrency,
33283368
composite_evaluator=composite_evaluator,
33293369
metadata=metadata,
3370+
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
3371+
_additional_trace_tags=_additional_trace_tags,
33303372
max_retries=max_retries,
33313373
verbose=verbose,
33323374
resume_from=resume_from,

langfuse/_client/resource_manager.py

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -306,37 +306,49 @@ def reset(cls) -> None:
306306

307307
cls._instances.clear()
308308

309+
def _enqueue_ingestion_task(
    self,
    *,
    event: dict,
    trace_id: Optional[str],
    sampling_name: str = "ingestion",
    force_sample: bool = False,
) -> None:
    """Put *event* on the ingestion queue, honoring the OTel trace sampler.

    Ingestion events are sampled with the same sampler used for tracing so
    ingestion data stays consistent with recorded spans. Forced events,
    events without a trace id, and environments without a concrete OTel
    tracer provider are always enqueued.
    """
    provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())

    if force_sample or isinstance(provider, otel_trace_api.ProxyTracerProvider):
        # Either explicitly forced in-sample, or no real sampler is
        # configured (proxy provider) — default to keeping the event.
        self._score_ingestion_queue.put(event, block=False)
        return

    if trace_id is None:
        # Events not tied to a trace (e.g. session / dataset run scores)
        # are never sampled out.
        self._score_ingestion_queue.put(event, block=False)
        return

    # Ask the configured sampler the same question it answered for the
    # trace itself, so the event's fate matches the trace's.
    decision = provider.sampler.should_sample(
        parent_context=None,
        trace_id=int(trace_id, 16),
        name=sampling_name,
    ).decision
    if decision == Decision.RECORD_AND_SAMPLE:
        self._score_ingestion_queue.put(event, block=False)
339+
309340
def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:
310341
try:
311-
# Sample scores with the same sampler that is used for tracing
312-
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
313-
should_sample = (
314-
force_sample
315-
or isinstance(
316-
tracer_provider, otel_trace_api.ProxyTracerProvider
317-
) # default to in-sample if otel sampler is not available
318-
or (
319-
(
320-
tracer_provider.sampler.should_sample(
321-
parent_context=None,
322-
trace_id=int(event["body"].trace_id, 16),
323-
name="score",
324-
).decision
325-
== Decision.RECORD_AND_SAMPLE
326-
if hasattr(event["body"], "trace_id")
327-
else True
328-
)
329-
if event["body"].trace_id
330-
is not None # do not sample out session / dataset run scores
331-
else True
332-
)
342+
trace_id = event["body"].trace_id
343+
langfuse_logger.debug(
344+
f"Score: Enqueuing event type={event['type']} for trace_id={trace_id} name={event['body'].name} value={event['body'].value}"
345+
)
346+
self._enqueue_ingestion_task(
347+
event=event,
348+
trace_id=trace_id,
349+
sampling_name="score",
350+
force_sample=force_sample,
333351
)
334-
335-
if should_sample:
336-
langfuse_logger.debug(
337-
f"Score: Enqueuing event type={event['type']} for trace_id={event['body'].trace_id} name={event['body'].name} value={event['body'].value}"
338-
)
339-
self._score_ingestion_queue.put(event, block=False)
340352

341353
except Full:
342354
langfuse_logger.warning(
@@ -351,6 +363,37 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:
351363

352364
return
353365

366+
def add_trace_task(
    self,
    event: dict,
    *,
    trace_id: Optional[str],
    force_sample: bool = False,
) -> None:
    """Queue a trace ingestion event; failures are logged, never raised.

    A full ingestion queue or any unexpected error results in the event
    being dropped with a log message, keeping the caller's hot path safe.
    """
    try:
        langfuse_logger.debug(
            f"Trace: Enqueuing event type={event['type']} for trace_id={trace_id}"
        )
        self._enqueue_ingestion_task(
            event=event,
            trace_id=trace_id,
            sampling_name="trace",
            force_sample=force_sample,
        )
    except Full:
        # Queue at capacity: drop the update rather than block the caller.
        langfuse_logger.warning(
            "System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume."
        )
    except Exception as e:
        langfuse_logger.error(
            f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}"
        )
396+
354397
@property
def tracer(self) -> Optional[Tracer]:
    """The OTel tracer backing this resource manager, or None if not initialized."""
    return self._otel_tracer

0 commit comments

Comments
 (0)