Skip to content

Commit f605217

Browse files
kesmit13 and claude committed
Use thread-per-request for async UDFs instead of shared event loop
The dedicated shared event loop still caused starvation under concurrent async UDF calls. Switch to the same model used by sync UDFs: each request gets its own thread with asyncio.run(), eliminating loop contention.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 35a6658 commit f605217

2 files changed

Lines changed: 29 additions & 81 deletions

File tree

singlestoredb/apps/_python_udfs.py

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
if typing.TYPE_CHECKING:
1111
from ._uvicorn_util import AwaitableUvicornServer
1212

13-
# Keep track of currently running server and app
13+
# Keep track of currently running server
1414
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None
15-
_running_app: typing.Optional[Application] = None
1615

1716
# Maximum number of UDFs allowed
1817
MAX_UDFS_LIMIT = 10
@@ -22,7 +21,7 @@ async def run_udf_app(
2221
log_level: str = 'error',
2322
kill_existing_app_server: bool = True,
2423
) -> UdfConnectionInfo:
25-
global _running_server, _running_app
24+
global _running_server
2625
from ._uvicorn_util import AwaitableUvicornServer
2726

2827
try:
@@ -39,9 +38,6 @@ async def run_udf_app(
3938
if _running_server is not None:
4039
await _running_server.shutdown()
4140
_running_server = None
42-
if _running_app is not None:
43-
_running_app.shutdown()
44-
_running_app = None
4541

4642
# Kill if any other process is occupying the port
4743
kill_process_by_port(app_config.listen_port)
@@ -58,32 +54,27 @@ async def run_udf_app(
5854
log_level=log_level,
5955
)
6056

61-
try:
62-
if not app.endpoints:
63-
raise ValueError('You must define at least one function.')
64-
if len(app.endpoints) > MAX_UDFS_LIMIT:
65-
raise ValueError(
66-
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
67-
)
68-
69-
config = uvicorn.Config(
70-
app,
71-
host='0.0.0.0',
72-
port=app_config.listen_port,
73-
log_config=app.get_uvicorn_log_config(),
57+
if not app.endpoints:
58+
raise ValueError('You must define at least one function.')
59+
if len(app.endpoints) > MAX_UDFS_LIMIT:
60+
raise ValueError(
61+
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
7462
)
7563

76-
# Register the functions only if the app is running interactively.
77-
if app_config.running_interactively:
78-
app.register_functions(replace=True)
79-
80-
_running_app = app
81-
_running_server = AwaitableUvicornServer(config)
82-
asyncio.create_task(_running_server.serve())
83-
await _running_server.wait_for_startup()
84-
except Exception:
85-
app.shutdown()
86-
raise
64+
config = uvicorn.Config(
65+
app,
66+
host='0.0.0.0',
67+
port=app_config.listen_port,
68+
log_config=app.get_uvicorn_log_config(),
69+
)
70+
71+
# Register the functions only if the app is running interactively.
72+
if app_config.running_interactively:
73+
app.register_functions(replace=True)
74+
75+
_running_server = AwaitableUvicornServer(config)
76+
asyncio.create_task(_running_server.serve())
77+
await _running_server.wait_for_startup()
8778

8879
print(f'Python UDF registered at {base_url}')
8980

singlestoredb/functions/ext/asgi.py

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
"""
2525
import argparse
2626
import asyncio
27-
import concurrent.futures
2827
import contextvars
2928
import dataclasses
3029
import datetime
@@ -1001,9 +1000,6 @@ def __init__(
10011000
self.log_level = log_level
10021001
self.disable_metrics = disable_metrics
10031002

1004-
self._udf_loop: Optional[asyncio.AbstractEventLoop] = None
1005-
self._udf_thread: Optional[threading.Thread] = None
1006-
10071003
# Configure logging
10081004
self._configure_logging()
10091005

@@ -1037,27 +1033,6 @@ def _configure_logging(self) -> None:
10371033
# Prevent propagation to avoid duplicate or differently formatted messages
10381034
self.logger.propagate = False
10391035

1040-
def _get_udf_loop(self) -> asyncio.AbstractEventLoop:
1041-
"""Get or create the dedicated UDF event loop."""
1042-
if self._udf_loop is None:
1043-
self._udf_loop = asyncio.new_event_loop()
1044-
self._udf_thread = threading.Thread(
1045-
target=self._udf_loop.run_forever,
1046-
daemon=True,
1047-
name='async-udf-loop',
1048-
)
1049-
self._udf_thread.start()
1050-
return self._udf_loop
1051-
1052-
def shutdown(self) -> None:
1053-
"""Shut down the dedicated UDF event loop."""
1054-
if self._udf_loop is not None:
1055-
self._udf_loop.call_soon_threadsafe(self._udf_loop.stop)
1056-
if self._udf_thread is not None:
1057-
self._udf_thread.join(timeout=5)
1058-
self._udf_loop = None
1059-
self._udf_thread = None
1060-
10611036
def get_uvicorn_log_config(self) -> Dict[str, Any]:
10621037
"""
10631038
Create uvicorn log config that matches the Application's logging format.
@@ -1206,7 +1181,6 @@ async def __call__(
12061181
try:
12071182
all_tasks = []
12081183
result = []
1209-
udf_future: 'Optional[concurrent.futures.Future[Any]]' = None
12101184

12111185
cancel_event = threading.Event()
12121186

@@ -1215,23 +1189,13 @@ async def __call__(
12151189
func_info['colspec'], b''.join(data),
12161190
)
12171191

1218-
func_task: 'asyncio.Task[Any]'
1219-
if func_info['is_async']:
1220-
udf_future = asyncio.run_coroutine_threadsafe(
1221-
func(cancel_event, call_timer, *inputs),
1222-
self._get_udf_loop(),
1223-
)
1224-
func_task = asyncio.ensure_future(
1225-
asyncio.wrap_future(udf_future),
1226-
)
1227-
else:
1228-
func_task = asyncio.create_task(
1229-
to_thread(
1230-
lambda: asyncio.run(
1231-
func(cancel_event, call_timer, *inputs),
1232-
),
1192+
func_task = asyncio.create_task(
1193+
to_thread(
1194+
lambda: asyncio.run(
1195+
func(cancel_event, call_timer, *inputs),
12331196
),
1234-
)
1197+
),
1198+
)
12351199
disconnect_task = asyncio.create_task(
12361200
asyncio.sleep(int(1e9))
12371201
if ignore_cancel else cancel_on_disconnect(receive),
@@ -1248,23 +1212,18 @@ async def __call__(
12481212
)
12491213

12501214
await cancel_all_tasks(pending)
1251-
if func_task in pending and udf_future is not None:
1215+
if func_task in pending:
12521216
cancel_event.set()
1253-
udf_future.cancel()
12541217

12551218
for task in done:
12561219
if task is disconnect_task:
12571220
cancel_event.set()
1258-
if udf_future is not None:
1259-
udf_future.cancel()
12601221
raise asyncio.CancelledError(
12611222
'Function call was cancelled by client disconnect',
12621223
)
12631224

12641225
elif task is timeout_task:
12651226
cancel_event.set()
1266-
if udf_future is not None:
1267-
udf_future.cancel()
12681227
raise asyncio.TimeoutError(
12691228
'Function call was cancelled due to timeout',
12701229
)
@@ -1327,9 +1286,7 @@ async def __call__(
13271286
await send(self.error_response_dict)
13281287

13291288
finally:
1330-
if udf_future is not None:
1331-
cancel_event.set()
1332-
udf_future.cancel()
1289+
cancel_event.set()
13331290
await cancel_all_tasks(all_tasks)
13341291

13351292
# Handle api reflection

0 commit comments

Comments
 (0)