Skip to content

Commit 55172d5

Browse files
committed
add masking
1 parent 8903d80 commit 55172d5

3 files changed

Lines changed: 71 additions & 49 deletions

File tree

langfuse/otel/__init__.py

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
)
2828
from langfuse.utils import _get_timestamp
2929

30-
from ..types import MapValue, ScoreDataType, SpanLevel
30+
from ..types import MapValue, MaskFunction, ScoreDataType, SpanLevel
3131
from ._logger import langfuse_logger
3232
from ._tracer import LangfuseTracer
3333

@@ -49,7 +49,7 @@ def __init__(
4949
release: Optional[str] = None,
5050
media_upload_thread_count: Optional[int] = None,
5151
sample_rate: Optional[float] = None,
52-
# mask: Optional[MaskFunction] = None, # TODO: implement masking
52+
mask: Optional[MaskFunction] = None,
5353
# sdk_integration: Optional[str] = "default", -> TO BE DEPRECATED
5454
# threads: Optional[int] = None, -> TO BE DEPRECATED
5555
# max_retries: Optional[int] = None, -> TO BE DEPRECATED
@@ -89,6 +89,8 @@ def __init__(
8989
if not self.tracing_enabled:
9090
langfuse_logger.info("Langfuse tracing is disabled")
9191

92+
self._mask = mask
93+
9294
# Initialize api and tracer if requirements are met
9395
self.langfuse_tracer = LangfuseTracer(
9496
public_key=public_key,
@@ -187,7 +189,7 @@ def start_span(
187189

188190
# Process media only if span is sampled
189191
if span.is_recording:
190-
self._process_media_span_attributes(
192+
self._set_processed_span_attributes(
191193
span=span,
192194
as_type=as_type,
193195
input=input,
@@ -313,7 +315,7 @@ def _start_as_current_span_with_processed_media(
313315
) as span:
314316
# Process media only if span is sampled
315317
if span.is_recording():
316-
self._process_media_span_attributes(
318+
self._set_processed_span_attributes(
317319
span=span,
318320
as_type=as_type,
319321
input=input,
@@ -323,7 +325,7 @@ def _start_as_current_span_with_processed_media(
323325

324326
yield span
325327

326-
def _process_media_span_attributes(
328+
def _set_processed_span_attributes(
327329
self,
328330
*,
329331
span: otel_trace_api.Span,
@@ -332,39 +334,61 @@ def _process_media_span_attributes(
332334
output: Optional[Any] = None,
333335
metadata: Optional[Any] = None,
334336
):
335-
media_processed_input = self._process_media_attribute(
337+
processed_input = self._process_media_and_apply_mask(
336338
span=span,
337339
data=input,
338340
field="input",
339341
)
340-
media_processed_output = self._process_media_attribute(
342+
processed_output = self._process_media_and_apply_mask(
341343
span=span,
342344
data=output,
343345
field="output",
344346
)
345-
media_processed_metadata = self._process_media_attribute(
347+
processed_metadata = self._process_media_and_apply_mask(
346348
span=span,
347349
data=metadata,
348350
field="metadata",
349351
)
350352

351353
media_processed_attributes = (
352354
create_generation_attributes(
353-
input=media_processed_input,
354-
output=media_processed_output,
355-
metadata=media_processed_metadata,
355+
input=processed_input,
356+
output=processed_output,
357+
metadata=processed_metadata,
356358
)
357359
if as_type == "generation"
358360
else create_span_attributes(
359-
input=media_processed_input,
360-
output=media_processed_output,
361-
metadata=media_processed_metadata,
361+
input=processed_input,
362+
output=processed_output,
363+
metadata=processed_metadata,
362364
)
363365
)
364366

365367
span.set_attributes(media_processed_attributes)
366368

367-
def _process_media_attribute(
369+
def _process_media_and_apply_mask(
370+
self,
371+
*,
372+
data: Optional[Any] = None,
373+
span: otel_trace_api.Span,
374+
field: Union[Literal["input"], Literal["output"], Literal["metadata"]],
375+
):
376+
return self._mask_attribute(
377+
data=self._process_media_in_attribute(data=data, span=span, field=field)
378+
)
379+
380+
def _mask_attribute(self, *, data):
381+
if not self._mask:
382+
return data
383+
384+
try:
385+
return self._mask(data=data)
386+
except Exception as e:
387+
langfuse_logger.error(f"Mask function failed with error: {e}")
388+
389+
return "<fully masked due to failed mask function>"
390+
391+
def _process_media_in_attribute(
368392
self,
369393
*,
370394
data: Optional[Any] = None,
@@ -387,8 +411,7 @@ def _process_media_attribute(
387411

388412
return media_processed_attribute
389413

390-
@staticmethod # TODO: reconsider marking methods as static as changing object method later is breaking change
391-
def get_current_span() -> Optional[otel_trace_api.Span]:
414+
def get_current_span(self) -> Optional[otel_trace_api.Span]:
392415
current_span = otel_trace_api.get_current_span()
393416

394417
if current_span is otel_trace_api.INVALID_SPAN:
@@ -488,13 +511,13 @@ def update_span(
488511
cost_details: Optional[Dict[str, float]] = None,
489512
prompt: Optional[PromptClient] = None,
490513
):
491-
media_processed_input = self._process_media_attribute(
514+
processed_input = self._process_media_and_apply_mask(
492515
data=input, field="input", span=span
493516
)
494-
media_processed_output = self._process_media_attribute(
517+
processed_output = self._process_media_and_apply_mask(
495518
data=output, field="output", span=span
496519
)
497-
media_processed_metadata = self._process_media_attribute(
520+
processed_metadata = self._process_media_and_apply_mask(
498521
data=metadata, field="metadata", span=span
499522
)
500523

@@ -509,9 +532,9 @@ def update_span(
509532
],
510533
):
511534
attributes = create_generation_attributes(
512-
input=media_processed_input,
513-
output=media_processed_output,
514-
metadata=media_processed_metadata,
535+
input=processed_input,
536+
output=processed_output,
537+
metadata=processed_metadata,
515538
version=version,
516539
level=level,
517540
status_message=status_message,
@@ -524,9 +547,9 @@ def update_span(
524547
)
525548
else:
526549
attributes = create_span_attributes(
527-
input=media_processed_input,
528-
output=media_processed_output,
529-
metadata=media_processed_metadata,
550+
input=processed_input,
551+
output=processed_output,
552+
metadata=processed_metadata,
530553
version=version,
531554
level=level,
532555
status_message=status_message,
@@ -552,13 +575,13 @@ def update_trace(
552575
tags: Optional[List[str]] = None,
553576
public: Optional[bool] = None,
554577
):
555-
media_processed_input = self._process_media_attribute(
578+
media_processed_input = self._process_media_and_apply_mask(
556579
data=input, field="input", span=span
557580
)
558-
media_processed_output = self._process_media_attribute(
581+
media_processed_output = self._process_media_and_apply_mask(
559582
data=output, field="output", span=span
560583
)
561-
media_processed_metadata = self._process_media_attribute(
584+
media_processed_metadata = self._process_media_and_apply_mask(
562585
data=metadata, field="metadata", span=span
563586
)
564587

@@ -593,24 +616,20 @@ def _create_remote_parent_span(*, trace_id: str, parent_span_id: Optional[str]):
593616

594617
return trace.NonRecordingSpan(span_context)
595618

596-
@staticmethod
597-
def create_span_id() -> str:
619+
def create_span_id(self) -> str:
598620
span_id_int = RandomIdGenerator().generate_span_id()
599621

600-
return Langfuse._format_span_id(span_id_int)
622+
return self._format_span_id(span_id_int)
601623

602-
@staticmethod
603-
def create_trace_id() -> str:
624+
def create_trace_id(self) -> str:
604625
trace_id_int = RandomIdGenerator().generate_trace_id()
605626

606-
return Langfuse._format_trace_id(trace_id_int)
627+
return self._format_trace_id(trace_id_int)
607628

608-
@staticmethod
609-
def _format_span_id(span_id_int: int) -> str:
629+
def _format_span_id(self, span_id_int: int) -> str:
610630
return format(span_id_int, "016x")
611631

612-
@staticmethod
613-
def _format_trace_id(trace_id_int: int) -> str:
632+
def _format_trace_id(self, trace_id_int: int) -> str:
614633
return format(trace_id_int, "032x")
615634

616635
@overload

langfuse/otel/_span_processor/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,17 @@ def __init__(
3737
f"{public_key}:{secret_key}".encode("utf-8")
3838
).decode("ascii")
3939

40-
langfuse_span_exporter = OTLPSpanExporter(
41-
endpoint=f"{host}/api/public/otel/v1/traces",
42-
headers={
43-
"Authorization": basic_auth_header,
44-
"x_langfuse_sdk_name": "python",
45-
"x_langfuse_sdk_version": langfuse_version,
46-
"x_langfuse_public_key": public_key,
47-
},
48-
timeout=timeout,
40+
langfuse_span_exporter = (
41+
OTLPSpanExporter( # TODO: allow passing cert similar as in httpx client
42+
endpoint=f"{host}/api/public/otel/v1/traces",
43+
headers={
44+
"Authorization": basic_auth_header,
45+
"x_langfuse_sdk_name": "python",
46+
"x_langfuse_sdk_version": langfuse_version,
47+
"x_langfuse_public_key": public_key,
48+
},
49+
timeout=timeout,
50+
)
4951
)
5052

5153
super().__init__(

langfuse/types/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any, Dict, List, Literal, Optional, Protocol, TypedDict, Union
55

66
from pydantic import BaseModel
7+
78
from langfuse.api import MediaContentType, UsageDetails
89
from langfuse.model import MapValue, ModelUsage, PromptClient
910

@@ -49,7 +50,7 @@ class MaskFunction(Protocol):
4950
The masked data that must be serializable to JSON.
5051
"""
5152

52-
def __call__(self, *, data: Any) -> Any: ...
53+
def __call__(self, *, data: Any, **kwargs: Dict[str, Any]) -> Any: ...
5354

5455

5556
class ParsedMediaReference(TypedDict):

0 commit comments

Comments
 (0)