Skip to content

Commit df702da

Browse files
authored
fix(experiments): maintain propagated context in async experiments (#1587)
1 parent 840cf2a commit df702da

File tree

3 files changed

+50
-1
lines changed

3 files changed

+50
-1
lines changed

langfuse/_client/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
import asyncio
8+
import contextvars
89
import json
910
import threading
1011
from hashlib import sha256
@@ -69,13 +70,14 @@ class _RunAsyncThread(threading.Thread):
6970

7071
def __init__(self, coro: Coroutine[Any, Any, Any]) -> None:
7172
self.coro = coro
73+
self.context = contextvars.copy_context()
7274
self.result: Any = None
7375
self.exception: Exception | None = None
7476
super().__init__()
7577

7678
def run(self) -> None:
7779
try:
78-
self.result = asyncio.run(self.coro)
80+
self.result = self.context.run(asyncio.run, self.coro)
7981
except Exception as e:
8082
self.exception = e
8183

tests/test_propagate_attributes.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2295,6 +2295,36 @@ def test_tags_attribute_key_format(self, langfuse_client, memory_exporter):
22952295
class TestPropagateAttributesExperiment(TestPropagateAttributesBase):
22962296
"""Tests for experiment attribute propagation."""
22972297

2298+
@pytest.mark.asyncio
2299+
async def test_experiment_propagates_user_id_in_async_context(
2300+
self, langfuse_client, memory_exporter
2301+
):
2302+
"""Verify run_experiment keeps propagated attributes when called from async code."""
2303+
import asyncio
2304+
2305+
local_data = [{"input": "test input", "expected_output": "expected output"}]
2306+
2307+
async def async_task(*, item, **kwargs):
2308+
await asyncio.sleep(0.01)
2309+
return f"processed: {item['input']}"
2310+
2311+
with propagate_attributes(user_id="async-experiment-user"):
2312+
langfuse_client.run_experiment(
2313+
name="Async Experiment",
2314+
data=local_data,
2315+
task=async_task,
2316+
)
2317+
2318+
langfuse_client.flush()
2319+
time.sleep(0.1)
2320+
2321+
root_span = self.get_span_by_name(memory_exporter, "experiment-item-run")
2322+
self.verify_span_attribute(
2323+
root_span,
2324+
LangfuseOtelSpanAttributes.TRACE_USER_ID,
2325+
"async-experiment-user",
2326+
)
2327+
22982328
def test_experiment_attributes_propagate_without_dataset(
22992329
self, langfuse_client, memory_exporter
23002330
):

tests/test_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Test suite for utility functions in langfuse._client.utils module."""
22

33
import asyncio
4+
import contextvars
45
import threading
56
from unittest import mock
67

@@ -81,6 +82,22 @@ async def check_thread_isolation():
8182
result = run_async_safely(check_thread_isolation())
8283
assert result == "isolated"
8384

85+
@pytest.mark.asyncio
86+
async def test_run_async_context_preserves_contextvars(self):
87+
"""Test that threaded execution preserves the caller's contextvars."""
88+
request_id = contextvars.ContextVar("request_id")
89+
token = request_id.set("req-123")
90+
91+
async def read_contextvar():
92+
await asyncio.sleep(0.001)
93+
return request_id.get()
94+
95+
try:
96+
result = run_async_safely(read_contextvar())
97+
assert result == "req-123"
98+
finally:
99+
request_id.reset(token)
100+
84101
def test_multiple_calls_sync_context(self):
85102
"""Test multiple sequential calls in sync context."""
86103

0 commit comments

Comments
 (0)