Skip to content

Commit 6e84863

Browse files
kesmit13claude
andcommitted
Add cancellable wrapper for responsive async UDF cancellation
Wraps the inner coroutine in _cancellable_run which polls cancel_event and raises CancelledError at the next await (~100ms), ensuring vector UDFs respect disconnect/timeout signals without waiting for completion. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f605217 commit 6e84863

1 file changed

Lines changed: 26 additions & 1 deletion

File tree

singlestoredb/functions/ext/asgi.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,28 @@ async def to_thread(
113113
return await loop.run_in_executor(None, func_call)
114114

115115

116+
async def _poll_cancel(cancel_event: threading.Event) -> None:
117+
while not cancel_event.is_set():
118+
await asyncio.sleep(0.1)
119+
120+
121+
async def _cancellable_run(
122+
cancel_event: threading.Event,
123+
coro: Any,
124+
) -> Any:
125+
task = asyncio.create_task(coro)
126+
cancel_check = asyncio.create_task(_poll_cancel(cancel_event))
127+
done, pending = await asyncio.wait(
128+
[task, cancel_check], return_when=asyncio.FIRST_COMPLETED,
129+
)
130+
for p in pending:
131+
p.cancel()
132+
if cancel_check in done:
133+
task.cancel()
134+
raise asyncio.CancelledError()
135+
return task.result()
136+
137+
116138
# Use negative values to indicate unsigned ints / binary data / usec time precision
117139
rowdat_1_type_map = {
118140
'bool': ft.LONGLONG,
@@ -1192,7 +1214,10 @@ async def __call__(
11921214
func_task = asyncio.create_task(
11931215
to_thread(
11941216
lambda: asyncio.run(
1195-
func(cancel_event, call_timer, *inputs),
1217+
_cancellable_run(
1218+
cancel_event,
1219+
func(cancel_event, call_timer, *inputs),
1220+
),
11961221
),
11971222
),
11981223
)

0 commit comments

Comments
 (0)