Skip to content

Commit d2a8b29

Browse files
authored
Merge pull request #26 from RustedBytes/reduce-py
move more to rust side
2 parents 66c577d + a274c1a commit d2a8b29

4 files changed

Lines changed: 221 additions & 183 deletions

File tree

python/rsloop/__init__.py

Lines changed: 9 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@
77
import ssl as __ssl
88
import sys as __sys
99
import typing as __typing
10-
import asyncio.base_events as __asyncio_base_events
11-
import io as __io
10+
11+
from ._loop import PyLoop as Loop
12+
from ._loop import __version__
13+
from ._loop import new_event_loop
14+
from ._loop import start_server as __start_server
15+
from ._loop import open_connection as __open_connection
16+
from ._loop import profiler_running
17+
from ._loop import start_profiler
18+
from ._loop import stop_profiler
1219

1320
__DLL_DIR_HANDLES: list[object] = []
1421

@@ -49,14 +56,6 @@ def __configure_windows_dll_search_path() -> None:
4956

5057
__configure_windows_dll_search_path()
5158

52-
from ._loop import PyLoop as Loop
53-
from ._loop import __version__
54-
from ._loop import open_connection as __open_connection
55-
from ._loop import profiler_running as __profiler_running
56-
from ._loop import start_profiler as __start_profiler
57-
from ._loop import start_server as __start_server
58-
from ._loop import stop_profiler as __stop_profiler
59-
6059
__all__: tuple[str, ...] = (
6160
"Loop",
6261
"__version__",
@@ -102,24 +101,6 @@ def __set_event_loop(loop: Loop | None) -> None:
102101
__asyncio.events.set_event_loop = __set_event_loop
103102

104103

105-
def new_event_loop() -> Loop:
106-
return Loop()
107-
108-
109-
def profiler_running() -> bool:
110-
return __profiler_running()
111-
112-
113-
def start_profiler() -> None:
114-
"""Start a Tracy profiling session."""
115-
__start_profiler()
116-
117-
118-
def stop_profiler() -> None:
119-
"""Stop the active Tracy profiling session."""
120-
__stop_profiler()
121-
122-
123104
@__contextlib.contextmanager
124105
def profile() -> __typing.Iterator[None]:
125106
"""Context manager wrapper around ``start_profiler()`` / ``stop_profiler()``."""
@@ -141,130 +122,17 @@ def profile() -> __typing.Iterator[None]:
141122
__ORIG_SOCK_RECVFROM_INTO = getattr(Loop, "sock_recvfrom_into", None)
142123
__ORIG_SOCK_SENDTO = getattr(Loop, "sock_sendto", None)
143124
__ORIG_SOCK_SENDFILE = getattr(Loop, "sock_sendfile", None)
144-
__ORIG_RUN_FOREVER = Loop.run_forever
145-
__ORIG_RUN_UNTIL_COMPLETE = Loop.run_until_complete
146-
__ORIG_SHUTDOWN_ASYNCGENS = Loop.shutdown_asyncgens
147-
__ORIG_CLOSE = Loop.close
148-
__ORIG_CREATE_TASK = Loop.create_task
149125
__USE_FAST_STREAMS = __os.environ.get("RSLOOP_USE_FAST_STREAMS", "1") != "0"
150-
__ASYNCGEN_STATE: dict[Loop, dict[str, object]] = {}
151-
__LOOP_CONFIG: dict[Loop, dict[str, object]] = {}
152126

153127
if __USE_FAST_STREAMS and __asyncio.open_connection is __ORIG_OPEN_CONNECTION:
154128
__asyncio.open_connection = __open_connection
155129
if __USE_FAST_STREAMS and __asyncio.start_server is __ORIG_START_SERVER:
156130
__asyncio.start_server = __start_server
157131

158132
_asyncio = __asyncio
159-
_io = __io
160133
_os = __os
161134

162135

163-
def __get_asyncgen_state(loop: Loop) -> dict[str, object]:
164-
state = __ASYNCGEN_STATE.get(loop)
165-
if state is None:
166-
state = {
167-
"active": set(),
168-
"shutdown_called": False,
169-
"old_hooks": None,
170-
}
171-
__ASYNCGEN_STATE[loop] = state
172-
return state
173-
174-
175-
def __get_loop_config(loop: Loop) -> dict[str, object]:
176-
config = __LOOP_CONFIG.get(loop)
177-
if config is None:
178-
config = {
179-
"slow_callback_duration": 0.1,
180-
}
181-
__LOOP_CONFIG[loop] = config
182-
return config
183-
184-
185-
@__contextlib.contextmanager
186-
def __asyncgen_hooks_installed(loop: Loop) -> __typing.Iterator[None]:
187-
state = __get_asyncgen_state(loop)
188-
old_hooks = __sys.get_asyncgen_hooks()
189-
state["old_hooks"] = old_hooks
190-
__sys.set_asyncgen_hooks(
191-
firstiter=lambda agen: __asyncgen_firstiter_hook(loop, agen),
192-
finalizer=lambda agen: __asyncgen_finalizer_hook(loop, agen),
193-
)
194-
try:
195-
yield None
196-
finally:
197-
saved_hooks = state.get("old_hooks")
198-
if saved_hooks is not None:
199-
__sys.set_asyncgen_hooks(*saved_hooks)
200-
state["old_hooks"] = None
201-
202-
203-
def __asyncgen_firstiter_hook(loop: Loop, agen) -> None:
204-
state = __get_asyncgen_state(loop)
205-
if state["shutdown_called"]:
206-
import warnings as __warnings
207-
208-
__warnings.warn(
209-
f"asynchronous generator {agen!r} was scheduled after "
210-
f"loop.shutdown_asyncgens() call",
211-
ResourceWarning,
212-
source=loop,
213-
)
214-
state["active"].add(agen)
215-
216-
217-
def __asyncgen_finalizer_hook(loop: Loop, agen) -> None:
218-
state = __get_asyncgen_state(loop)
219-
state["active"].discard(agen)
220-
if not loop.is_closed():
221-
loop.call_soon_threadsafe(loop.create_task, agen.aclose())
222-
223-
224-
async def __loop_shutdown_asyncgens(self):
225-
state = __get_asyncgen_state(self)
226-
state["shutdown_called"] = True
227-
228-
if not state["active"]:
229-
return
230-
231-
closing_agens = list(state["active"])
232-
state["active"].clear()
233-
234-
results = await __asyncio.gather(
235-
*(agen.aclose() for agen in closing_agens),
236-
return_exceptions=True,
237-
)
238-
239-
for result, agen in zip(results, closing_agens):
240-
if isinstance(result, Exception):
241-
self.call_exception_handler(
242-
{
243-
"message": f"an error occurred during closing of asynchronous generator {agen!r}",
244-
"exception": result,
245-
"asyncgen": agen,
246-
}
247-
)
248-
249-
250-
def __loop_run_forever(self):
251-
with __asyncgen_hooks_installed(self):
252-
return __ORIG_RUN_FOREVER(self)
253-
254-
255-
def __loop_run_until_complete(self, future):
256-
with __asyncgen_hooks_installed(self):
257-
return __ORIG_RUN_UNTIL_COMPLETE(self, future)
258-
259-
260-
def __loop_close(self):
261-
try:
262-
return __ORIG_CLOSE(self)
263-
finally:
264-
__ASYNCGEN_STATE.pop(self, None)
265-
__LOOP_CONFIG.pop(self, None)
266-
267-
268136
def __cancel_all_tasks(loop: Loop) -> None:
269137
to_cancel = __asyncio.all_tasks(loop)
270138
if not to_cancel:
@@ -574,14 +442,6 @@ async def __loop_sock_sendfile(
574442
file.seek(offset + total_sent)
575443

576444

577-
def __get_slow_callback_duration(self):
578-
return __get_loop_config(self)["slow_callback_duration"]
579-
580-
581-
def __set_slow_callback_duration(self, value):
582-
__get_loop_config(self)["slow_callback_duration"] = float(value)
583-
584-
585445
async def __loop_create_datagram_endpoint(
586446
self,
587447
protocol_factory,
@@ -1522,24 +1382,6 @@ async def __loop_create_connection(
15221382
if __ORIG_SOCK_SENDFILE is None:
15231383
Loop.sock_sendfile = __loop_sock_sendfile
15241384

1525-
if not hasattr(Loop, "slow_callback_duration"):
1526-
Loop.slow_callback_duration = property(
1527-
__get_slow_callback_duration,
1528-
__set_slow_callback_duration,
1529-
)
1530-
1531-
if Loop.run_forever is __ORIG_RUN_FOREVER:
1532-
Loop.run_forever = __loop_run_forever
1533-
1534-
if Loop.run_until_complete is __ORIG_RUN_UNTIL_COMPLETE:
1535-
Loop.run_until_complete = __loop_run_until_complete
1536-
1537-
if Loop.shutdown_asyncgens is __ORIG_SHUTDOWN_ASYNCGENS:
1538-
Loop.shutdown_asyncgens = __loop_shutdown_asyncgens
1539-
1540-
if Loop.close is __ORIG_CLOSE:
1541-
Loop.close = __loop_close
1542-
15431385
# Keep the Rust implementation on the hot path. It already handles task
15441386
# factories and keyword forwarding, while the Python wrapper adds measurable
15451387
# overhead in task-heavy workloads.

src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ pub use fast_streams::{open_connection, start_server, PyFastStreamReader, PyFast
2121
pub use loop_core::{LoopCommand, LoopCore};
2222
pub use process_transport::{PyProcessPipeTransport, PyProcessTransport};
2323
pub use profiler::{profiler_running, start_profiler, stop_profiler};
24-
pub use python_api::{future_done_stop, new_event_loop, signal_bridge, PyLoop};
24+
pub use python_api::{
25+
asyncgen_finalizer_hook, asyncgen_firstiter_hook, future_done_stop, new_event_loop,
26+
signal_bridge, PyLoop,
27+
};
2528
pub use stream_transport::{PyServer, PyStreamTransport};
2629

2730
use pyo3::prelude::*;
@@ -39,6 +42,8 @@ fn _loop(m: &Bound<'_, PyModule>) -> PyResult<()> {
3942
m.add_class::<PyFastStreamReader>()?;
4043
m.add_class::<PyFastStreamWriter>()?;
4144
m.add_function(wrap_pyfunction!(new_event_loop, m)?)?;
45+
m.add_function(wrap_pyfunction!(asyncgen_firstiter_hook, m)?)?;
46+
m.add_function(wrap_pyfunction!(asyncgen_finalizer_hook, m)?)?;
4247
m.add_function(wrap_pyfunction!(future_done_stop, m)?)?;
4348
m.add_function(wrap_pyfunction!(open_connection, m)?)?;
4449
m.add_function(wrap_pyfunction!(profiler_running, m)?)?;
@@ -47,5 +52,13 @@ fn _loop(m: &Bound<'_, PyModule>) -> PyResult<()> {
4752
m.add_function(wrap_pyfunction!(start_server, m)?)?;
4853
m.add_function(wrap_pyfunction!(stop_profiler, m)?)?;
4954
m.add("_future_done_stop", m.getattr("future_done_stop")?)?;
55+
m.add(
56+
"_asyncgen_firstiter_hook",
57+
m.getattr("asyncgen_firstiter_hook")?,
58+
)?;
59+
m.add(
60+
"_asyncgen_finalizer_hook",
61+
m.getattr("asyncgen_finalizer_hook")?,
62+
)?;
5063
Ok(())
5164
}

src/loop_core.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::runtime::run_runtime_thread;
1818
use crate::stream_transport::{ReaderTarget, ServerCore, ServerListener, StreamTransportCore};
1919
use crossbeam_channel::Sender;
2020
use pyo3::prelude::*;
21-
use pyo3::types::{PyDict, PyTuple};
21+
use pyo3::types::{PyDict, PySet, PyTuple};
2222

2323
const READY_DRAIN_SLICE: usize = 64;
2424

@@ -135,7 +135,9 @@ pub struct LoopState {
135135
pub closed: bool,
136136
pub running: bool,
137137
pub stopping: bool,
138+
pub slow_callback_duration: f64,
138139
pub asyncgens_shutdown_called: bool,
140+
pub active_asyncgens: Option<Py<PySet>>,
139141
pub executor_shutdown_called: bool,
140142
pub signal_handlers: HashMap<i32, SignalHandlerTemplate>,
141143
pub previous_signal_handlers: HashMap<i32, Py<PyAny>>,
@@ -152,7 +154,9 @@ impl LoopState {
152154
closed: false,
153155
running: false,
154156
stopping: false,
157+
slow_callback_duration: 0.1,
155158
asyncgens_shutdown_called: false,
159+
active_asyncgens: None,
156160
executor_shutdown_called: false,
157161
signal_handlers: HashMap::new(),
158162
previous_signal_handlers: HashMap::new(),

0 commit comments

Comments
 (0)