Skip to content

Commit a18dc1b

Browse files
committed
[argus] models/factory: per-event-loop httpx client for ChatOpenAI
langchain-openai (as of 1.1.7) caches the async httpx client with a process-global @lru_cache. LangGraph's worker model spins up a fresh asyncio loop per task; reusing an httpx connection from a prior loop after that loop has been torn down raises:

    RuntimeError: Event loop is closed

at stream-cleanup time. The upstream issue (langchain-ai/langchain#35783) is open as of 2026-04 — none of the four community-submitted PRs were merged.

Until upstream lands a fix, side-step the broken cache by passing an explicit http_async_client into ChatOpenAI ourselves, scoped to the currently-running event loop. Cache keyed on id(loop) with weak refs, so clients are GC'd when their loop dies — no leak across long-lived processes that handle thousands of short-lived loops.

Only injected for ChatOpenAI subclasses (the only models hit by this bug) and only when the caller hasn't already provided http_async_client themselves.

Tests cover:
- Sync construction returns None (factory falls back to default)
- Same-loop calls reuse the cached client
- A closed cached client is replaced (the post-loop-teardown case)
- Non-ChatOpenAI subclasses get no http_async_client injection
- An explicit http_async_client kwarg is preserved

PR-candidate: no
Upstream-issue: langchain-ai/langchain#35783
Reason: This belongs in langchain-openai, not deer-flow. Kept here as a load-bearing workaround until #35783 is fixed upstream.
1 parent bca5991 commit a18dc1b

2 files changed

Lines changed: 159 additions & 1 deletion

File tree

backend/packages/harness/deerflow/models/factory.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
import asyncio
12
import logging
3+
import weakref
24

5+
import httpx
36
from langchain.chat_models import BaseChatModel
7+
from langchain_openai import ChatOpenAI
48

59
from deerflow.config import get_app_config
610
from deerflow.reflection import resolve_class
@@ -9,6 +13,53 @@
913
logger = logging.getLogger(__name__)
1014

1115

16+
# Per-event-loop httpx.AsyncClient cache.
#
# langchain-openai (as of 1.1.7) caches the async httpx client with a
# process-global @lru_cache. LangGraph's worker model spins up a fresh
# asyncio loop per task; reusing an httpx connection from a prior loop
# after that loop has been torn down raises:
#
#     RuntimeError: Event loop is closed
#
# at stream-cleanup time. The upstream issue (langchain-ai/langchain#35783)
# is open as of 2026-04. Until it's fixed there, we side-step the broken
# cache by passing an explicit http_async_client into ChatOpenAI ourselves,
# scoped to the *currently running* event loop.
#
# Shape of the cache: weak key = the loop object, value = weak reference to
# the client.
#
# * The key is the loop itself, NOT id(loop). CPython recycles object
#   addresses, so a loop created after an earlier one was freed can have the
#   same id(); with an id-keyed cache that new loop would be handed a
#   still-open client created on the dead loop whenever some model instance
#   kept that client alive — the very cross-loop reuse this cache exists to
#   prevent. A weak key is dropped when its loop is collected, so a stale
#   entry can never be matched by a different loop.
# * The value is only a weak reference, so this module never keeps a client
#   (or, through the client's open connections, its loop) alive. A client
#   lives exactly as long as some model or caller still references it; once
#   it is collected the next request on that loop builds a fresh one.
_PER_LOOP_HTTPX_CLIENTS: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, weakref.ReferenceType[httpx.AsyncClient]]" = (
    weakref.WeakKeyDictionary()
)


def _httpx_client_for_current_loop() -> "httpx.AsyncClient | None":
    """Return an httpx.AsyncClient bound to the currently-running event loop.

    The client cached for the running loop is reused as long as it is still
    referenced elsewhere and has not been closed; otherwise a new client is
    created, recorded for this loop and returned.

    Returns:
        The client for the running loop, or None when called outside an
        async context (e.g. sync model construction during config
        validation). In that case nothing is created or cached and the
        caller is expected to let ChatOpenAI use its own default client.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: there is nothing to bind to.
        return None

    client_ref = _PER_LOOP_HTTPX_CLIENTS.get(loop)
    client = client_ref() if client_ref is not None else None
    if client is not None and not client.is_closed:
        return client

    # Either this loop has no client yet, the previous one was garbage
    # collected, or it was explicitly closed. Pool/keepalive settings stay at
    # httpx's defaults; only the timeouts are overridden (600 s default,
    # 10 s for establishing a connection).
    client = httpx.AsyncClient(
        timeout=httpx.Timeout(600.0, connect=10.0),
    )
    _PER_LOOP_HTTPX_CLIENTS[loop] = weakref.ref(client)
    return client
61+
62+
1263
def _deep_merge_dicts(base: dict | None, override: dict) -> dict:
1364
"""Recursively merge two dictionaries without mutating the inputs."""
1465
merged = dict(base or {})
@@ -113,7 +164,17 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
113164
elif "reasoning_effort" not in model_settings_from_config:
114165
model_settings_from_config["reasoning_effort"] = "medium"
115166

116-
model_instance = model_class(**{**model_settings_from_config, **kwargs})
167+
final_kwargs = {**model_settings_from_config, **kwargs}
168+
169+
# Bypass langchain-openai's process-global httpx client cache (see
170+
# _PER_LOOP_HTTPX_CLIENTS docstring above). Only relevant for ChatOpenAI
171+
# subclasses; native Anthropic/Google clients have their own pooling.
172+
if issubclass(model_class, ChatOpenAI) and "http_async_client" not in final_kwargs:
173+
per_loop_client = _httpx_client_for_current_loop()
174+
if per_loop_client is not None:
175+
final_kwargs["http_async_client"] = per_loop_client
176+
177+
model_instance = model_class(**final_kwargs)
117178

118179
callbacks = build_tracing_callbacks()
119180
if callbacks:

backend/tests/test_model_factory.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,3 +863,100 @@ def __init__(self, **kwargs):
863863

864864
# kwargs (runtime) takes precedence: thinking-disabled path sets reasoning_effort=minimal
865865
assert captured.get("reasoning_effort") == "minimal"
866+
867+
868+
# ---------------------------------------------------------------------------
869+
# Per-event-loop httpx client (langchain-openai cross-loop bypass)
870+
# ---------------------------------------------------------------------------
871+
872+
873+
class TestPerLoopHttpxClient:
    """Checks for the factory's per-event-loop ``httpx.AsyncClient`` injection.

    The factory hands ChatOpenAI subclasses an explicit ``http_async_client``
    so they do not rely on langchain-openai's process-global ``@lru_cache``,
    which breaks under LangGraph's loop-per-task model. The comment on
    ``_PER_LOOP_HTTPX_CLIENTS`` in the factory module has the background.
    """

    def test_returns_none_outside_async_context(self):
        """With no running loop (e.g. config validation) the helper yields
        ``None`` so that ChatOpenAI can fall back to its default client."""
        assert factory_module._httpx_client_for_current_loop() is None

    def test_same_loop_returns_same_client(self):
        """Repeated lookups inside one running loop hand back one shared
        client instead of opening a new connection pool per call."""
        import asyncio as _asyncio

        async def fetch_pair():
            return (
                factory_module._httpx_client_for_current_loop(),
                factory_module._httpx_client_for_current_loop(),
            )

        first, second = _asyncio.run(fetch_pair())

        assert first is not None
        assert first is second

    def test_closed_client_is_replaced(self):
        """A client that was closed during an earlier ``asyncio.run`` must
        never be handed out again: a later loop gets a fresh, open client."""
        import asyncio as _asyncio

        seen: list = []

        async def open_then_close():
            client = factory_module._httpx_client_for_current_loop()
            seen.append(client)
            await client.aclose()

        async def open_again():
            seen.append(factory_module._httpx_client_for_current_loop())

        for scenario in (open_then_close, open_again):
            _asyncio.run(scenario())

        closed_client, fresh_client = seen
        assert closed_client is not None
        assert fresh_client is not None
        assert fresh_client is not closed_client
        assert not fresh_client.is_closed

    def test_non_chatopenai_does_not_get_http_client_kwarg(self, monkeypatch):
        """FakeChatModel derives from BaseChatModel rather than ChatOpenAI,
        so the factory must leave its constructor kwargs untouched."""
        _patch_factory(monkeypatch, _make_app_config([_make_model("vanilla")]))
        FakeChatModel.captured_kwargs = {}

        factory_module.create_chat_model(name="vanilla")

        assert "http_async_client" not in FakeChatModel.captured_kwargs

    def test_explicit_http_async_client_in_kwargs_is_preserved(self, monkeypatch):
        """A caller-supplied ``http_async_client`` wins over the per-loop one,
        which keeps room for callers that need their own client (for example
        one configured with custom proxies)."""
        from langchain_openai import ChatOpenAI

        class _StopFactory(Exception):
            """Raised from the fake constructor to abort model creation."""

        received: dict = {}
        marker = object()

        class CapturingChatOpenAI(ChatOpenAI):
            def __init__(self, **kwargs):
                # Record what the factory passed, then stop before any real
                # client would be built.
                received.update(kwargs)
                raise _StopFactory()

        app_config = _make_app_config(
            [_make_model("openai-clone", use="langchain_openai:ChatOpenAI")]
        )
        _patch_factory(monkeypatch, app_config, model_class=CapturingChatOpenAI)

        with pytest.raises(_StopFactory):
            factory_module.create_chat_model(name="openai-clone", http_async_client=marker)

        assert received.get("http_async_client") is marker

0 commit comments

Comments
 (0)