Skip to content

Commit aa1b8ed

Browse files
Merge branch 'cancel_udf' of https://github.com/singlestore-labs/singlestoredb-python into users/kaushik/udfs-temp
2 parents 45cf138 + 432c08a commit aa1b8ed

File tree

14 files changed

+667
-98
lines changed

14 files changed

+667
-98
lines changed

.github/workflows/code-check.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Code checks
1+
name: Coverage tests
22

33
on:
44
push:
@@ -7,7 +7,6 @@ on:
77
branches: [ main ]
88
workflow_dispatch:
99

10-
1110
jobs:
1211
test-coverage:
1312
runs-on: ubuntu-latest
@@ -28,7 +27,7 @@ jobs:
2827
- name: Checkout code
2928
uses: actions/checkout@v4
3029
with:
31-
fetch-depth: 2
30+
fetch-depth: 0
3231

3332
- name: Set up Python
3433
uses: actions/setup-python@v4
@@ -61,7 +60,7 @@ jobs:
6160
# For pushes to main/master, compare against previous commit
6261
BASE_COMMIT="HEAD~1"
6362
echo "Push to main/master: Comparing against $BASE_COMMIT"
64-
else
63+
else:
6564
# For pushes to other branches, compare against master/main
6665
if git rev-parse --verify origin/main >/dev/null 2>&1; then
6766
BASE_COMMIT="origin/main"

.github/workflows/coverage.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ jobs:
2323

2424
steps:
2525
- uses: actions/checkout@v4
26+
with:
27+
fetch-depth: 0
2628

2729
- name: Set up Python
2830
uses: actions/setup-python@v4

singlestoredb/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@
438438
environ=['SINGLESTOREDB_EXT_FUNC_PORT'],
439439
)
440440

441+
register_option(
442+
'external_function.timeout', 'int', check_int, 24*60*60,
443+
'Specifies the timeout in seconds for processing a batch of rows.',
444+
environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
445+
)
441446

442447
#
443448
# Debugging options

singlestoredb/functions/decorator.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import functools
23
import inspect
34
from typing import Any
@@ -19,6 +20,7 @@
1920
]
2021

2122
ReturnType = ParameterType
23+
UDFType = Callable[..., Any]
2224

2325

2426
def is_valid_type(obj: Any) -> bool:
@@ -100,38 +102,50 @@ def _func(
100102
name: Optional[str] = None,
101103
args: Optional[ParameterType] = None,
102104
returns: Optional[ReturnType] = None,
103-
) -> Callable[..., Any]:
105+
timeout: Optional[int] = None,
106+
) -> UDFType:
104107
"""Generic wrapper for UDF and TVF decorators."""
105108

106109
_singlestoredb_attrs = { # type: ignore
107110
k: v for k, v in dict(
108111
name=name,
109112
args=expand_types(args),
110113
returns=expand_types(returns),
114+
timeout=timeout,
111115
).items() if v is not None
112116
}
113117

114118
# No func was specified, this is an uncalled decorator that will get
115119
# called later, so the wrapper much be created with the func passed
116120
# in at that time.
117121
if func is None:
118-
def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
122+
def decorate(func: UDFType) -> UDFType:
119123

120-
def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
121-
return func(*args, **kwargs) # type: ignore
124+
if asyncio.iscoroutinefunction(func):
125+
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
126+
return await func(*args, **kwargs) # type: ignore
127+
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
128+
return functools.wraps(func)(async_wrapper)
122129

123-
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
124-
125-
return functools.wraps(func)(wrapper)
130+
else:
131+
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
132+
return func(*args, **kwargs) # type: ignore
133+
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
134+
return functools.wraps(func)(wrapper)
126135

127136
return decorate
128137

129-
def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
130-
return func(*args, **kwargs) # type: ignore
131-
132-
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
138+
if asyncio.iscoroutinefunction(func):
139+
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
140+
return await func(*args, **kwargs) # type: ignore
141+
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
142+
return functools.wraps(func)(async_wrapper)
133143

134-
return functools.wraps(func)(wrapper)
144+
else:
145+
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
146+
return func(*args, **kwargs) # type: ignore
147+
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
148+
return functools.wraps(func)(wrapper)
135149

136150

137151
def udf(
@@ -140,7 +154,8 @@ def udf(
140154
name: Optional[str] = None,
141155
args: Optional[ParameterType] = None,
142156
returns: Optional[ReturnType] = None,
143-
) -> Callable[..., Any]:
157+
timeout: Optional[int] = None,
158+
) -> UDFType:
144159
"""
145160
Define a user-defined function (UDF).
146161
@@ -167,6 +182,9 @@ def udf(
167182
Specifies the return data type of the function. This parameter
168183
works the same way as `args`. If the function is a table-valued
169184
function, the return type should be a `Table` object.
185+
timeout : int, optional
186+
The timeout in seconds for the UDF execution. If not specified,
187+
the global default timeout is used.
170188
171189
Returns
172190
-------
@@ -178,4 +196,5 @@ def udf(
178196
name=name,
179197
args=args,
180198
returns=returns,
199+
timeout=timeout,
181200
)

0 commit comments

Comments
 (0)