Skip to content

Commit 2f819f5

Browse files
kesmit13claude
andcommitted
Fix event loop closed error and add comprehensive UDF dispatch tests
Replace asyncio.run() with _run_with_graceful_shutdown() that drains pending callbacks before closing the loop, preventing RuntimeError from httpx/anyio TLS cleanup in async UDFs calling OpenAI/LangChain APIs. Add 17 unit tests covering graceful shutdown, cancellation timing, exception propagation, context variable isolation, and concurrent safety. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6e84863 commit 2f819f5

2 files changed

Lines changed: 325 additions & 1 deletion

File tree

singlestoredb/functions/ext/asgi.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,34 @@ async def _cancellable_run(
135135
return task.result()
136136

137137

138+
def _run_with_graceful_shutdown(coro: Any) -> Any:
139+
"""Run a coroutine in a new event loop, draining callbacks before close.
140+
141+
Unlike asyncio.run(), this prevents 'Event loop is closed' errors from
142+
libraries (httpx/anyio) that schedule cleanup callbacks during teardown.
143+
"""
144+
loop = asyncio.new_event_loop()
145+
try:
146+
asyncio.set_event_loop(loop)
147+
return loop.run_until_complete(coro)
148+
finally:
149+
try:
150+
pending = asyncio.all_tasks(loop)
151+
if pending:
152+
for task in pending:
153+
task.cancel()
154+
loop.run_until_complete(
155+
asyncio.gather(*pending, return_exceptions=True),
156+
)
157+
loop.run_until_complete(loop.shutdown_asyncgens())
158+
loop.run_until_complete(loop.shutdown_default_executor())
159+
finally:
160+
loop.call_soon(loop.stop)
161+
loop.run_forever()
162+
asyncio.set_event_loop(None)
163+
loop.close()
164+
165+
138166
# Use negative values to indicate unsigned ints / binary data / usec time precision
139167
rowdat_1_type_map = {
140168
'bool': ft.LONGLONG,
@@ -1213,7 +1241,7 @@ async def __call__(
12131241

12141242
func_task = asyncio.create_task(
12151243
to_thread(
1216-
lambda: asyncio.run(
1244+
lambda: _run_with_graceful_shutdown(
12171245
_cancellable_run(
12181246
cancel_event,
12191247
func(cancel_event, call_timer, *inputs),
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
"""Tests for async UDF event loop graceful shutdown."""
2+
import asyncio
3+
import contextvars
4+
import threading
5+
import time
6+
import unittest
7+
from typing import Any
8+
from typing import List
9+
10+
from ..functions.ext.asgi import _cancellable_run
11+
from ..functions.ext.asgi import _run_with_graceful_shutdown
12+
from ..functions.ext.asgi import to_thread
13+
14+
15+
class TestRunWithGracefulShutdown(unittest.TestCase):
16+
"""Test _run_with_graceful_shutdown handles loop cleanup properly."""
17+
18+
def test_basic_coroutine(self) -> None:
19+
async def simple() -> int:
20+
return 42
21+
22+
result = _run_with_graceful_shutdown(simple())
23+
self.assertEqual(result, 42)
24+
25+
def test_callbacks_drained_before_close(self) -> None:
26+
"""Simulate httpx/anyio scheduling call_soon during teardown.
27+
28+
This is the exact pattern that causes 'Event loop is closed' with
29+
asyncio.run() -- a library schedules a callback in its __del__ or
30+
aclose() that fires after the loop is closed.
31+
"""
32+
callback_executed: List[bool] = []
33+
34+
async def coroutine_with_cleanup_callback() -> str:
35+
loop = asyncio.get_running_loop()
36+
loop.call_soon(lambda: callback_executed.append(True))
37+
return 'done'
38+
39+
result = _run_with_graceful_shutdown(coroutine_with_cleanup_callback())
40+
self.assertEqual(result, 'done')
41+
self.assertEqual(callback_executed, [True])
42+
43+
def test_no_event_loop_closed_error(self) -> None:
44+
"""Verify no RuntimeError when cleanup schedules on the loop."""
45+
errors: List[RuntimeError] = []
46+
47+
async def simulate_httpx_teardown() -> str:
48+
loop = asyncio.get_running_loop()
49+
50+
def deferred_cleanup() -> None:
51+
try:
52+
loop.call_soon(lambda: None)
53+
except RuntimeError as e:
54+
errors.append(e)
55+
56+
loop.call_soon(deferred_cleanup)
57+
return 'ok'
58+
59+
result = _run_with_graceful_shutdown(simulate_httpx_teardown())
60+
self.assertEqual(result, 'ok')
61+
self.assertEqual(errors, [])
62+
63+
def test_exception_propagates(self) -> None:
64+
async def failing() -> None:
65+
raise ValueError('test error')
66+
67+
with self.assertRaises(ValueError) as ctx:
68+
_run_with_graceful_shutdown(failing())
69+
self.assertEqual(str(ctx.exception), 'test error')
70+
71+
def test_callbacks_drained_even_on_exception(self) -> None:
72+
"""Cleanup callbacks still run even if coroutine raises."""
73+
callback_executed: List[bool] = []
74+
75+
async def failing_with_callback() -> None:
76+
loop = asyncio.get_running_loop()
77+
loop.call_soon(lambda: callback_executed.append(True))
78+
raise ValueError('boom')
79+
80+
with self.assertRaises(ValueError):
81+
_run_with_graceful_shutdown(failing_with_callback())
82+
self.assertEqual(callback_executed, [True])
83+
84+
def test_pending_tasks_cancelled(self) -> None:
85+
"""Background tasks are cancelled during shutdown."""
86+
async def background() -> None:
87+
await asyncio.sleep(999)
88+
89+
async def main_with_background_task() -> str:
90+
asyncio.create_task(background())
91+
return 'done'
92+
93+
result = _run_with_graceful_shutdown(main_with_background_task())
94+
self.assertEqual(result, 'done')
95+
96+
def test_isolation_between_calls(self) -> None:
97+
"""Each call gets its own event loop that is closed after use."""
98+
loops: List[asyncio.AbstractEventLoop] = []
99+
100+
async def capture_loop() -> bool:
101+
loops.append(asyncio.get_running_loop())
102+
return True
103+
104+
_run_with_graceful_shutdown(capture_loop())
105+
first_loop = loops[0]
106+
self.assertTrue(first_loop.is_closed())
107+
108+
_run_with_graceful_shutdown(capture_loop())
109+
second_loop = loops[1]
110+
self.assertTrue(second_loop.is_closed())
111+
112+
def test_cancellable_run_integration(self) -> None:
113+
"""Verify _cancellable_run works inside _run_with_graceful_shutdown."""
114+
cancel_event = threading.Event()
115+
116+
async def slow_func() -> str:
117+
return 'completed'
118+
119+
result = _run_with_graceful_shutdown(
120+
_cancellable_run(cancel_event, slow_func()),
121+
)
122+
self.assertEqual(result, 'completed')
123+
124+
def test_cancellation_via_event(self) -> None:
125+
"""Verify cancellation propagates through the full stack."""
126+
cancel_event = threading.Event()
127+
cancel_event.set()
128+
129+
async def blocked_func() -> str:
130+
await asyncio.sleep(999)
131+
return 'should not reach'
132+
133+
with self.assertRaises(asyncio.CancelledError):
134+
_run_with_graceful_shutdown(
135+
_cancellable_run(cancel_event, blocked_func()),
136+
)
137+
138+
139+
class TestUDFDispatchEdgeCases(unittest.TestCase):
140+
"""Test edge cases in the UDF dispatch stack."""
141+
142+
def test_timeout_cancels_running_function(self) -> None:
143+
"""Cancel event set from timer thread cancels a blocked coroutine."""
144+
cancel_event = threading.Event()
145+
146+
async def long_running() -> str:
147+
await asyncio.sleep(999)
148+
return 'should not reach'
149+
150+
def set_cancel_after_delay() -> None:
151+
time.sleep(0.2)
152+
cancel_event.set()
153+
154+
timer = threading.Thread(target=set_cancel_after_delay)
155+
timer.start()
156+
157+
start = time.monotonic()
158+
with self.assertRaises(asyncio.CancelledError):
159+
_run_with_graceful_shutdown(
160+
_cancellable_run(cancel_event, long_running()),
161+
)
162+
elapsed = time.monotonic() - start
163+
timer.join()
164+
# 0.2s delay + up to 0.1s poll interval + margin
165+
self.assertLess(elapsed, 0.5)
166+
167+
def test_exception_propagates_through_full_stack(self) -> None:
168+
"""User exception propagates unwrapped through the entire dispatch."""
169+
cancel_event = threading.Event()
170+
171+
class CustomUDFError(Exception):
172+
pass
173+
174+
async def failing_udf() -> None:
175+
raise CustomUDFError('embedding service unavailable')
176+
177+
with self.assertRaises(CustomUDFError) as ctx:
178+
_run_with_graceful_shutdown(
179+
_cancellable_run(cancel_event, failing_udf()),
180+
)
181+
self.assertEqual(str(ctx.exception), 'embedding service unavailable')
182+
183+
def test_cancel_event_detected_within_poll_interval(self) -> None:
184+
"""Cancellation is detected within one poll cycle (0.1s)."""
185+
cancel_event = threading.Event()
186+
187+
async def blocked() -> str:
188+
await asyncio.sleep(999)
189+
return 'unreachable'
190+
191+
def set_cancel() -> None:
192+
time.sleep(0.05)
193+
cancel_event.set()
194+
195+
timer = threading.Thread(target=set_cancel)
196+
timer.start()
197+
198+
start = time.monotonic()
199+
with self.assertRaises(asyncio.CancelledError):
200+
_run_with_graceful_shutdown(
201+
_cancellable_run(cancel_event, blocked()),
202+
)
203+
elapsed = time.monotonic() - start
204+
timer.join()
205+
# 0.05s delay + 0.1s poll interval + margin
206+
self.assertLess(elapsed, 0.25)
207+
208+
def test_context_vars_propagate_through_to_thread(self) -> None:
209+
"""Context variables are visible inside to_thread executor."""
210+
test_var: contextvars.ContextVar[str] = contextvars.ContextVar(
211+
'test_var',
212+
)
213+
test_var.set('hello_from_parent')
214+
captured: List[str] = []
215+
216+
def read_context_var() -> str:
217+
val = test_var.get('NOT_FOUND')
218+
captured.append(val)
219+
return val
220+
221+
async def run_in_thread() -> str:
222+
return await to_thread(read_context_var)
223+
224+
result = _run_with_graceful_shutdown(run_in_thread())
225+
self.assertEqual(result, 'hello_from_parent')
226+
self.assertEqual(captured, ['hello_from_parent'])
227+
228+
def test_concurrent_requests_isolated(self) -> None:
229+
"""Parallel executions don't share state."""
230+
results: List[Any] = [None, None, None]
231+
232+
def run_isolated(index: int) -> None:
233+
async def compute() -> int:
234+
await asyncio.sleep(0.05)
235+
return index * 10
236+
237+
results[index] = _run_with_graceful_shutdown(compute())
238+
239+
threads = [
240+
threading.Thread(target=run_isolated, args=(i,))
241+
for i in range(3)
242+
]
243+
for t in threads:
244+
t.start()
245+
for t in threads:
246+
t.join()
247+
248+
self.assertEqual(results, [0, 10, 20])
249+
250+
def test_sync_function_through_async_wrapper(self) -> None:
251+
"""Synchronous function works when wrapped as async coroutine."""
252+
cancel_event = threading.Event()
253+
254+
async def sync_as_async() -> int:
255+
# Simulates what decorator.py's async_wrapper does for sync UDFs
256+
return 42 + 1
257+
258+
result = _run_with_graceful_shutdown(
259+
_cancellable_run(cancel_event, sync_as_async()),
260+
)
261+
self.assertEqual(result, 43)
262+
263+
def test_cancel_event_not_set_on_success(self) -> None:
264+
"""Cancel event remains unset after successful execution."""
265+
cancel_event = threading.Event()
266+
267+
async def quick() -> str:
268+
return 'fast'
269+
270+
result = _run_with_graceful_shutdown(
271+
_cancellable_run(cancel_event, quick()),
272+
)
273+
self.assertEqual(result, 'fast')
274+
self.assertFalse(cancel_event.is_set())
275+
276+
def test_callbacks_from_cancelled_tasks_still_drain(self) -> None:
277+
"""Background task callbacks drain even when task is cancelled."""
278+
drained: List[bool] = []
279+
280+
async def bg_with_callback() -> None:
281+
loop = asyncio.get_running_loop()
282+
loop.call_soon(lambda: drained.append(True))
283+
await asyncio.sleep(999)
284+
285+
async def main() -> str:
286+
asyncio.create_task(bg_with_callback())
287+
await asyncio.sleep(0.05) # Let background task start
288+
return 'done'
289+
290+
result = _run_with_graceful_shutdown(main())
291+
self.assertEqual(result, 'done')
292+
self.assertEqual(drained, [True])
293+
294+
295+
if __name__ == '__main__':
296+
unittest.main()

0 commit comments

Comments
 (0)