Skip to content

Commit 02ea61c

Browse files
fix(langfuse): fix trace name bug and address Copilot review comments
- Remove `TraceMetadata` import (removed in Langfuse v4) - Use `langfuse.propagate_attributes(trace_name=...)` in `LangfuseTracer.trace()` via ExitStack so trace-level metadata covers the full span lifetime (fixes trace name showing as component name 'tracer' instead of pipeline name) - Remove `update_trace()` calls from `create_span()` and `handle()` (method removed in v4); replace pipeline IO call with `set_trace_io()` - Use `span.raw_span().set_trace_as_public()` instead of `span._span...` to avoid accessing private attribute (Copilot comment 2) - Add `set_trace_io` and `set_trace_as_public` to MockSpan; add `MockPropagateAttributesContextManager` helper - Add unit test for `public=True` trace path (Copilot comment 4)
1 parent 10d90f5 commit 02ea61c

2 files changed

Lines changed: 124 additions & 84 deletions

File tree

integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py

Lines changed: 85 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import langfuse
2424
from langfuse import LangfuseSpan as LangfuseClientSpan
25-
from langfuse.types import TraceContext, TraceMetadata
25+
from langfuse.types import TraceContext
2626

2727
logger = logging.getLogger(__name__)
2828

@@ -308,22 +308,8 @@ def create_span(self, context: SpanContext) -> LangfuseSpan:
308308
# Create LangfuseSpan which will handle entering the context manager
309309
span = LangfuseSpan(span_context_manager)
310310

311-
# Build trace metadata from context
312-
trace_metadata: TraceMetadata = {
313-
"name": context.trace_name,
314-
"user_id": tracing_ctx.get("user_id"),
315-
"session_id": tracing_ctx.get("session_id"),
316-
"version": tracing_ctx.get("version"),
317-
"metadata": None,
318-
"tags": tracing_ctx.get("tags"),
319-
"public": context.public,
320-
"release": None,
321-
}
322-
323-
# Filter out None values and apply trace attributes
324-
trace_attrs = {k: v for k, v in trace_metadata.items() if v is not None}
325-
if trace_attrs:
326-
span._span.update_trace(**trace_attrs)
311+
if context.public:
312+
span.raw_span().set_trace_as_public()
327313

328314
return span
329315

@@ -356,7 +342,7 @@ def handle(self, span: LangfuseSpan, component_type: str | None) -> None:
356342
if at_pipeline_level:
357343
coerced_input = tracing_utils.coerce_tag_value(span.get_data().get(_PIPELINE_INPUT_KEY))
358344
coerced_output = tracing_utils.coerce_tag_value(span.get_data().get(_PIPELINE_OUTPUT_KEY))
359-
span.raw_span().update_trace(input=coerced_input, output=coerced_output)
345+
span.raw_span().set_trace_io(input=coerced_input, output=coerced_output)
360346
# special case for ToolInvoker (to update the span name to be: `original_component_name - [tool_names]`)
361347
if component_type == "ToolInvoker":
362348
tool_names: list[str] = []
@@ -469,6 +455,8 @@ def trace(
469455
span_name = tags.get(_COMPONENT_NAME_KEY, operation_name)
470456
component_type = tags.get(_COMPONENT_TYPE_KEY)
471457

458+
is_root_span = not (parent_span or self.current_span())
459+
472460
# Create a new span context
473461
span_context = SpanContext(
474462
name=span_name,
@@ -482,74 +470,90 @@ def trace(
482470
public=self._public,
483471
)
484472

485-
# Create span using the handler
486-
span = self._span_handler.create_span(span_context)
473+
tracing_ctx = tracing_context_var.get({})
487474

488-
# Build new span hierarchy: copy existing stack, add new span, save for restoration
489-
prev_stack = span_stack_var.get()
490-
new_stack = (prev_stack or []).copy()
491-
new_stack.append(span)
492-
token = span_stack_var.set(new_stack)
475+
with contextlib.ExitStack() as stack:
476+
if is_root_span:
477+
# propagate_attributes sets trace-level metadata (trace name, user, session, etc.)
478+
# for the root span and all child spans created within this context
479+
stack.enter_context(
480+
langfuse.propagate_attributes(
481+
trace_name=self._name,
482+
user_id=tracing_ctx.get("user_id") or None,
483+
session_id=tracing_ctx.get("session_id") or None,
484+
version=tracing_ctx.get("version") or None,
485+
tags=tracing_ctx.get("tags") or None,
486+
)
487+
)
493488

494-
span.set_tags(tags)
489+
# Create span using the handler
490+
span = self._span_handler.create_span(span_context)
495491

496-
try:
497-
yield span
498-
except Exception:
499-
# Exception occurred - capture exception info and pass to __exit__
500-
# This allows Langfuse/OpenTelemetry to properly mark the span with ERROR level
501-
exc_info = sys.exc_info()
502-
try:
503-
# Process span data (may fail with nested pipeline exceptions)
504-
self._span_handler.handle(span, component_type)
505-
506-
# End span with exception info (may fail if span data is corrupted)
507-
raw_span = span.raw_span()
508-
if span._context_manager is not None:
509-
# Pass actual exception info to mark span as failed with ERROR level
510-
span._context_manager.__exit__(*exc_info)
511-
elif hasattr(raw_span, "end"):
512-
# Only call end() if it's not a context manager
513-
raw_span.end()
514-
except Exception as cleanup_error:
515-
# Log cleanup errors but don't let them corrupt context
516-
logger.warning(
517-
"Error during span cleanup for {operation_name}: {cleanup_error}",
518-
operation_name=operation_name,
519-
cleanup_error=cleanup_error,
520-
)
492+
# Build new span hierarchy: copy existing stack, add new span, save for restoration
493+
prev_stack = span_stack_var.get()
494+
new_stack = (prev_stack or []).copy()
495+
new_stack.append(span)
496+
token = span_stack_var.set(new_stack)
521497

522-
# Re-raise the original exception
523-
raise
524-
else:
525-
# No exception - clean exit with success status
526-
# This preserves any manually-set log levels (WARNING, DEBUG)
527-
try:
528-
# Process span data
529-
self._span_handler.handle(span, component_type)
530-
531-
# End span successfully
532-
raw_span = span.raw_span()
533-
# In v3, we need to properly exit context managers
534-
if span._context_manager is not None:
535-
# No exception - pass None to indicate success
536-
span._context_manager.__exit__(None, None, None)
537-
elif hasattr(raw_span, "end"):
538-
# Only call end() if it's not a context manager
539-
raw_span.end()
540-
except Exception as cleanup_error:
541-
# Log cleanup errors but don't let them corrupt context
542-
logger.warning(
543-
"Error during span cleanup for {operation_name}: {cleanup_error}",
544-
operation_name=operation_name,
545-
cleanup_error=cleanup_error,
546-
)
547-
finally:
548-
# Restore previous span stack using saved token
549-
span_stack_var.reset(token)
498+
span.set_tags(tags)
550499

551-
if self.enforce_flush:
552-
self.flush()
500+
try:
501+
yield span
502+
except Exception:
503+
# Exception occurred - capture exception info and pass to __exit__
504+
# This allows Langfuse/OpenTelemetry to properly mark the span with ERROR level
505+
exc_info = sys.exc_info()
506+
try:
507+
# Process span data (may fail with nested pipeline exceptions)
508+
self._span_handler.handle(span, component_type)
509+
510+
# End span with exception info (may fail if span data is corrupted)
511+
raw_span = span.raw_span()
512+
if span._context_manager is not None:
513+
# Pass actual exception info to mark span as failed with ERROR level
514+
span._context_manager.__exit__(*exc_info)
515+
elif hasattr(raw_span, "end"):
516+
# Only call end() if it's not a context manager
517+
raw_span.end()
518+
except Exception as cleanup_error:
519+
# Log cleanup errors but don't let them corrupt context
520+
logger.warning(
521+
"Error during span cleanup for {operation_name}: {cleanup_error}",
522+
operation_name=operation_name,
523+
cleanup_error=cleanup_error,
524+
)
525+
526+
# Re-raise the original exception
527+
raise
528+
else:
529+
# No exception - clean exit with success status
530+
# This preserves any manually-set log levels (WARNING, DEBUG)
531+
try:
532+
# Process span data
533+
self._span_handler.handle(span, component_type)
534+
535+
# End span successfully
536+
raw_span = span.raw_span()
537+
# In v3, we need to properly exit context managers
538+
if span._context_manager is not None:
539+
# No exception - pass None to indicate success
540+
span._context_manager.__exit__(None, None, None)
541+
elif hasattr(raw_span, "end"):
542+
# Only call end() if it's not a context manager
543+
raw_span.end()
544+
except Exception as cleanup_error:
545+
# Log cleanup errors but don't let them corrupt context
546+
logger.warning(
547+
"Error during span cleanup for {operation_name}: {cleanup_error}",
548+
operation_name=operation_name,
549+
cleanup_error=cleanup_error,
550+
)
551+
finally:
552+
# Restore previous span stack using saved token
553+
span_stack_var.reset(token)
554+
555+
if self.enforce_flush:
556+
self.flush()
553557

554558
def flush(self) -> None:
555559
"""Flush all pending spans to Langfuse."""

integrations/langfuse/tests/test_tracer.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,16 @@ def span(self, name=None):
6363
# Return a new mock span for child spans
6464
return MockSpan(name=name or "child_span")
6565

66-
def update_trace(self, **kwargs):
67-
# v3 API method for updating trace-level data
68-
self._data.update(kwargs)
66+
def set_trace_io(self, *, input=None, output=None):
67+
# v4 API method for setting trace-level input/output
68+
if input is not None:
69+
self._data["input"] = input
70+
if output is not None:
71+
self._data["output"] = output
72+
73+
def set_trace_as_public(self):
74+
# v4 API method for marking trace as public
75+
self._data["public"] = True
6976

7077
def generation(self, name=None):
7178
# Return a new mock span for generation spans
@@ -84,6 +91,16 @@ def flush(self):
8491
pass
8592

8693

94+
class MockPropagateAttributesContextManager:
95+
"""No-op context manager for propagate_attributes mock."""
96+
97+
def __enter__(self):
98+
return self
99+
100+
def __exit__(self, *args):
101+
pass
102+
103+
87104
class MockLangfuseClient:
88105
"""Mock Langfuse client that has all the required methods"""
89106

@@ -274,6 +291,25 @@ def test_create_span_root_trace(self):
274291
finally:
275292
tracing_context_var.reset(token)
276293

294+
def test_create_span_root_trace_public(self):
295+
"""Test that public=True calls set_trace_as_public on the span."""
296+
mock_client = Mock()
297+
mock_client.start_as_current_observation = Mock(return_value=MockContextManager())
298+
299+
handler = DefaultSpanHandler()
300+
handler.init_tracer(mock_client)
301+
302+
context = MagicMock(spec=SpanContext)
303+
context.parent_span = None
304+
context.trace_name = "public_trace"
305+
context.operation_name = "haystack.pipeline.run"
306+
context.public = True
307+
308+
result = handler.create_span(context)
309+
310+
# set_trace_as_public should have been called on the raw span
311+
assert result.raw_span()._data.get("public") is True
312+
277313
def test_handle_generator(self):
278314
mock_span = Mock()
279315
mock_span.raw_span.return_value = mock_span

0 commit comments

Comments
 (0)