Skip to content

Commit b84d9ea

Browse files
kesmit13claude
andcommitted
Fix udf_future NameError and lazily initialize UDF event loop
- Move udf_future initialization before input_handler['load']() to prevent NameError in finally block if parsing raises - Lazily create UDF event loop on first async UDF invocation instead of unconditionally in __init__, avoiding wasted resources for sync-only or metadata-only usage - Guard shutdown() against None loop/thread Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 35e92d5 commit b84d9ea

1 file changed

Lines changed: 20 additions & 12 deletions

File tree

singlestoredb/functions/ext/asgi.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,14 +1001,8 @@ def __init__(
10011001
self.log_level = log_level
10021002
self.disable_metrics = disable_metrics
10031003

1004-
# Dedicated event loop for async UDF execution, isolated from the server loop
1005-
self._udf_loop = asyncio.new_event_loop()
1006-
self._udf_thread = threading.Thread(
1007-
target=self._udf_loop.run_forever,
1008-
daemon=True,
1009-
name='async-udf-loop',
1010-
)
1011-
self._udf_thread.start()
1004+
self._udf_loop: Optional[asyncio.AbstractEventLoop] = None
1005+
self._udf_thread: Optional[threading.Thread] = None
10121006

10131007
# Configure logging
10141008
self._configure_logging()
@@ -1043,10 +1037,24 @@ def _configure_logging(self) -> None:
10431037
# Prevent propagation to avoid duplicate or differently formatted messages
10441038
self.logger.propagate = False
10451039

1040+
def _get_udf_loop(self) -> asyncio.AbstractEventLoop:
1041+
"""Get or create the dedicated UDF event loop."""
1042+
if self._udf_loop is None:
1043+
self._udf_loop = asyncio.new_event_loop()
1044+
self._udf_thread = threading.Thread(
1045+
target=self._udf_loop.run_forever,
1046+
daemon=True,
1047+
name='async-udf-loop',
1048+
)
1049+
self._udf_thread.start()
1050+
return self._udf_loop
1051+
10461052
def shutdown(self) -> None:
10471053
"""Shut down the dedicated UDF event loop."""
1048-
self._udf_loop.call_soon_threadsafe(self._udf_loop.stop)
1049-
self._udf_thread.join(timeout=5)
1054+
if self._udf_loop is not None:
1055+
self._udf_loop.call_soon_threadsafe(self._udf_loop.stop)
1056+
if self._udf_thread is not None:
1057+
self._udf_thread.join(timeout=5)
10501058

10511059
def get_uvicorn_log_config(self) -> Dict[str, Any]:
10521060
"""
@@ -1196,6 +1204,7 @@ async def __call__(
11961204
try:
11971205
all_tasks = []
11981206
result = []
1207+
udf_future: 'Optional[concurrent.futures.Future[Any]]' = None
11991208

12001209
cancel_event = threading.Event()
12011210

@@ -1205,11 +1214,10 @@ async def __call__(
12051214
)
12061215

12071216
func_task: 'asyncio.Task[Any]'
1208-
udf_future: 'Optional[concurrent.futures.Future[Any]]' = None
12091217
if func_info['is_async']:
12101218
udf_future = asyncio.run_coroutine_threadsafe(
12111219
func(cancel_event, call_timer, *inputs),
1212-
self._udf_loop,
1220+
self._get_udf_loop(),
12131221
)
12141222
func_task = asyncio.ensure_future(
12151223
asyncio.wrap_future(udf_future),

0 commit comments

Comments
 (0)