-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasync_runner.py
More file actions
194 lines (149 loc) · 6.81 KB
/
async_runner.py
File metadata and controls
194 lines (149 loc) · 6.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""Asyncio event loop runner executed in a background thread.
Provides a small utility to run an asyncio event loop in its own thread and
schedule coroutines or callbacks onto it from arbitrary threads.
Usage:
runner = AsyncRunner()
runner.start()
# schedule a coroutine (returns concurrent.futures.Future)
fut = runner.submit(some_coro(arg=1))
# wait for result
result = fut.result()
runner.stop()
Also supports context manager:
with AsyncRunner() as runner:
result = runner.run_coroutine(some_coro(), timeout=5)
"""
from __future__ import annotations

import asyncio
import concurrent.futures
import threading
import traceback
from typing import Any, Callable, Coroutine, Optional
class AsyncRunner:
    """Run an asyncio event loop in a dedicated daemon thread.

    The loop is created inside the worker thread by ``start()`` and torn down
    by ``stop()``. Work is handed to the loop from other threads through
    ``submit()``, ``call_soon()``, ``run_sync()`` and ``run_coroutine()``.
    The instance can also be used as a context manager, which calls
    ``start()`` on entry and ``stop()`` on exit.
    """

    def __init__(self) -> None:
        """Create a runner in the stopped state; no thread or loop exists yet."""
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        # Set once the loop is actually running, or once the thread has failed.
        self._ready_evt = threading.Event()
        # Set when the loop thread is about to exit (cleanly or not).
        self._stopped_evt = threading.Event()
        # Exception that escaped the loop thread, if any.
        self._exc_in_thread: BaseException | None = None
        # Guards the start()/stop() bookkeeping of _thread and _loop.
        self._lock = threading.RLock()

    # --------------------------- lifecycle ---------------------------
    def start(self) -> None:
        """Start the event loop thread if it is not already running.

        Blocks until the loop is running in the worker thread, so scheduling
        calls made right after ``start()`` returns see a running loop.

        Raises:
            RuntimeError: if the worker thread failed while starting; the
                original exception is chained as ``__cause__``.
        """
        with self._lock:
            if self._thread is not None and self._thread.is_alive():
                return
            self._ready_evt.clear()
            self._stopped_evt.clear()
            self._exc_in_thread = None

            def target() -> None:
                try:
                    loop = asyncio.new_event_loop()
                    try:
                        self._loop = loop
                        asyncio.set_event_loop(loop)
                        # Signal readiness from *inside* the loop: when this
                        # callback fires, loop.is_running() is already True.
                        loop.call_soon(self._ready_evt.set)
                        # Runs until stop() posts loop.stop().
                        loop.run_forever()
                        # After loop.stop(), attempt graceful cleanup.
                        self._cancel_pending(loop)
                    finally:
                        # Close the loop even if run_forever() raised.
                        loop.close()
                except BaseException as e:  # capture any fatal error in the thread
                    self._exc_in_thread = e
                    traceback.print_exc()
                finally:
                    self._stopped_evt.set()
                    # Never leave start() blocked if we failed before the
                    # readiness callback could run.
                    self._ready_evt.set()

            self._thread = threading.Thread(target=target, name="AsyncLoop", daemon=True)
            self._thread.start()
            # Wait until the loop is running (or the thread has failed).
            self._ready_evt.wait()
            # If the thread failed during startup, surface the exception now.
            if self._exc_in_thread is not None:
                raise RuntimeError("AsyncRunner failed to start") from self._exc_in_thread

    def stop(self, join_timeout: float = 5.0) -> None:
        """Stop the loop and join the thread.

        Returns immediately if the runner was never started or is already
        stopped. When called from the loop thread itself, the stop is only
        scheduled, because a thread cannot join itself.

        Args:
            join_timeout: maximum time (seconds) to wait for the thread to join.
                If the thread is still alive afterwards it is detached; it is
                a daemon thread and will not block interpreter exit.
        """
        with self._lock:
            loop = self._loop
            thread = self._thread
            if loop is not None and loop.is_running():
                try:
                    loop.call_soon_threadsafe(loop.stop)
                except RuntimeError:
                    # The loop was closed between the check and the call.
                    pass

        if thread is None:
            # Nothing was started, so there is nothing to wait for.
            return
        if thread is threading.current_thread():
            # Joining our own thread would raise; the scheduled loop.stop()
            # takes effect once the current callback returns.
            return

        # The thread exits right after its cleanup, so one join suffices.
        thread.join(timeout=join_timeout)

        with self._lock:
            # Do not clobber the state of a concurrent restart.
            if self._thread is thread:
                self._thread = None
                self._loop = None

    # ------------------------ scheduling API -------------------------
    def submit(self, coro: Coroutine[Any, Any, Any]) -> concurrent.futures.Future:
        """Schedule a coroutine for execution on the loop (thread-safe).

        Args:
            coro: coroutine object to run on the loop.

        Returns:
            A ``concurrent.futures.Future`` (not an ``asyncio.Future``).

        Raises:
            RuntimeError: if the runner is not running.
        """
        loop = self._ensure_running_loop()
        return asyncio.run_coroutine_threadsafe(coro, loop)

    def call_soon(self, callback: Callable[..., Any], *args: Any) -> None:
        """Schedule ``callback(*args)`` on the loop from any thread.

        Raises:
            RuntimeError: if the runner is not running.
        """
        loop = self._ensure_running_loop()
        loop.call_soon_threadsafe(callback, *args)

    def run_sync(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
        """Run a blocking function in the loop's default executor.

        Args:
            func: callable to execute off the loop thread.
            *args: positional arguments for ``func``.
            **kwargs: keyword arguments for ``func``.

        Returns:
            A ``concurrent.futures.Future`` (use ``.result()`` to wait).

        Raises:
            RuntimeError: if the runner is not running.
        """
        loop = self._ensure_running_loop()

        async def _call_in_executor() -> Any:
            # run_in_executor() returns an asyncio.Future, not a coroutine,
            # and must be called on the loop thread; wrapping it in a
            # coroutine satisfies run_coroutine_threadsafe() on both counts.
            return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

        return asyncio.run_coroutine_threadsafe(_call_in_executor(), loop)

    def run_coroutine(self, coro: Coroutine[Any, Any, Any], timeout: Optional[float] = None) -> Any:
        """Submit a coroutine and block until its result is available.

        Args:
            coro: coroutine object to run on the loop.
            timeout: seconds to wait for the result; ``None`` waits forever.
                The coroutine is not cancelled when the wait times out.

        Returns:
            The value returned by the coroutine.

        Raises:
            RuntimeError: if the runner is not running.
        """
        fut = self.submit(coro)
        return fut.result(timeout=timeout)

    # ---------------------------- helpers ----------------------------
    def is_running(self) -> bool:
        """Return True if the event loop is currently running."""
        loop = self._loop
        return bool(loop and loop.is_running())

    def get_loop(self) -> asyncio.AbstractEventLoop:
        """Return the underlying event loop.

        Raises:
            RuntimeError: if the runner is not running.
        """
        return self._ensure_running_loop()

    def _ensure_running_loop(self) -> asyncio.AbstractEventLoop:
        """Internal: return the running loop, else raise RuntimeError."""
        # Read the attribute once so a concurrent stop() cannot swap it to
        # None between the check and the return.
        loop = self._loop
        if loop is not None and loop.is_running():
            return loop
        # Only wait when a thread exists and is still starting up; a runner
        # that was never started fails immediately.
        if self._thread is not None and not self._ready_evt.is_set():
            self._ready_evt.wait(timeout=1.0)
        if self._exc_in_thread is not None:
            raise RuntimeError("AsyncRunner thread crashed") from self._exc_in_thread
        loop = self._loop
        if loop is None or not loop.is_running():
            raise RuntimeError("AsyncRunner is not running. Call start() first.")
        return loop

    @staticmethod
    def _cancel_pending(loop: asyncio.AbstractEventLoop) -> None:
        """Cancel all unfinished tasks on ``loop`` and wait for them to finish.

        Must be called while the loop is stopped but not yet closed. Errors
        are printed rather than raised so shutdown can continue.
        """
        try:
            pending = [t for t in asyncio.all_tasks(loop=loop) if not t.done()]
            for t in pending:
                t.cancel()
            if pending:
                # Drive the loop until every cancelled task has finished;
                # return_exceptions keeps CancelledError from propagating.
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        except Exception:
            # We don't re-raise during shutdown.
            traceback.print_exc()

    # ---------------------- context manager API ----------------------
    def __enter__(self) -> "AsyncRunner":
        """Start the runner and return it."""
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Stop the runner; exceptions from the ``with`` body are not suppressed."""
        self.stop()
# Names exported by a star-import of this module. Only AsyncRunner is listed;
# no backwards-compatible alias (e.g. AsyncLoopRunner) is defined in the
# visible code.
__all__ = ["AsyncRunner"]