Skip to content

Commit 1d00d86

Browse files
committed
Use separate timer for function call; not thread safe
1 parent 2e874a7 commit 1d00d86

File tree

2 files changed

+29
-77
lines changed

2 files changed

+29
-77
lines changed

singlestoredb/functions/ext/asgi.py

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,6 @@ def build_udf_endpoint(
319319

320320
async def do_func(
321321
cancel_event: threading.Event,
322-
finished_event: threading.Event,
323322
timer: Timer,
324323
row_ids: Sequence[int],
325324
rows: Sequence[Sequence[Any]],
@@ -333,7 +332,6 @@ async def do_func(
333332
out.append(await func(*row))
334333
else:
335334
out.append(func(*row))
336-
finished_event.set()
337335
return row_ids, list(zip(out))
338336

339337
return do_func
@@ -367,7 +365,6 @@ def build_vector_udf_endpoint(
367365

368366
async def do_func(
369367
cancel_event: threading.Event,
370-
finished_event: threading.Event,
371368
timer: Timer,
372369
row_ids: Sequence[int],
373370
cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
@@ -391,7 +388,6 @@ async def do_func(
391388
else:
392389
out = func()
393390

394-
finished_event.set()
395391
cancel_on_event(cancel_event)
396392

397393
# Single masked value
@@ -434,7 +430,6 @@ def build_tvf_endpoint(
434430

435431
async def do_func(
436432
cancel_event: threading.Event,
437-
finished_event: threading.Event,
438433
timer: Timer,
439434
row_ids: Sequence[int],
440435
rows: Sequence[Sequence[Any]],
@@ -452,7 +447,6 @@ async def do_func(
452447
res = func(*row)
453448
out.extend(as_list_of_tuples(res))
454449
out_ids.extend([row_ids[i]] * (len(out)-len(out_ids)))
455-
finished_event.set()
456450
return out_ids, out
457451

458452
return do_func
@@ -485,7 +479,6 @@ def build_vector_tvf_endpoint(
485479

486480
async def do_func(
487481
cancel_event: threading.Event,
488-
finished_event: threading.Event,
489482
timer: Timer,
490483
row_ids: Sequence[int],
491484
cols: Sequence[Tuple[Sequence[Any], Optional[Sequence[bool]]]],
@@ -517,8 +510,6 @@ async def do_func(
517510
else:
518511
func_res = func()
519512

520-
finished_event.set()
521-
522513
res = get_dataframe_columns(func_res)
523514

524515
cancel_on_event(cancel_event)
@@ -981,6 +972,12 @@ async def __call__(
981972
datetime.timezone.utc,
982973
).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
983974
)
975+
call_timer = Timer(
976+
id=request_id,
977+
timestamp=datetime.datetime.now(
978+
datetime.timezone.utc,
979+
).strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
980+
)
984981

985982
assert scope['type'] == 'http'
986983

@@ -997,6 +994,7 @@ async def __call__(
997994
func_endpoint = self.endpoints.get(func_name)
998995

999996
timer.metadata['function'] = func_name.decode('utf-8') if func_name else ''
997+
call_timer.metadata['function'] = timer.metadata['function']
1000998

1001999
func = None
10021000
func_info: Dict[str, Any] = {}
@@ -1037,23 +1035,18 @@ async def __call__(
10371035
result = []
10381036

10391037
cancel_event = threading.Event()
1040-
finished_event = threading.Event()
1041-
1042-
# Async functions don't need to set the finished event
1043-
if func_info['is_async']:
1044-
finished_event.set()
10451038

10461039
with timer('parse_input'):
10471040
inputs = input_handler['load']( # type: ignore
10481041
func_info['colspec'], b''.join(data),
10491042
)
10501043

10511044
func_task = asyncio.create_task(
1052-
func(cancel_event, finished_event, timer, *inputs)
1045+
func(cancel_event, call_timer, *inputs)
10531046
if func_info['is_async']
10541047
else to_thread(
10551048
lambda: asyncio.run(
1056-
func(cancel_event, finished_event, timer, *inputs),
1049+
func(cancel_event, call_timer, *inputs),
10571050
),
10581051
),
10591052
)
@@ -1073,9 +1066,6 @@ async def __call__(
10731066

10741067
await cancel_all_tasks(pending)
10751068

1076-
# Make sure threads finish before we proceed
1077-
finished_event.wait()
1078-
10791069
for task in done:
10801070
if task is disconnect_task:
10811071
cancel_event.set()
@@ -1163,6 +1153,9 @@ async def __call__(
11631153
out['body'] = body
11641154
await send(out)
11651155

1156+
for k, v in call_timer.metrics.items():
1157+
timer.metrics[k] = v
1158+
11661159
timer.finish()
11671160

11681161
def _create_link(

singlestoredb/functions/ext/timer.py

Lines changed: 17 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import time
33
from typing import Any
44
from typing import Dict
5-
from typing import List
5+
from typing import Optional
66

77
from . import utils
88

@@ -46,93 +46,52 @@ class Timer:
4646
print(timer.metrics)
4747
# {'receive_data': 0.1, 'parse_input': 0.2, 'inner_operation': 0.05,
4848
# 'call_function': 0.35, 'total': 0.65}
49+
4950
"""
5051

5152
def __init__(self, **kwargs: Any) -> None:
52-
"""
53-
Initialize the Timer.
54-
55-
Parameters
56-
----------
57-
metrics : Dict[str, float]
58-
Dictionary to store the timing results
59-
60-
"""
6153
self.metadata: Dict[str, Any] = kwargs
6254
self.metrics: Dict[str, float] = dict()
63-
self._stack: List[Dict[str, Any]] = []
55+
self.entries: Dict[str, float] = dict()
56+
self._current_key: Optional[str] = None
6457
self.start_time = time.perf_counter()
6558

6659
def __call__(self, key: str) -> 'Timer':
67-
"""
68-
Set the key for the next context manager usage.
69-
70-
Parameters
71-
----------
72-
key : str
73-
The key to store the execution time under
74-
75-
Returns
76-
-------
77-
Timer
78-
Self, to be used as context manager
79-
80-
"""
8160
self._current_key = key
8261
return self
8362

8463
def __enter__(self) -> 'Timer':
85-
"""Enter the context manager and start timing."""
86-
if not hasattr(self, '_current_key'):
64+
if self._current_key is None:
8765
raise ValueError(
8866
"No key specified. Use timer('key_name') as context manager.",
8967
)
90-
91-
# Push current timing info onto stack
92-
timing_info = {
93-
'key': self._current_key,
94-
'start_time': time.perf_counter(),
95-
}
96-
self._stack.append(timing_info)
97-
98-
# Clear current key for next use
99-
delattr(self, '_current_key')
100-
68+
self.entries[self._current_key] = time.perf_counter()
10169
return self
10270

10371
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
104-
"""Exit the context manager and store the elapsed time."""
105-
if not self._stack:
106-
return
107-
108-
# Pop the current timing from stack
109-
timing_info = self._stack.pop()
110-
elapsed = time.perf_counter() - timing_info['start_time']
111-
self.metrics.setdefault(timing_info['key'], 0)
112-
self.metrics[timing_info['key']] += elapsed
72+
key = self._current_key
73+
if key and key in self.entries:
74+
start = self.entries.pop(key)
75+
elapsed = time.perf_counter() - start
76+
self.metrics[key] = elapsed
77+
self._current_key = None
11378

11479
async def __aenter__(self) -> 'Timer':
115-
"""Async enter for async context manager support."""
11680
return self.__enter__()
11781

11882
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
119-
"""Async exit for async context manager support."""
12083
self.__exit__(exc_type, exc_val, exc_tb)
12184

85+
def reset(self) -> None:
86+
self.metrics.clear()
87+
self.entries.clear()
88+
self._current_key = None
89+
12290
def finish(self) -> None:
12391
"""Finish the current timing context and store the elapsed time."""
124-
if self._stack:
125-
raise RuntimeError('finish() called within a `with` block.')
126-
12792
self.metrics['total'] = time.perf_counter() - self.start_time
128-
12993
self.log_metrics()
13094

131-
def reset(self) -> None:
132-
"""Clear all stored times and reset the stack."""
133-
self.metrics.clear()
134-
self._stack.clear()
135-
13695
def log_metrics(self) -> None:
13796
if self.metadata.get('function'):
13897
result = dict(type='function_metrics', **self.metadata, **self.metrics)

0 commit comments

Comments
 (0)