Skip to content

Commit fe09fd5

Browse files
authored
feat(client): add custom span exporter support (#1618)
1 parent 01887dd commit fe09fd5

File tree

6 files changed

+97
-26
lines changed

6 files changed

+97
-26
lines changed

langfuse/_client/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import httpx
3030
from opentelemetry import trace as otel_trace_api
3131
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
32+
from opentelemetry.sdk.trace.export import SpanExporter
3233
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
3334
from opentelemetry.util._decorator import (
3435
_AgnosticContextManager,
@@ -179,8 +180,9 @@ class Langfuse:
179180
)
180181
```
181182
should_export_span (Optional[Callable[[ReadableSpan], bool]]): Callback to decide whether to export a span. If omitted, Langfuse uses the default filter (Langfuse SDK spans, spans with `gen_ai.*` attributes, and known LLM instrumentation scopes).
182-
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well.
183+
additional_headers (Optional[Dict[str, str]]): Additional headers to include in all API requests and in the default OTLPSpanExporter requests. These headers will be merged with default headers. Note: If httpx_client is provided, additional_headers must be set directly on your custom httpx_client as well. If `span_exporter` is provided, these headers are not wired into that exporter and must be configured on the exporter instance directly.
183184
tracer_provider(Optional[TracerProvider]): OpenTelemetry TracerProvider to use for Langfuse. This can be useful to set to have disconnected tracing between Langfuse and other OpenTelemetry-span emitting libraries. Note: To track active spans, the context is still shared between TracerProviders. This may lead to broken trace trees.
185+
span_exporter (Optional[SpanExporter]): Custom OpenTelemetry span exporter for the Langfuse span processor. If omitted, Langfuse creates an OTLPSpanExporter pointed at the Langfuse OTLP endpoint. If provided, Langfuse does not wire `base_url`, exporter headers, exporter auth, or exporter timeout into it. Configure endpoint, headers, and timeout on the exporter instance directly. If you are sending spans to Langfuse v4 or using Langfuse Cloud Fast Preview, include `x-langfuse-ingestion-version=4` on the exporter to enable real time processing of exported spans.
184186
185187
Example:
186188
```python
@@ -244,6 +246,7 @@ def __init__(
244246
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
245247
additional_headers: Optional[Dict[str, str]] = None,
246248
tracer_provider: Optional[TracerProvider] = None,
249+
span_exporter: Optional[SpanExporter] = None,
247250
):
248251
self._base_url = (
249252
base_url
@@ -340,6 +343,7 @@ def __init__(
340343
should_export_span=should_export_span,
341344
additional_headers=additional_headers,
342345
tracer_provider=tracer_provider,
346+
span_exporter=span_exporter,
343347
)
344348
self._mask = self._resources.mask
345349

langfuse/_client/get_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def _create_client_from_instance(
5454
should_export_span=instance.should_export_span,
5555
additional_headers=instance.additional_headers,
5656
tracer_provider=instance.tracer_provider,
57+
span_exporter=instance.span_exporter,
5758
httpx_client=instance.httpx_client,
5859
)
5960

langfuse/_client/resource_manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from opentelemetry import trace as otel_trace_api
2525
from opentelemetry.sdk.resources import Resource
2626
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
27+
from opentelemetry.sdk.trace.export import SpanExporter
2728
from opentelemetry.sdk.trace.sampling import Decision, TraceIdRatioBased
2829
from opentelemetry.trace import Tracer
2930

@@ -98,6 +99,7 @@ def __new__(
9899
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
99100
additional_headers: Optional[Dict[str, str]] = None,
100101
tracer_provider: Optional[TracerProvider] = None,
102+
span_exporter: Optional[SpanExporter] = None,
101103
) -> "LangfuseResourceManager":
102104
if public_key in cls._instances:
103105
return cls._instances[public_key]
@@ -133,6 +135,7 @@ def __new__(
133135
should_export_span=should_export_span,
134136
additional_headers=additional_headers,
135137
tracer_provider=tracer_provider,
138+
span_exporter=span_exporter,
136139
)
137140

138141
cls._instances[public_key] = instance
@@ -159,6 +162,7 @@ def _initialize_instance(
159162
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
160163
additional_headers: Optional[Dict[str, str]] = None,
161164
tracer_provider: Optional[TracerProvider] = None,
165+
span_exporter: Optional[SpanExporter] = None,
162166
) -> None:
163167
self.public_key = public_key
164168
self.secret_key = secret_key
@@ -177,6 +181,7 @@ def _initialize_instance(
177181
self.blocked_instrumentation_scopes = blocked_instrumentation_scopes
178182
self.should_export_span = should_export_span
179183
self.additional_headers = additional_headers
184+
self.span_exporter = span_exporter
180185
self.tracer_provider: Optional[TracerProvider] = None
181186

182187
# OTEL Tracer
@@ -196,6 +201,7 @@ def _initialize_instance(
196201
blocked_instrumentation_scopes=blocked_instrumentation_scopes,
197202
should_export_span=should_export_span,
198203
additional_headers=additional_headers,
204+
span_exporter=span_exporter,
199205
)
200206
tracer_provider.add_span_processor(langfuse_processor)
201207

langfuse/_client/span_processor.py

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from opentelemetry.context import Context
2020
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
2121
from opentelemetry.sdk.trace import ReadableSpan, Span
22-
from opentelemetry.sdk.trace.export import BatchSpanProcessor
22+
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
2323
from opentelemetry.trace import format_span_id
2424

2525
from langfuse._client.environment_variables import (
@@ -63,6 +63,7 @@ def __init__(
6363
blocked_instrumentation_scopes: Optional[List[str]] = None,
6464
should_export_span: Optional[Callable[[ReadableSpan], bool]] = None,
6565
additional_headers: Optional[Dict[str, str]] = None,
66+
span_exporter: Optional[SpanExporter] = None,
6667
):
6768
self.public_key = public_key
6869
self.blocked_instrumentation_scopes = (
@@ -82,37 +83,38 @@ def __init__(
8283
else None
8384
)
8485

85-
basic_auth_header = "Basic " + base64.b64encode(
86-
f"{public_key}:{secret_key}".encode("utf-8")
87-
).decode("ascii")
86+
if span_exporter is None:
87+
basic_auth_header = "Basic " + base64.b64encode(
88+
f"{public_key}:{secret_key}".encode("utf-8")
89+
).decode("ascii")
8890

89-
# Prepare default headers
90-
default_headers = {
91-
"Authorization": basic_auth_header,
92-
"x-langfuse-sdk-name": "python",
93-
"x-langfuse-sdk-version": langfuse_version,
94-
"x-langfuse-public-key": public_key,
95-
}
91+
# Prepare default headers
92+
default_headers = {
93+
"Authorization": basic_auth_header,
94+
"x-langfuse-sdk-name": "python",
95+
"x-langfuse-sdk-version": langfuse_version,
96+
"x-langfuse-public-key": public_key,
97+
}
9698

97-
# Merge additional headers if provided
98-
headers = {**default_headers, **(additional_headers or {})}
99+
# Merge additional headers if provided
100+
headers = {**default_headers, **(additional_headers or {})}
99101

100-
traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None)
102+
traces_export_path = os.environ.get(LANGFUSE_OTEL_TRACES_EXPORT_PATH, None)
101103

102-
endpoint = (
103-
f"{base_url}/{traces_export_path}"
104-
if traces_export_path
105-
else f"{base_url}/api/public/otel/v1/traces"
106-
)
104+
endpoint = (
105+
f"{base_url}/{traces_export_path}"
106+
if traces_export_path
107+
else f"{base_url}/api/public/otel/v1/traces"
108+
)
107109

108-
langfuse_span_exporter = OTLPSpanExporter(
109-
endpoint=endpoint,
110-
headers=headers,
111-
timeout=timeout,
112-
)
110+
span_exporter = OTLPSpanExporter(
111+
endpoint=endpoint,
112+
headers=headers,
113+
timeout=timeout,
114+
)
113115

114116
super().__init__(
115-
span_exporter=langfuse_span_exporter,
117+
span_exporter=span_exporter,
116118
export_timeout_millis=timeout * 1_000 if timeout else None,
117119
max_export_batch_size=flush_at,
118120
schedule_delay_millis=flush_interval * 1_000

tests/test_additional_headers_simple.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,25 @@
33
This module tests that additional headers are properly configured in the HTTP clients.
44
"""
55

6+
from typing import Sequence
7+
68
import httpx
9+
from opentelemetry.sdk.trace import ReadableSpan
10+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
711

812
from langfuse._client.client import Langfuse
913

1014

15+
class NoOpSpanExporter(SpanExporter):
16+
"""Minimal exporter used to verify custom exporter injection."""
17+
18+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
19+
return SpanExportResult.SUCCESS
20+
21+
def shutdown(self) -> None:
22+
pass
23+
24+
1125
class TestAdditionalHeadersSimple:
1226
"""Simple test suite for additional_headers functionality."""
1327

@@ -196,3 +210,19 @@ def test_span_processor_none_additional_headers_works(self):
196210
assert "Authorization" in exporter._headers
197211
assert "x-langfuse-sdk-name" in exporter._headers
198212
assert "x-langfuse-public-key" in exporter._headers
213+
214+
def test_span_processor_uses_custom_span_exporter_when_provided(self):
215+
"""Test that a custom exporter bypasses the default OTLP exporter construction."""
216+
from langfuse._client.span_processor import LangfuseSpanProcessor
217+
218+
custom_exporter = NoOpSpanExporter()
219+
220+
processor = LangfuseSpanProcessor(
221+
public_key="test-public-key",
222+
secret_key="test-secret-key",
223+
base_url="https://mock-host.com",
224+
additional_headers={"X-Custom-Trace-Header": "trace-value"},
225+
span_exporter=custom_exporter,
226+
)
227+
228+
assert processor.span_exporter is custom_exporter

tests/test_resource_manager.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,25 @@
11
"""Test the LangfuseResourceManager and get_client() function."""
22

3+
from typing import Sequence
4+
5+
from opentelemetry.sdk.trace import ReadableSpan
6+
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
7+
38
from langfuse import Langfuse
49
from langfuse._client.get_client import get_client
510
from langfuse._client.resource_manager import LangfuseResourceManager
611

712

13+
class NoOpSpanExporter(SpanExporter):
14+
"""Minimal exporter used to verify configuration propagation."""
15+
16+
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
17+
return SpanExportResult.SUCCESS
18+
19+
def shutdown(self) -> None:
20+
pass
21+
22+
823
def test_get_client_preserves_all_settings():
924
"""Test that get_client() preserves environment and all client settings."""
1025
with LangfuseResourceManager._lock:
@@ -13,14 +28,19 @@ def test_get_client_preserves_all_settings():
1328
def should_export(span):
1429
return span.name != "drop"
1530

31+
span_exporter = NoOpSpanExporter()
32+
1633
settings = {
34+
"public_key": "pk-comprehensive",
35+
"secret_key": "sk-comprehensive",
1736
"environment": "test-env",
1837
"release": "v1.2.3",
1938
"timeout": 30,
2039
"flush_at": 100,
2140
"sample_rate": 0.8,
2241
"should_export_span": should_export,
2342
"additional_headers": {"X-Custom": "value"},
43+
"span_exporter": span_exporter,
2444
}
2545

2646
original_client = Langfuse(**settings)
@@ -36,6 +56,7 @@ def should_export(span):
3656
assert rm.sample_rate == settings["sample_rate"]
3757
assert rm.should_export_span is should_export
3858
assert rm.additional_headers == settings["additional_headers"]
59+
assert rm.span_exporter is span_exporter
3960

4061
original_client.shutdown()
4162

@@ -49,6 +70,9 @@ def should_export_a(span):
4970
def should_export_b(span):
5071
return span.name.startswith("b")
5172

73+
exporter_a = NoOpSpanExporter()
74+
exporter_b = NoOpSpanExporter()
75+
5276
# Settings for client A
5377
settings_a = {
5478
"public_key": "pk-comprehensive-a",
@@ -58,6 +82,7 @@ def should_export_b(span):
5882
"timeout": 10,
5983
"sample_rate": 0.5,
6084
"should_export_span": should_export_a,
85+
"span_exporter": exporter_a,
6186
}
6287

6388
# Settings for client B
@@ -69,6 +94,7 @@ def should_export_b(span):
6994
"timeout": 20,
7095
"sample_rate": 0.9,
7196
"should_export_span": should_export_b,
97+
"span_exporter": exporter_b,
7298
}
7399

74100
client_a = Langfuse(**settings_a)
@@ -91,6 +117,8 @@ def should_export_b(span):
91117
assert retrieved_b._resources.release == settings_b["release"]
92118
assert retrieved_a._resources.should_export_span is should_export_a
93119
assert retrieved_b._resources.should_export_span is should_export_b
120+
assert retrieved_a._resources.span_exporter is exporter_a
121+
assert retrieved_b._resources.span_exporter is exporter_b
94122

95123
client_a.shutdown()
96124
client_b.shutdown()

0 commit comments

Comments
 (0)