Skip to content

Commit 3053e2d

Browse files
committed
push
1 parent dea157d commit 3053e2d

3 files changed

Lines changed: 92 additions & 45 deletions

File tree

langfuse/otel/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ def __init__(
115115
timeout=timeout,
116116
environment=environment,
117117
release=release,
118-
debug=debug,
119118
).tracer
120119
if self.tracing_enabled
121120
else otel_trace_api.NoOpTracer()
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import base64
2+
from typing import Optional
3+
4+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
5+
from opentelemetry.sdk.trace import ReadableSpan
6+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
7+
8+
from langfuse.otel._logger import logger
9+
from langfuse.otel._utils import span_formatter
10+
from langfuse.otel.constants import LANGFUSE_TRACER_NAME
11+
from langfuse.version import __version__ as langfuse_version
12+
13+
14+
class LangfuseSpanProcessor(BatchSpanProcessor):
15+
def __init__(
16+
self,
17+
*,
18+
public_key: str,
19+
secret_key: str,
20+
host: str,
21+
timeout: Optional[int] = None,
22+
):
23+
self.public_key = public_key
24+
25+
langfuse_span_exporter = OTLPSpanExporter(
26+
endpoint=f"{host}/api/public/otel/v1/traces",
27+
headers={
28+
"Authorization": "Basic "
29+
+ base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8")).decode(
30+
"ascii"
31+
),
32+
"x_langfuse_sdk_name": "python",
33+
"x_langfuse_sdk_version": langfuse_version,
34+
"x_langfuse_public_key": public_key,
35+
},
36+
timeout=timeout,
37+
)
38+
39+
super().__init__(span_exporter=langfuse_span_exporter)
40+
41+
def on_end(self, span: ReadableSpan) -> None:
42+
# Only export spans that belong to the scoped project
43+
# This is important to not send spans to wrong project in multi-project setups
44+
if self._is_langfuse_span(span) and not self._is_langfuse_project_span(span):
45+
logger.debug(
46+
f"Skipping span from different project (current processor is for project '{self.public_key}'): {span_formatter(span)}"
47+
)
48+
return
49+
50+
logger.debug(f"Processing span:\n{span_formatter(span)}")
51+
52+
super().on_end(span)
53+
54+
@staticmethod
55+
def _is_langfuse_span(span: ReadableSpan) -> bool:
56+
return (
57+
span.instrumentation_scope is not None
58+
and LANGFUSE_TRACER_NAME in span.instrumentation_scope.name
59+
)
60+
61+
def _is_langfuse_project_span(self, span: ReadableSpan) -> bool:
62+
if not LangfuseSpanProcessor._is_langfuse_span(span):
63+
return False
64+
65+
if span.instrumentation_scope is not None:
66+
public_key_in_span = span.instrumentation_scope.name.split(":")[-1]
67+
68+
return public_key_in_span == self.public_key
69+
70+
return False

langfuse/otel/tracer.py

Lines changed: 22 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
1-
import base64
21
import os
32
import threading
4-
from typing import Optional, cast
3+
from typing import Dict, Optional, cast
54

65
from opentelemetry import trace as otel_trace_api
7-
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
86
from opentelemetry.sdk.resources import Resource
97
from opentelemetry.sdk.trace import TracerProvider
10-
from opentelemetry.sdk.trace.export import (
11-
BatchSpanProcessor,
12-
ConsoleSpanExporter,
13-
SimpleSpanProcessor,
14-
)
158

169
from langfuse.environment import get_common_release_envs
17-
from langfuse.otel._utils import span_formatter
10+
from langfuse.otel._span_processor import LangfuseSpanProcessor
1811
from langfuse.otel.attributes import LangfuseSpanAttributes
1912
from langfuse.otel.constants import LANGFUSE_TRACER_NAME
2013
from langfuse.otel.environment_variables import (
@@ -28,7 +21,7 @@
2821
class LangfuseTracer:
2922
"""Singleton that provides access to the OTEL tracer."""
3023

31-
_instance: Optional["LangfuseTracer"] = None
24+
_instances: Dict[str, Optional["LangfuseTracer"]] = {}
3225
_lock = threading.Lock()
3326

3427
def __new__(
@@ -40,74 +33,59 @@ def __new__(
4033
timeout: Optional[int] = None,
4134
environment: Optional[str] = None,
4235
release: Optional[str] = None,
43-
debug: Optional[bool] = False,
4436
):
45-
if cls._instance:
46-
return cls._instance
37+
if public_key in cls._instances:
38+
return cls._instances[public_key]
4739

4840
with cls._lock:
49-
if not cls._instance:
50-
cls._instance = super(LangfuseTracer, cls).__new__(cls)
51-
52-
cls._instance._otel_tracer = None
53-
cls._instance._initialize(
41+
if public_key not in cls._instances:
42+
instance = super(LangfuseTracer, cls).__new__(cls)
43+
instance._otel_tracer = None
44+
instance._initialize_instance(
5445
public_key=public_key,
5546
secret_key=secret_key,
5647
host=host,
5748
timeout=timeout,
5849
environment=environment,
5950
release=release,
60-
debug=debug,
6151
)
6252

63-
return cls._instance
53+
cls._instances[public_key] = instance
54+
55+
return cls._instances[public_key]
6456

65-
def _initialize(
57+
def _initialize_instance(
6658
self,
6759
*,
6860
public_key: str,
6961
secret_key: str,
7062
host: str,
63+
timeout: Optional[int] = None,
7164
environment: Optional[str] = None,
7265
release: Optional[str] = None,
73-
timeout: Optional[int] = None,
74-
debug: Optional[bool] = False,
7566
):
7667
tracer_provider = _init_tracer_provider(
7768
environment=environment, release=release
7869
)
7970

80-
if debug:
81-
console_span_exporter = ConsoleSpanExporter(formatter=span_formatter)
82-
console_span_processor = SimpleSpanProcessor(
83-
span_exporter=console_span_exporter
84-
)
85-
tracer_provider.add_span_processor(console_span_processor)
86-
87-
langfuse_exporter = OTLPSpanExporter(
88-
endpoint=f"{host}/api/public/otel/v1/traces",
89-
headers={
90-
"Authorization": "Basic "
91-
+ base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8")).decode(
92-
"ascii"
93-
),
94-
"x_langfuse_sdk_name": "python",
95-
"x_langfuse_sdk_version": langfuse_version,
96-
"x_langfuse_public_key": public_key,
97-
},
71+
langfuse_processor = LangfuseSpanProcessor(
72+
public_key=public_key,
73+
secret_key=secret_key,
74+
host=host,
9875
timeout=timeout,
9976
)
100-
langfuse_processor = BatchSpanProcessor(span_exporter=langfuse_exporter)
10177
tracer_provider.add_span_processor(langfuse_processor)
10278

103-
self.name = LANGFUSE_TRACER_NAME
79+
tracer_provider = otel_trace_api.get_tracer_provider()
80+
self.name = f"{LANGFUSE_TRACER_NAME}:{public_key}"
10481
self._otel_tracer = tracer_provider.get_tracer(self.name, langfuse_version)
10582

10683
@property
10784
def tracer(self):
10885
return self._otel_tracer
10986

110-
def get_current_span(self):
87+
@staticmethod
88+
def get_current_span():
11189
return otel_trace_api.get_current_span()
11290

11391

0 commit comments

Comments
 (0)