Skip to content

Commit 549faca

Browse files
authored
fix(langchain): exit propagation context gracefully (#1588)
1 parent 406e10a commit 549faca

File tree

2 files changed

+45
-21
lines changed

2 files changed

+45
-21
lines changed

langfuse/_client/propagation.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast
99

10-
from opentelemetry import baggage
10+
from opentelemetry import (
11+
baggage,
12+
)
1113
from opentelemetry import (
1214
baggage as otel_baggage_api,
1315
)
@@ -17,6 +19,7 @@
1719
from opentelemetry import (
1820
trace as otel_trace_api,
1921
)
22+
from opentelemetry.context import _RUNTIME_CONTEXT
2023
from opentelemetry.util._decorator import (
2124
_AgnosticContextManager,
2225
_agnosticcontextmanager,
@@ -72,6 +75,22 @@ class PropagatedExperimentAttributes(TypedDict):
7275
experiment_item_root_observation_id: str
7376

7477

78+
def _detach_context_token_safely(token: Any) -> None:
79+
"""Detach a context token without emitting noisy async teardown errors.
80+
81+
OpenTelemetry tokens are backed by ``contextvars`` and must be detached in the
82+
same execution context where they were attached. Async frameworks can legitimately
83+
end spans or unwind context managers in a different task/context, in which case
84+
detach raises and the public OpenTelemetry helper logs an error. At that point the
85+
observation is already completed, so the mismatch is safe to ignore.
86+
"""
87+
88+
try:
89+
_RUNTIME_CONTEXT.detach(token)
90+
except Exception:
91+
pass
92+
93+
7594
def propagate_attributes(
7695
*,
7796
user_id: Optional[str] = None,
@@ -272,7 +291,7 @@ def _propagate_attributes(
272291
yield
273292

274293
finally:
275-
otel_context_api.detach(token)
294+
_detach_context_token_safely(token)
276295

277296

278297
def _get_propagated_attributes_from_context(

langfuse/langchain/CallbackHandler.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515

1616
import pydantic
1717
from opentelemetry import context, trace
18-
from opentelemetry.context import _RUNTIME_CONTEXT
1918
from opentelemetry.util._decorator import _AgnosticContextManager
2019

2120
from langfuse import propagate_attributes
2221
from langfuse._client.attributes import LangfuseOtelSpanAttributes
2322
from langfuse._client.client import Langfuse
2423
from langfuse._client.get_client import get_client
24+
from langfuse._client.propagation import _detach_context_token_safely
2525
from langfuse._client.span import (
2626
LangfuseAgent,
2727
LangfuseChain,
@@ -458,18 +458,7 @@ def _detach_observation(
458458
token = self._context_tokens.pop(run_id, None)
459459

460460
if token:
461-
try:
462-
# Directly detach from runtime context to avoid error logging
463-
_RUNTIME_CONTEXT.detach(token)
464-
except Exception:
465-
# Context detach can fail in async scenarios - this is expected and safe to ignore
466-
# The span itself was properly ended and tracing data is correctly captured
467-
#
468-
# Examples:
469-
# 1. Token created in one async task/thread, detached in another
470-
# 2. Context already detached by framework or other handlers
471-
# 3. Runtime context state mismatch in concurrent execution
472-
pass
461+
_detach_context_token_safely(token)
473462

474463
return cast(
475464
Union[
@@ -564,11 +553,8 @@ def on_chain_end(
564553
input=kwargs.get("inputs"),
565554
)
566555

567-
if (
568-
parent_run_id is None
569-
and self._propagation_context_manager is not None
570-
):
571-
self._propagation_context_manager.__exit__(None, None, None)
556+
if parent_run_id is None:
557+
self._exit_propagation_context()
572558

573559
span.end()
574560

@@ -579,6 +565,7 @@ def on_chain_end(
579565

580566
finally:
581567
if parent_run_id is None:
568+
self._exit_propagation_context()
582569
self._reset()
583570

584571
def on_chain_error(
@@ -608,10 +595,19 @@ def on_chain_error(
608595
status_message=str(error) if level else None,
609596
input=kwargs.get("inputs"),
610597
cost_details={"total": 0},
611-
).end()
598+
)
599+
600+
if parent_run_id is None:
601+
self._exit_propagation_context()
602+
603+
observation.end()
612604

613605
except Exception as e:
614606
langfuse_logger.exception(e)
607+
finally:
608+
if parent_run_id is None:
609+
self._exit_propagation_context()
610+
self._reset()
615611

616612
def on_chat_model_start(
617613
self,
@@ -1026,6 +1022,15 @@ def on_llm_error(
10261022
def _reset(self) -> None:
10271023
self._child_to_parent_run_id_map = {}
10281024

1025+
def _exit_propagation_context(self) -> None:
1026+
manager = self._propagation_context_manager
1027+
1028+
if manager is None:
1029+
return
1030+
1031+
self._propagation_context_manager = None
1032+
manager.__exit__(None, None, None)
1033+
10291034
def __join_tags_and_metadata(
10301035
self,
10311036
tags: Optional[List[str]] = None,

0 commit comments

Comments
 (0)