Skip to content

Commit 4157b0b

Browse files
vblagoje and sjrl authored
fix: Properly cleanup Langfuse tracing context after pipeline run failures (#1999)
* Properly cleanup context after pipeline run failures
* Format
* Update integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py

Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>

* Lint
* PR feedback
* Small fix
* Small nit

---------

Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
1 parent 90397dd commit 4157b0b

2 files changed

Lines changed: 92 additions & 20 deletions

File tree

integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py

Lines changed: 27 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -449,22 +449,33 @@ def trace(
449449
self._context.append(span)
450450
span.set_tags(tags)
451451

452-
yield span
453-
454-
# Let the span handler process the span
455-
self._span_handler.handle(span, component_type)
456-
457-
# In this section, we finalize both regular spans and generation spans created using the LangfuseSpan class.
458-
# It's important to end() these spans to ensure they are properly closed and all relevant data is recorded.
459-
# Note that we do not call end() on the main trace span itself (StatefulTraceClient), as its lifecycle is
460-
# managed differently.
461-
raw_span = span.raw_span()
462-
if isinstance(raw_span, (StatefulSpanClient, StatefulGenerationClient)):
463-
raw_span.end()
464-
self._context.pop()
465-
466-
if self.enforce_flush:
467-
self.flush()
452+
try:
453+
yield span
454+
finally:
455+
# Always clean up context, even if nested operations fail
456+
try:
457+
# Process span data (may fail with nested pipeline exceptions)
458+
self._span_handler.handle(span, component_type)
459+
460+
# End span (may fail if span data is corrupted)
461+
raw_span = span.raw_span()
462+
if isinstance(raw_span, (StatefulSpanClient, StatefulGenerationClient)):
463+
raw_span.end()
464+
except Exception as cleanup_error:
465+
# Log cleanup errors but don't let them corrupt context
466+
logger.warning(
467+
"Error during span cleanup for {operation_name}: {cleanup_error}",
468+
operation_name=operation_name,
469+
cleanup_error=cleanup_error,
470+
)
471+
finally:
472+
# CRITICAL: Always pop context to prevent corruption
473+
# This is especially important for nested pipeline scenarios
474+
if self._context and self._context[-1] == span:
475+
self._context.pop()
476+
477+
if self.enforce_flush:
478+
self.flush()
468479

469480
def flush(self) -> None:
470481
self._tracer.flush()

integrations/langfuse/tests/test_tracer.py

Lines changed: 65 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,15 +3,20 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import datetime
6+
import json
67
import logging
78
import sys
8-
from unittest.mock import MagicMock, Mock, patch
99
from typing import Optional
10+
from unittest.mock import MagicMock, Mock, patch
1011

1112
import pytest
13+
from haystack import Pipeline, component
1214
from haystack.dataclasses import ChatMessage, ToolCall
13-
from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer, LangfuseSpan, SpanContext, DefaultSpanHandler
14-
from haystack_integrations.tracing.langfuse.tracer import _COMPONENT_OUTPUT_KEY
15+
16+
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
17+
from haystack_integrations.tracing.langfuse.tracer import (
18+
_COMPONENT_OUTPUT_KEY, DefaultSpanHandler, LangfuseSpan, LangfuseTracer,
19+
SpanContext)
1520

1621

1722
class MockSpan:
@@ -367,7 +372,8 @@ def test_update_span_flush_disable(self, monkeypatch):
367372
monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false")
368373
tracer_mock = Mock()
369374

370-
from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
375+
from haystack_integrations.tracing.langfuse.tracer import \
376+
LangfuseTracer
371377

372378
tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
373379
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
@@ -397,3 +403,58 @@ def test_init_with_tracing_disabled(self, monkeypatch, caplog):
397403

398404
LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False)
399405
assert "tracing is disabled" in caplog.text
406+
407+
def test_context_cleanup_after_nested_failures(self):
408+
"""
409+
Test that tracer context is properly cleaned up even when nested operations fail.
410+
411+
This test addresses a critical bug where failing nested operations (like inner pipelines)
412+
could corrupt the tracing context, leaving stale spans that affect subsequent operations.
413+
The fix ensures proper cleanup through try/finally blocks.
414+
415+
Before the fix: context would retain spans after failures (length > 0)
416+
After the fix: context is always cleaned up (length == 0)
417+
"""
418+
419+
420+
@component
421+
class FailingParser:
422+
@component.output_types(result=str)
423+
def run(self, data: str):
424+
# This will fail with ValueError when data is not valid JSON
425+
parsed = json.loads(data)
426+
return {"result": parsed["key"]}
427+
428+
@component
429+
class ComponentWithNestedPipeline:
430+
def __init__(self):
431+
# This simulates IntentClassifier's internal pipeline
432+
self.internal_pipeline = Pipeline()
433+
self.internal_pipeline.add_component("parser", FailingParser())
434+
435+
@component.output_types(result=str)
436+
def run(self, input_data: str):
437+
# Run nested pipeline - this is where corruption occurs
438+
result = self.internal_pipeline.run({"parser": {"data": input_data}})
439+
return {"result": result["parser"]["result"]}
440+
441+
tracer = LangfuseConnector("test")
442+
443+
main_pipeline = Pipeline()
444+
main_pipeline.add_component("nested_component", ComponentWithNestedPipeline())
445+
main_pipeline.add_component("tracer", tracer)
446+
447+
# Test 1: First run will fail and should clean up context
448+
try:
449+
main_pipeline.run({"nested_component": {"input_data": "invalid json"}})
450+
except Exception:
451+
pass # Expected to fail
452+
453+
# Critical assertion: context should be empty after failed operation
454+
assert len(tracer.tracer._context) == 0
455+
456+
# Test 2: Second run should work normally with clean context
457+
main_pipeline.run({"nested_component": {"input_data": '{"key": "valid"}'}})
458+
459+
# Critical assertion: context should be empty after successful operation
460+
assert len(tracer.tracer._context) == 0

0 commit comments

Comments
 (0)