Skip to content

Commit 1415295

Browse files
committed
feat(tracing): add export-stage otel span masking
1 parent 560fbe4 commit 1415295

11 files changed

Lines changed: 1128 additions & 29 deletions

File tree

langfuse/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@
3636
is_known_llm_instrumentor,
3737
is_langfuse_span,
3838
)
39+
from .types import (
40+
MaskOtelSpansFunction,
41+
MaskOtelSpansParams,
42+
MaskOtelSpansResult,
43+
OtelSpanData,
44+
OtelSpanIdentifier,
45+
OtelSpanPatch,
46+
)
3947

4048
Langfuse = _client_module.Langfuse
4149

@@ -71,6 +79,12 @@
7179
"is_genai_span",
7280
"is_known_llm_instrumentor",
7381
"KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES",
82+
"MaskOtelSpansFunction",
83+
"MaskOtelSpansParams",
84+
"MaskOtelSpansResult",
85+
"OtelSpanData",
86+
"OtelSpanIdentifier",
87+
"OtelSpanPatch",
7488
"experiment",
7589
"api",
7690
]

langfuse/_client/client.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,13 @@
132132
PromptClient,
133133
TextPromptClient,
134134
)
135-
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
135+
from langfuse.types import (
136+
MaskFunction,
137+
MaskOtelSpansFunction,
138+
ScoreDataType,
139+
SpanLevel,
140+
TraceContext,
141+
)
136142

137143

138144
class Langfuse:
@@ -170,6 +176,7 @@ class Langfuse:
170176
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
171177
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
172178
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
179+
mask_otel_spans (Optional[MaskOtelSpansFunction]): Synchronous export-stage hook that receives a batch of OpenTelemetry span snapshots and can return sparse attribute patches before spans are sent to Langfuse. If the hook raises or returns an invalid batch result, the whole export batch is dropped. During flush and shutdown, this hook may run on the caller thread.
173180
blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior:
174181
```python
175182
from langfuse.span_filter import is_default_export_span
@@ -226,6 +233,7 @@ class Langfuse:
226233

227234
_resources: Optional[LangfuseResourceManager] = None
228235
_mask: Optional[MaskFunction] = None
236+
_mask_otel_spans: Optional[MaskOtelSpansFunction] = None
229237
_otel_tracer: otel_trace_api.Tracer
230238

231239
def __init__(
@@ -246,6 +254,7 @@ def __init__(
246254
media_upload_thread_count: Optional[int] = None,
247255
sample_rate: Optional[float] = None,
248256
mask: Optional[MaskFunction] = None,
257+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
249258
blocked_instrumentation_scopes: Optional[List[str]] = None,
250259
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
251260
additional_headers: Optional[Dict[str, str]] = None,
@@ -342,6 +351,7 @@ def __init__(
342351
media_upload_thread_count=media_upload_thread_count,
343352
sample_rate=sample_rate,
344353
mask=mask,
354+
mask_otel_spans=mask_otel_spans,
345355
tracing_enabled=self._tracing_enabled,
346356
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
347357
should_export_span=should_export_span,
@@ -350,6 +360,7 @@ def __init__(
350360
span_exporter=span_exporter,
351361
)
352362
self._mask = self._resources.mask
363+
self._mask_otel_spans = self._resources.mask_otel_spans
353364

354365
self._otel_tracer = (
355366
self._resources.tracer

langfuse/_client/get_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def _create_client_from_instance(
5050
media_upload_thread_count=instance.media_upload_thread_count,
5151
sample_rate=instance.sample_rate,
5252
mask=instance.mask,
53+
mask_otel_spans=instance.mask_otel_spans,
5354
blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes,
5455
should_export_span=instance.should_export_span,
5556
additional_headers=instance.additional_headers,

langfuse/_client/resource_manager.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from langfuse._utils.request import LangfuseClient
4646
from langfuse.api import AsyncLangfuseAPI, LangfuseAPI
4747
from langfuse.logger import langfuse_logger
48-
from langfuse.types import MaskFunction
48+
from langfuse.types import MaskFunction, MaskOtelSpansFunction
4949

5050
from .._version import __version__ as langfuse_version
5151

@@ -94,6 +94,7 @@ def __new__(
9494
media_upload_thread_count: Optional[int] = None,
9595
sample_rate: Optional[float] = None,
9696
mask: Optional[MaskFunction] = None,
97+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
9798
tracing_enabled: Optional[bool] = None,
9899
blocked_instrumentation_scopes: Optional[List[str]] = None,
99100
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
@@ -128,6 +129,7 @@ def __new__(
128129
media_upload_thread_count=media_upload_thread_count,
129130
sample_rate=sample_rate,
130131
mask=mask,
132+
mask_otel_spans=mask_otel_spans,
131133
tracing_enabled=tracing_enabled
132134
if tracing_enabled is not None
133135
else True,
@@ -157,6 +159,7 @@ def _initialize_instance(
157159
httpx_client: Optional[httpx.Client] = None,
158160
sample_rate: Optional[float] = None,
159161
mask: Optional[MaskFunction] = None,
162+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
160163
tracing_enabled: bool = True,
161164
blocked_instrumentation_scopes: Optional[List[str]] = None,
162165
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
@@ -169,6 +172,7 @@ def _initialize_instance(
169172
self.tracing_enabled = tracing_enabled
170173
self.base_url = base_url
171174
self.mask = mask
175+
self.mask_otel_spans = mask_otel_spans
172176
self.environment = environment
173177

174178
# Store additional client settings for get_client() to use
@@ -184,33 +188,6 @@ def _initialize_instance(
184188
self.span_exporter = span_exporter
185189
self.tracer_provider: Optional[TracerProvider] = None
186190

187-
# OTEL Tracer
188-
if tracing_enabled:
189-
tracer_provider = tracer_provider or _init_tracer_provider(
190-
environment=environment, release=release, sample_rate=sample_rate
191-
)
192-
self.tracer_provider = tracer_provider
193-
194-
langfuse_processor = LangfuseSpanProcessor(
195-
public_key=self.public_key,
196-
secret_key=secret_key,
197-
base_url=base_url,
198-
timeout=timeout,
199-
flush_at=flush_at,
200-
flush_interval=flush_interval,
201-
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
202-
should_export_span=should_export_span,
203-
additional_headers=additional_headers,
204-
span_exporter=span_exporter,
205-
)
206-
tracer_provider.add_span_processor(langfuse_processor)
207-
208-
self._otel_tracer = tracer_provider.get_tracer(
209-
LANGFUSE_TRACER_NAME,
210-
langfuse_version,
211-
attributes={"public_key": self.public_key},
212-
)
213-
214191
# API Clients
215192

216193
## API clients must be singletons because the underlying HTTPX clients
@@ -266,6 +243,35 @@ def _initialize_instance(
266243
)
267244
self._media_upload_consumers = []
268245

246+
# OTEL Tracer
247+
if tracing_enabled:
248+
tracer_provider = tracer_provider or _init_tracer_provider(
249+
environment=environment, release=release, sample_rate=sample_rate
250+
)
251+
self.tracer_provider = tracer_provider
252+
253+
langfuse_processor = LangfuseSpanProcessor(
254+
public_key=self.public_key,
255+
secret_key=secret_key,
256+
base_url=base_url,
257+
timeout=timeout,
258+
flush_at=flush_at,
259+
flush_interval=flush_interval,
260+
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
261+
should_export_span=should_export_span,
262+
additional_headers=additional_headers,
263+
span_exporter=span_exporter,
264+
media_manager=self._media_manager,
265+
mask_otel_spans=mask_otel_spans,
266+
)
267+
tracer_provider.add_span_processor(langfuse_processor)
268+
269+
self._otel_tracer = tracer_provider.get_tracer(
270+
LANGFUSE_TRACER_NAME,
271+
langfuse_version,
272+
attributes={"public_key": self.public_key},
273+
)
274+
269275
media_upload_thread_count = media_upload_thread_count or max(
270276
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
271277
)

0 commit comments

Comments
 (0)