-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathcompat.py
More file actions
100 lines (82 loc) · 2.68 KB
/
compat.py
File metadata and controls
100 lines (82 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
A compatibility module providing code backported from the Python 3.6+ standard library.
"""
from __future__ import annotations
import asyncio
from typing import Any
# Public names of this module; this tuple is what ``from <module> import *``
# exposes. Helpers whose names start with an underscore are intentionally
# left out.
__all__ = (
    "all_tasks",
    "asyncio_run",
    "asyncio_run_forever",
    "current_loop",
)
# ``current_loop`` is asyncio.get_running_loop when the asyncio module
# provides it (Python 3.7+), and asyncio.get_event_loop otherwise.
try:
    current_loop = asyncio.get_running_loop
except AttributeError:
    current_loop = asyncio.get_event_loop
# ``all_tasks`` is the module-level asyncio.all_tasks when the asyncio module
# provides it (Python 3.7+), and the older asyncio.Task.all_tasks otherwise.
try:
    all_tasks = asyncio.all_tasks
except AttributeError:
    all_tasks = asyncio.Task.all_tasks  # type: ignore
def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
    """
    Cancel the tasks reported by ``all_tasks(loop)`` and wait for them.

    Each task is asked to cancel, then *loop* is run until all of them have
    finished. Any task that finished with an exception instead of being
    cancelled is reported through ``loop.call_exception_handler()``.
    Does nothing when there are no tasks.
    """
    pending = all_tasks(loop)
    if pending:
        for pending_task in pending:
            pending_task.cancel()

        # return_exceptions=True keeps gather() from raising, so every task
        # gets the chance to finish before the results are inspected below.
        waiter = asyncio.gather(*pending, return_exceptions=True)
        loop.run_until_complete(waiter)

        for finished_task in pending:
            if finished_task.cancelled():
                continue
            error = finished_task.exception()
            if error is not None:
                loop.call_exception_handler({
                    "message": "unhandled exception during asyncio.run() shutdown",
                    "exception": error,
                    "task": finished_task,
                })
def _asyncio_run(coro: Any, *, debug: bool = False) -> Any:
    """
    Run *coro* to completion on a newly created event loop.

    The new loop is installed as the current event loop and its debug mode
    is set from *debug*. Once *coro* has finished (or raised), the remaining
    tasks are cancelled, asynchronous generators are shut down when the loop
    supports it, the loop is stopped and closed, and the current event loop
    is reset to ``None``.

    Returns whatever ``loop.run_until_complete(coro)`` returns.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.set_debug(debug)
    try:
        outcome = event_loop.run_until_complete(coro)
    finally:
        # Cleanup runs whether or not the coroutine raised; the inner
        # ``finally`` tears the loop down even if the cleanup itself fails.
        try:
            _cancel_all_tasks(event_loop)
            if hasattr(event_loop, "shutdown_asyncgens"):  # Python 3.6+
                event_loop.run_until_complete(event_loop.shutdown_asyncgens())
        finally:
            event_loop.stop()
            event_loop.close()
            asyncio.set_event_loop(None)
    return outcome
# ``asyncio_run`` is the standard asyncio.run when the asyncio module provides
# it (Python 3.7+), and the local ``_asyncio_run`` backport otherwise.
try:
    asyncio_run = asyncio.run
except AttributeError:
    asyncio_run = _asyncio_run  # type: ignore[assignment]
def asyncio_run_forever(server_context: Any, *, debug: bool = False) -> Any:
    """
    Keep *server_context* entered on a fresh event loop until cancelled.

    A proposed-but-not-implemented asyncio.run_forever() API based on
    @vxgmichel's idea.
    See discussions on https://github.com/python/asyncio/pull/465

    A new event loop is created, installed as the current event loop, and
    its debug mode is set from *debug*. *server_context* is entered with
    ``async with`` and the loop then waits on a future that this function
    never completes. When the wait ends, the remaining tasks are cancelled,
    asynchronous generators are shut down when the loop supports it, the
    loop is stopped and closed, and the current event loop is reset to
    ``None``.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.set_debug(debug)
    # No result is ever set on this future here, so awaiting it only ends
    # through cancellation.
    never_done = event_loop.create_future()

    async def _run_forever() -> None:
        async with server_context:
            try:
                await never_done
            except asyncio.CancelledError:
                # Swallowed so the context manager exits normally.
                pass

    try:
        outcome = event_loop.run_until_complete(_run_forever())
    finally:
        # Cleanup runs however the wait ended; the inner ``finally`` tears
        # the loop down even if the cleanup itself fails.
        try:
            _cancel_all_tasks(event_loop)
            if hasattr(event_loop, "shutdown_asyncgens"):  # Python 3.6+
                event_loop.run_until_complete(event_loop.shutdown_asyncgens())
        finally:
            event_loop.stop()
            event_loop.close()
            asyncio.set_event_loop(None)
    return outcome