Skip to content

Commit 7b67d48

Browse files
committed
feat(tracing): add export-stage otel span masking
1 parent 560fbe4 commit 7b67d48

11 files changed

Lines changed: 1206 additions & 30 deletions

File tree

langfuse/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@
3636
is_known_llm_instrumentor,
3737
is_langfuse_span,
3838
)
39+
from .types import (
40+
MaskOtelSpansFunction,
41+
MaskOtelSpansParams,
42+
MaskOtelSpansResult,
43+
OtelSpanData,
44+
OtelSpanIdentifier,
45+
OtelSpanPatch,
46+
)
3947

4048
Langfuse = _client_module.Langfuse
4149

@@ -71,6 +79,12 @@
7179
"is_genai_span",
7280
"is_known_llm_instrumentor",
7381
"KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES",
82+
"MaskOtelSpansFunction",
83+
"MaskOtelSpansParams",
84+
"MaskOtelSpansResult",
85+
"OtelSpanData",
86+
"OtelSpanIdentifier",
87+
"OtelSpanPatch",
7488
"experiment",
7589
"api",
7690
]

langfuse/_client/client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,13 @@
132132
PromptClient,
133133
TextPromptClient,
134134
)
135-
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext
135+
from langfuse.types import (
136+
MaskFunction,
137+
MaskOtelSpansFunction,
138+
ScoreDataType,
139+
SpanLevel,
140+
TraceContext,
141+
)
136142

137143

138144
class Langfuse:
@@ -169,7 +175,8 @@ class Langfuse:
169175
release (Optional[str]): Release version/hash of your application. Used for grouping analytics by release.
170176
media_upload_thread_count (Optional[int]): Number of background threads for handling media uploads. Defaults to 1. Can also be set via LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT environment variable.
171177
sample_rate (Optional[float]): Sampling rate for traces (0.0 to 1.0). Defaults to 1.0 (100% of traces are sampled). Can also be set via LANGFUSE_SAMPLE_RATE environment variable.
172-
mask (Optional[MaskFunction]): Function to mask sensitive data in traces before sending to the API.
178+
mask (Optional[MaskFunction]): Function to mask sensitive data synchronously when Langfuse SDK attributes are created. This applies only to attributes set through Langfuse SDK APIs.
179+
mask_otel_spans (Optional[MaskOtelSpansFunction]): Synchronous export-stage hook that receives a batch of OpenTelemetry span snapshots and can return sparse attribute patches before spans are sent to Langfuse. This runs after export filtering and affects only spans exported by this Langfuse client, not spans already sent to other observability backends. It usually runs on the OpenTelemetry batch span processor worker thread; during flush and shutdown, it may run on the caller thread. The batch contents are governed by the OpenTelemetry export batch settings and are not guaranteed to contain a complete trace or request. If the hook raises or returns an invalid batch result, the whole export batch is dropped.
173180
blocked_instrumentation_scopes (Optional[List[str]]): Deprecated. Use `should_export_span` instead. Equivalent behavior:
174181
```python
175182
from langfuse.span_filter import is_default_export_span
@@ -246,6 +253,7 @@ def __init__(
246253
media_upload_thread_count: Optional[int] = None,
247254
sample_rate: Optional[float] = None,
248255
mask: Optional[MaskFunction] = None,
256+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
249257
blocked_instrumentation_scopes: Optional[List[str]] = None,
250258
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
251259
additional_headers: Optional[Dict[str, str]] = None,
@@ -342,6 +350,7 @@ def __init__(
342350
media_upload_thread_count=media_upload_thread_count,
343351
sample_rate=sample_rate,
344352
mask=mask,
353+
mask_otel_spans=mask_otel_spans,
345354
tracing_enabled=self._tracing_enabled,
346355
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
347356
should_export_span=should_export_span,

langfuse/_client/get_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def _create_client_from_instance(
5050
media_upload_thread_count=instance.media_upload_thread_count,
5151
sample_rate=instance.sample_rate,
5252
mask=instance.mask,
53+
mask_otel_spans=instance.mask_otel_spans,
5354
blocked_instrumentation_scopes=instance.blocked_instrumentation_scopes,
5455
should_export_span=instance.should_export_span,
5556
additional_headers=instance.additional_headers,

langfuse/_client/resource_manager.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from langfuse._utils.request import LangfuseClient
4646
from langfuse.api import AsyncLangfuseAPI, LangfuseAPI
4747
from langfuse.logger import langfuse_logger
48-
from langfuse.types import MaskFunction
48+
from langfuse.types import MaskFunction, MaskOtelSpansFunction
4949

5050
from .._version import __version__ as langfuse_version
5151

@@ -94,6 +94,7 @@ def __new__(
9494
media_upload_thread_count: Optional[int] = None,
9595
sample_rate: Optional[float] = None,
9696
mask: Optional[MaskFunction] = None,
97+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
9798
tracing_enabled: Optional[bool] = None,
9899
blocked_instrumentation_scopes: Optional[List[str]] = None,
99100
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
@@ -128,6 +129,7 @@ def __new__(
128129
media_upload_thread_count=media_upload_thread_count,
129130
sample_rate=sample_rate,
130131
mask=mask,
132+
mask_otel_spans=mask_otel_spans,
131133
tracing_enabled=tracing_enabled
132134
if tracing_enabled is not None
133135
else True,
@@ -157,6 +159,7 @@ def _initialize_instance(
157159
httpx_client: Optional[httpx.Client] = None,
158160
sample_rate: Optional[float] = None,
159161
mask: Optional[MaskFunction] = None,
162+
mask_otel_spans: Optional[MaskOtelSpansFunction] = None,
160163
tracing_enabled: bool = True,
161164
blocked_instrumentation_scopes: Optional[List[str]] = None,
162165
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
@@ -169,6 +172,7 @@ def _initialize_instance(
169172
self.tracing_enabled = tracing_enabled
170173
self.base_url = base_url
171174
self.mask = mask
175+
self.mask_otel_spans = mask_otel_spans
172176
self.environment = environment
173177

174178
# Store additional client settings for get_client() to use
@@ -184,33 +188,6 @@ def _initialize_instance(
184188
self.span_exporter = span_exporter
185189
self.tracer_provider: Optional[TracerProvider] = None
186190

187-
# OTEL Tracer
188-
if tracing_enabled:
189-
tracer_provider = tracer_provider or _init_tracer_provider(
190-
environment=environment, release=release, sample_rate=sample_rate
191-
)
192-
self.tracer_provider = tracer_provider
193-
194-
langfuse_processor = LangfuseSpanProcessor(
195-
public_key=self.public_key,
196-
secret_key=secret_key,
197-
base_url=base_url,
198-
timeout=timeout,
199-
flush_at=flush_at,
200-
flush_interval=flush_interval,
201-
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
202-
should_export_span=should_export_span,
203-
additional_headers=additional_headers,
204-
span_exporter=span_exporter,
205-
)
206-
tracer_provider.add_span_processor(langfuse_processor)
207-
208-
self._otel_tracer = tracer_provider.get_tracer(
209-
LANGFUSE_TRACER_NAME,
210-
langfuse_version,
211-
attributes={"public_key": self.public_key},
212-
)
213-
214191
# API Clients
215192

216193
## API clients must be singletons because the underlying HTTPX clients
@@ -266,6 +243,35 @@ def _initialize_instance(
266243
)
267244
self._media_upload_consumers = []
268245

246+
# OTEL Tracer
247+
if tracing_enabled:
248+
tracer_provider = tracer_provider or _init_tracer_provider(
249+
environment=environment, release=release, sample_rate=sample_rate
250+
)
251+
self.tracer_provider = tracer_provider
252+
253+
langfuse_processor = LangfuseSpanProcessor(
254+
public_key=self.public_key,
255+
secret_key=secret_key,
256+
base_url=base_url,
257+
timeout=timeout,
258+
flush_at=flush_at,
259+
flush_interval=flush_interval,
260+
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
261+
should_export_span=should_export_span,
262+
additional_headers=additional_headers,
263+
span_exporter=span_exporter,
264+
media_manager=self._media_manager,
265+
mask_otel_spans=mask_otel_spans,
266+
)
267+
tracer_provider.add_span_processor(langfuse_processor)
268+
269+
self._otel_tracer = tracer_provider.get_tracer(
270+
LANGFUSE_TRACER_NAME,
271+
langfuse_version,
272+
attributes={"public_key": self.public_key},
273+
)
274+
269275
media_upload_thread_count = media_upload_thread_count or max(
270276
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
271277
)

0 commit comments

Comments
 (0)