Skip to content

Commit 246bef7

Browse files
authored
feat(span-processor): mark app root spans (#1651)
1 parent b59a611 commit 246bef7

9 files changed

Lines changed: 743 additions & 66 deletions

File tree

langfuse/_client/attributes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class LangfuseOtelSpanAttributes:
5959

6060
# Internal
6161
AS_ROOT = "langfuse.internal.as_root"
62+
IS_APP_ROOT = "langfuse.internal.is_app_root"
6263

6364
# Experiments
6465
EXPERIMENT_ID = "langfuse.experiment.id"

langfuse/_client/client.py

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import backoff
2929
import httpx
30+
from opentelemetry import context as otel_context_api
3031
from opentelemetry import trace as otel_trace_api
3132
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
3233
from opentelemetry.sdk.trace.export import SpanExporter
@@ -66,7 +67,9 @@
6667
)
6768
from langfuse._client.propagation import (
6869
PropagatedExperimentAttributes,
70+
_detach_context_token_safely,
6971
_propagate_attributes,
72+
_set_langfuse_trace_id_in_baggage,
7073
)
7174
from langfuse._client.resource_manager import LangfuseResourceManager
7275
from langfuse._client.span import (
@@ -1178,39 +1181,54 @@ def _start_as_current_otel_span_with_processed_media(
11781181
name=name,
11791182
end_on_exit=end_on_exit if end_on_exit is not None else True,
11801183
) as otel_span:
1184+
baggage_token = None
1185+
1186+
if otel_span.is_recording():
1187+
context_with_app_root_claim = _set_langfuse_trace_id_in_baggage(
1188+
trace_id=self._get_otel_trace_id(otel_span),
1189+
context=otel_context_api.get_current(),
1190+
)
1191+
baggage_token = otel_context_api.attach(context_with_app_root_claim)
1192+
11811193
span_class = self._get_span_class(
11821194
as_type or "generation"
11831195
) # default was "generation"
1184-
common_args = {
1185-
"otel_span": otel_span,
1186-
"langfuse_client": self,
1187-
"environment": self._environment,
1188-
"release": self._release,
1189-
"input": input,
1190-
"output": output,
1191-
"metadata": metadata,
1192-
"version": version,
1193-
"level": level,
1194-
"status_message": status_message,
1195-
}
11961196

1197-
if span_class in [
1198-
LangfuseGeneration,
1199-
LangfuseEmbedding,
1200-
]:
1201-
common_args.update(
1202-
{
1203-
"completion_start_time": completion_start_time,
1204-
"model": model,
1205-
"model_parameters": model_parameters,
1206-
"usage_details": usage_details,
1207-
"cost_details": cost_details,
1208-
"prompt": prompt,
1209-
}
1210-
)
1211-
# For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed
1197+
try:
1198+
common_args = {
1199+
"otel_span": otel_span,
1200+
"langfuse_client": self,
1201+
"environment": self._environment,
1202+
"release": self._release,
1203+
"input": input,
1204+
"output": output,
1205+
"metadata": metadata,
1206+
"version": version,
1207+
"level": level,
1208+
"status_message": status_message,
1209+
}
1210+
1211+
if span_class in [
1212+
LangfuseGeneration,
1213+
LangfuseEmbedding,
1214+
]:
1215+
common_args.update(
1216+
{
1217+
"completion_start_time": completion_start_time,
1218+
"model": model,
1219+
"model_parameters": model_parameters,
1220+
"usage_details": usage_details,
1221+
"cost_details": cost_details,
1222+
"prompt": prompt,
1223+
}
1224+
)
1225+
# For span-like types (span, agent, tool, chain, retriever, evaluator, guardrail), no generation properties needed
1226+
1227+
yield span_class(**common_args) # type: ignore[arg-type]
12121228

1213-
yield span_class(**common_args) # type: ignore[arg-type]
1229+
finally:
1230+
if baggage_token is not None:
1231+
_detach_context_token_safely(baggage_token)
12141232

12151233
def _get_current_otel_span(self) -> Optional[otel_trace_api.Span]:
12161234
current_span = otel_trace_api.get_current_span()

langfuse/_client/propagation.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ def _get_propagated_attributes_from_context(
316316
# Handle baggage
317317
baggage_entries = baggage.get_all(context=context)
318318
for baggage_key, baggage_value in baggage_entries.items():
319+
if baggage_key == LANGFUSE_TRACE_ID_BAGGAGE_KEY:
320+
continue
321+
319322
if baggage_key.startswith(LANGFUSE_BAGGAGE_PREFIX):
320323
span_key = _get_span_key_from_baggage_key(baggage_key)
321324

@@ -471,12 +474,44 @@ def _get_propagated_context_key(key: str) -> str:
471474

472475

473476
LANGFUSE_BAGGAGE_PREFIX = "langfuse_"
477+
LANGFUSE_TRACE_ID_BAGGAGE_KEY = "langfuse_trace_id"
474478

475479

476480
def _get_propagated_baggage_key(key: str) -> str:
477481
return f"{LANGFUSE_BAGGAGE_PREFIX}{key}"
478482

479483

484+
def _get_langfuse_trace_id_from_baggage(
485+
context: otel_context_api.Context,
486+
) -> Optional[str]:
487+
value = otel_baggage_api.get_baggage(
488+
name=LANGFUSE_TRACE_ID_BAGGAGE_KEY,
489+
context=context,
490+
)
491+
492+
if value is None:
493+
return None
494+
495+
return str(value).lower()
496+
497+
498+
def _set_langfuse_trace_id_in_baggage(
499+
*,
500+
trace_id: str,
501+
context: otel_context_api.Context,
502+
) -> otel_context_api.Context:
503+
normalized_trace_id = trace_id.lower()
504+
505+
if _get_langfuse_trace_id_from_baggage(context) == normalized_trace_id:
506+
return context
507+
508+
return otel_baggage_api.set_baggage(
509+
name=LANGFUSE_TRACE_ID_BAGGAGE_KEY,
510+
value=normalized_trace_id,
511+
context=context,
512+
)
513+
514+
480515
def _get_span_key_from_baggage_key(key: str) -> Optional[str]:
481516
if not key.startswith(LANGFUSE_BAGGAGE_PREFIX):
482517
return None

0 commit comments

Comments
 (0)