Add the ability to cancel UDFs by client disconnect or timeout#64
Add the ability to cancel UDFs by client disconnect or timeout#64
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR adds the ability to cancel User-Defined Functions (UDFs) by client disconnect or timeout. The changes introduce async UDF support, timeout configuration, cancellation mechanisms, and comprehensive testing for the new timeout and async functionality.
- Adds async UDF support with
async/awaitsyntax - Implements timeout-based cancellation with configurable timeouts
- Introduces client disconnect detection and cancellation
- Adds comprehensive metrics and logging for function execution
Reviewed Changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| singlestoredb/functions/decorator.py | Adds timeout parameter and async function wrapper support |
| singlestoredb/functions/ext/asgi.py | Implements cancellation logic, timeout handling, and execution metrics |
| singlestoredb/functions/ext/timer.py | New timer utility for function execution metrics |
| singlestoredb/config.py | Adds external_function.timeout configuration option |
| singlestoredb/tests/ext_funcs/init.py | Updates type imports and adds async test functions |
| singlestoredb/tests/test_ext_func.py | Adds comprehensive tests for timeout and async functionality |
| singlestoredb/tests/test.sql | Adds longer_data table for timeout testing |
| singlestoredb/management/init.py | Adds manage_regions import |
| singlestoredb/functions/typing/*.py | New typing modules for cleaner imports |
Comments suppressed due to low confidence (2)
singlestoredb/tests/test_ext_func.py:171
- The assertion is inside the context manager but should be outside. The exception won't be available until after the context manager exits.
assert 'timeout' in str(exc.exception).lower()
singlestoredb/tests/test_ext_func.py:200
- The assertion is inside the context manager but should be outside. The exception won't be available until after the context manager exits.
assert 'timeout' in str(exc.exception).lower()
| for i, res in zip(row_ids, func_map(func, rows)): | ||
| out.extend(as_list_of_tuples(res)) | ||
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) | ||
| async with timer('call_function'): |
There was a problem hiding this comment.
The variable i is used in the loop but row_ids[i] should be i since we're iterating with zip(row_ids, rows) and i is already the row_id value.
| with timer('receive_data'): | ||
| while more_body: | ||
| request = await receive() | ||
| if request['type'] == 'http.disconnect': |
There was a problem hiding this comment.
This disconnect check during data reception will raise a RuntimeError, but the main cancellation logic expects asyncio.CancelledError. This inconsistency could lead to unhandled exceptions.
| else: | ||
| res = func(*row) | ||
| out.extend(as_list_of_tuples(res)) | ||
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) |
There was a problem hiding this comment.
[nitpick] The calculation [row_ids[i]] * (len(out)-len(out_ids)) is complex and error-prone. Consider storing the previous length and using that for clarity.
| out_ids.extend([row_ids[i]] * (len(out)-len(out_ids))) | |
| prev_len_out = len(out) | |
| out_ids.extend([row_ids[i]] * (prev_len_out - len(out_ids))) |
No description provided.