Skip to content

Commit d0b6cae

Browse files
committed
httpx aclose patch
1 parent 666e8d3 commit d0b6cae

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

ajet/backbone/warm_up.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import asyncio
77
import logging
88
import os
9+
from ajet.utils.async_utils import apply_httpx_aclose_patch
10+
apply_httpx_aclose_patch()
911

1012

1113
def init_parallel_rollout_logger(experiment_name):

ajet/utils/async_utils.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import concurrent.futures
33
from typing import Any
44

5-
65
def run_async_coroutine_with_timeout(coro, timeout: int = 3600) -> Any:
76
"""
87
Run an async coroutine with a timeout, supporting both inside and outside event loops.
@@ -32,3 +31,40 @@ def run_async_coroutine_with_timeout(coro, timeout: int = 3600) -> Any:
3231
except Exception:
3332
raise
3433
return final_res
34+
35+
36+
def apply_httpx_aclose_patch():
37+
try:
38+
from openai._base_client import AsyncHttpxClientWrapper
39+
40+
_original_init = AsyncHttpxClientWrapper.__init__
41+
42+
def _patched_init(self, *args, **kwargs):
43+
try:
44+
self._created_loop = asyncio.get_running_loop()
45+
except RuntimeError:
46+
self._created_loop = None
47+
_original_init(self, *args, **kwargs)
48+
49+
def _patched_del(self) -> None:
50+
if self.is_closed:
51+
return
52+
53+
try:
54+
current_loop = asyncio.get_running_loop()
55+
except RuntimeError:
56+
return
57+
58+
if getattr(self, "_created_loop", None) is not None and current_loop is not self._created_loop:
59+
return
60+
61+
try:
62+
current_loop.create_task(self.aclose())
63+
except Exception:
64+
pass
65+
66+
AsyncHttpxClientWrapper.__init__ = _patched_init
67+
AsyncHttpxClientWrapper.__del__ = _patched_del
68+
print("Applied httpx aclose patch.")
69+
except ImportError:
70+
pass

0 commit comments

Comments
 (0)