Skip to content

Commit af76672

Browse files
authored
Merge pull request #6 from RustedBytes/compio
compio integration
2 parents 6398ffd + ab6d818 commit af76672

14 files changed

Lines changed: 1523 additions & 229 deletions

Cargo.lock

Lines changed: 423 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsloop"
3-
version = "0.1.4"
3+
version = "0.1.5"
44
edition = "2021"
55
description = "An event loop for asyncio written in Rust"
66
license = "Apache-2.0"
@@ -19,6 +19,7 @@ profiler = ["dep:pprof"]
1919

2020
[dependencies]
2121
async-std = "1"
22+
compio = { version = "0.18", default-features = false, features = ["runtime", "net", "io", "signal", "time", "io-uring"] }
2223
crossbeam-channel = "0.5"
2324
futures = { version = "0.3", default-features = false, features = ["std"] }
2425
libc = "0.2"
@@ -41,7 +42,7 @@ opt-level = 3
4142
lto = "fat"
4243
codegen-units = 1
4344
incremental = false
44-
strip = true
45+
# strip = true
4546

4647
[profile.bench]
4748
inherits = "release"

README.md

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
`rsloop` is a PyO3-based `asyncio` event loop implemented in Rust.
99

1010
Each `rsloop.Loop` owns a dedicated Rust runtime thread for loop coordination
11-
and I/O work. Python callbacks, tasks, and coroutines still run on the thread
12-
that calls `run_forever()` or `run_until_complete()` (usually the main Python
13-
thread).
11+
and I/O work. On Linux, low-level fd watchers plus plain TCP / Unix socket
12+
readiness, socket reads, and non-TLS server accepts are driven from that thread
13+
through `compio` with `io_uring` support enabled. Python callbacks, tasks, and
14+
coroutines still run on the thread that calls `run_forever()` or
15+
`run_until_complete()` (usually the main Python thread).
1416

1517
The package exposes:
1618

@@ -146,6 +148,19 @@ Otherwise `rsloop` falls back to the stdlib `asyncio.streams` helpers.
146148
The implementation lives in `src/fast_streams.rs` and is backed by the lower
147149
level transport code in `src/stream_transport.rs`.
148150

151+
## Runtime Model
152+
153+
Today the runtime is hybrid rather than fully single-threaded:
154+
155+
- the loop coordination thread is always the central scheduler
156+
- on Linux, `add_reader` / `add_writer`, plain socket reads, and non-TLS socket
157+
accept loops use the `compio` runtime on that thread
158+
- some transport paths still fall back to helper threads, especially TLS I/O,
159+
TLS server accept, and parts of the legacy transport write path
160+
161+
That means the codebase has started the move toward a single-runtime-thread I/O
162+
model, but has not finished eliminating every helper thread yet.
163+
149164
## Current Limitations
150165

151166
These gaps are visible in the current implementation.
@@ -163,7 +178,9 @@ These gaps are visible in the current implementation.
163178
- TLS uses a `rustls` backend with a narrower compatibility surface than
164179
CPython's OpenSSL-backed `ssl` module. In particular, encrypted private keys
165180
are not supported yet, and the fast-stream monkeypatch still falls back to
166-
stdlib helpers whenever `ssl` is enabled.
181+
stdlib helpers whenever `ssl` is enabled. TLS transport internals also still
182+
use helper-thread paths instead of the newer runtime-thread `compio` socket
183+
path.
167184
- Subprocess support is intentionally incomplete:
168185
`preexec_fn` is unsupported, and text mode is rejected for
169186
`asyncio.create_subprocess_exec()` /
@@ -178,6 +195,10 @@ These gaps are visible in the current implementation.
178195
Unix socket APIs and Unix signal handlers remain Unix-only, and several
179196
subprocess options such as `pass_fds`, `user`, `group`, and `umask` are
180197
still specific to Unix process spawning.
198+
- The transport runtime model is still in transition:
199+
plain socket reads and non-TLS accepts now run on the loop runtime thread on
200+
Linux, but writes and TLS-heavy paths are not fully collapsed onto that same
201+
single-threaded I/O path yet.
181202

182203
## Build
183204

build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
fn main() {
2+
pyo3_build_config::use_pyo3_cfgs();
23
pyo3_build_config::add_extension_module_link_args();
34
}

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "maturin"
44

55
[project]
66
name = "rsloop"
7-
version = "0.1.4"
7+
version = "0.1.5"
88
description = "An event loop for asyncio written in Rust"
99
readme = "README.md"
1010
license = { file = "LICENSE" }

src/context.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ pub fn run_in_context(
6565
needs_run: bool,
6666
callback: &Py<PyAny>,
6767
args: &Py<PyTuple>,
68-
_context_args: &Py<PyTuple>,
6968
) -> PyResult<Py<PyAny>> {
7069
if !needs_run {
7170
return callback.call1(py, args.clone_ref(py));
@@ -89,18 +88,6 @@ pub fn run_in_context(
8988
}
9089
}
9190

92-
pub fn build_context_args(
93-
py: Python<'_>,
94-
callback: &Py<PyAny>,
95-
args: &Py<PyTuple>,
96-
) -> PyResult<Py<PyTuple>> {
97-
let args_bound = args.bind(py);
98-
let mut run_args = Vec::with_capacity(args_bound.len() + 1);
99-
run_args.push(callback.clone_ref(py));
100-
run_args.extend(args_bound.iter().map(|item| item.unbind()));
101-
Ok(PyTuple::new(py, run_args)?.unbind())
102-
}
103-
10491
#[inline]
10592
pub fn ensure_running_loop(py: Python<'_>, loop_obj: &Py<PyAny>) -> PyResult<()> {
10693
set_running_loop_fn(py)?.call1(py, (loop_obj.clone_ref(py),))?;

src/fast_streams.rs

Lines changed: 95 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ impl PyFastStreamReader {
106106
value: Py<PyAny>,
107107
) -> PyResult<()> {
108108
let future = future.bind(py);
109-
match future.call_method1(python_names::set_result(py), (value,)) {
109+
match python_names::call_method1(py, future, python_names::set_result(py), value.bind(py)) {
110110
Ok(_) => Ok(()),
111111
Err(err) => {
112-
if future
113-
.call_method0(python_names::cancelled(py))?
112+
if python_names::call_method0(py, future, python_names::cancelled(py))?
113+
.bind(py)
114114
.extract::<bool>()?
115115
{
116116
Ok(())
@@ -127,11 +127,12 @@ impl PyFastStreamReader {
127127
exc: Py<PyAny>,
128128
) -> PyResult<()> {
129129
let future = future.bind(py);
130-
match future.call_method1(python_names::set_exception(py), (exc,)) {
130+
match python_names::call_method1(py, future, python_names::set_exception(py), exc.bind(py))
131+
{
131132
Ok(_) => Ok(()),
132133
Err(err) => {
133-
if future
134-
.call_method0(python_names::cancelled(py))?
134+
if python_names::call_method0(py, future, python_names::cancelled(py))?
135+
.bind(py)
135136
.extract::<bool>()?
136137
{
137138
Ok(())
@@ -160,25 +161,28 @@ impl PyFastStreamReader {
160161
}
161162

162163
fn create_future(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
163-
self.loop_obj
164-
.bind(py)
165-
.call_method0(python_names::create_future(py))
166-
.map(Bound::unbind)
164+
python_names::call_method0(py, self.loop_obj.bind(py), python_names::create_future(py))
167165
}
168166

169167
fn ready_result_future(&self, py: Python<'_>, value: Py<PyAny>) -> PyResult<Py<PyAny>> {
170168
let future = self.create_future(py)?;
171-
future
172-
.bind(py)
173-
.call_method1(python_names::set_result(py), (value,))?;
169+
python_names::call_method1(
170+
py,
171+
future.bind(py),
172+
python_names::set_result(py),
173+
value.bind(py),
174+
)?;
174175
Ok(future)
175176
}
176177

177178
fn ready_exception_future(&self, py: Python<'_>, exc: Py<PyAny>) -> PyResult<Py<PyAny>> {
178179
let future = self.create_future(py)?;
179-
future
180-
.bind(py)
181-
.call_method1(python_names::set_exception(py), (exc,))?;
180+
python_names::call_method1(
181+
py,
182+
future.bind(py),
183+
python_names::set_exception(py),
184+
exc.bind(py),
185+
)?;
182186
Ok(future)
183187
}
184188

@@ -214,21 +218,23 @@ impl PyFastStreamReader {
214218
fn maybe_resume_transport(&mut self, py: Python<'_>) -> PyResult<()> {
215219
if self.paused && self.buffer.len() <= self.limit && !self.transport.bind(py).is_none() {
216220
self.paused = false;
217-
self.transport
218-
.bind(py)
219-
.call_method0(python_names::resume_reading(py))?;
221+
python_names::call_method0(
222+
py,
223+
self.transport.bind(py),
224+
python_names::resume_reading(py),
225+
)?;
220226
}
221227
Ok(())
222228
}
223229

224230
fn maybe_pause_transport(&mut self, py: Python<'_>) -> PyResult<()> {
225231
if !self.transport.bind(py).is_none() && !self.paused && self.buffer.len() > 2 * self.limit
226232
{
227-
match self
228-
.transport
229-
.bind(py)
230-
.call_method0(python_names::pause_reading(py))
231-
{
233+
match python_names::call_method0(
234+
py,
235+
self.transport.bind(py),
236+
python_names::pause_reading(py),
237+
) {
232238
Ok(_) => {
233239
self.paused = true;
234240
}
@@ -308,9 +314,11 @@ impl PyFastStreamReader {
308314
}
309315
if self.paused && !self.transport.bind(py).is_none() {
310316
self.paused = false;
311-
self.transport
312-
.bind(py)
313-
.call_method0(python_names::resume_reading(py))?;
317+
python_names::call_method0(
318+
py,
319+
self.transport.bind(py),
320+
python_names::resume_reading(py),
321+
)?;
314322
}
315323
let future = self.create_future(py)?;
316324
self.waiter = Some(ReadWaiter {
@@ -525,14 +533,16 @@ impl PyFastStreamProtocol {
525533
reader: Py<PyFastStreamReader>,
526534
client_connected_cb: Py<PyAny>,
527535
) -> PyResult<Self> {
528-
let closed = loop_obj.call_method0(py, "create_future")?;
529-
let ready_none = loop_obj
530-
.bind(py)
531-
.call_method0(python_names::create_future(py))?
532-
.unbind();
533-
ready_none
534-
.bind(py)
535-
.call_method1(python_names::set_result(py), (py.None(),))?;
536+
let closed =
537+
python_names::call_method0(py, loop_obj.bind(py), python_names::create_future(py))?;
538+
let ready_none =
539+
python_names::call_method0(py, loop_obj.bind(py), python_names::create_future(py))?;
540+
python_names::call_method1(
541+
py,
542+
ready_none.bind(py),
543+
python_names::set_result(py),
544+
py.None().bind(py),
545+
)?;
536546
Ok(Self {
537547
closed,
538548
ready_none,
@@ -560,42 +570,55 @@ impl PyFastStreamProtocol {
560570
}
561571

562572
fn ready_exception_future(&self, py: Python<'_>, exc: Py<PyAny>) -> PyResult<Py<PyAny>> {
563-
let future = self
564-
.loop_obj
565-
.bind(py)
566-
.call_method0(python_names::create_future(py))?
567-
.unbind();
568-
future
569-
.bind(py)
570-
.call_method1(python_names::set_exception(py), (exc,))?;
573+
let future = python_names::call_method0(
574+
py,
575+
self.loop_obj.bind(py),
576+
python_names::create_future(py),
577+
)?;
578+
python_names::call_method1(
579+
py,
580+
future.bind(py),
581+
python_names::set_exception(py),
582+
exc.bind(py),
583+
)?;
571584
Ok(future)
572585
}
573586

574587
fn push_drain_waiter(&mut self, py: Python<'_>) -> PyResult<Py<PyAny>> {
575-
let future = self
576-
.loop_obj
577-
.bind(py)
578-
.call_method0(python_names::create_future(py))?
579-
.unbind();
588+
let future = python_names::call_method0(
589+
py,
590+
self.loop_obj.bind(py),
591+
python_names::create_future(py),
592+
)?;
580593
self.drain_waiters.push(future.clone_ref(py));
581594
Ok(future)
582595
}
583596

584597
fn resolve_drain_waiters(&mut self, py: Python<'_>, exc: Option<Py<PyAny>>) -> PyResult<()> {
585598
for future in self.drain_waiters.drain(..) {
586599
let future = future.bind(py);
587-
if future
588-
.call_method0(python_names::done(py))?
600+
if python_names::call_method0(py, future, python_names::done(py))?
601+
.bind(py)
589602
.extract::<bool>()?
590603
{
591604
continue;
592605
}
593606
match exc.as_ref() {
594607
Some(exc) => {
595-
future.call_method1(python_names::set_exception(py), (exc.clone_ref(py),))?;
608+
python_names::call_method1(
609+
py,
610+
future,
611+
python_names::set_exception(py),
612+
exc.bind(py),
613+
)?;
596614
}
597615
None => {
598-
future.call_method1(python_names::set_result(py), (py.None(),))?;
616+
python_names::call_method1(
617+
py,
618+
future,
619+
python_names::set_result(py),
620+
py.None().bind(py),
621+
)?;
599622
}
600623
}
601624
}
@@ -690,16 +713,31 @@ impl PyFastStreamProtocol {
690713
self.reader
691714
.borrow_mut(py)
692715
.set_exception_internal(py, exc.clone_ref(py))?;
693-
if !self.closed.call_method0(py, "done")?.extract::<bool>(py)? {
694-
self.closed
695-
.call_method1(py, "set_exception", (exc.clone_ref(py),))?;
716+
if !python_names::call_method0(py, self.closed.bind(py), python_names::done(py))?
717+
.bind(py)
718+
.extract::<bool>()?
719+
{
720+
python_names::call_method1(
721+
py,
722+
self.closed.bind(py),
723+
python_names::set_exception(py),
724+
exc.bind(py),
725+
)?;
696726
}
697727
self.resolve_drain_waiters(py, Some(exc))?;
698728
}
699729
None => {
700730
self.reader.borrow_mut(py).feed_eof_internal(py)?;
701-
if !self.closed.call_method0(py, "done")?.extract::<bool>(py)? {
702-
self.closed.call_method1(py, "set_result", (py.None(),))?;
731+
if !python_names::call_method0(py, self.closed.bind(py), python_names::done(py))?
732+
.bind(py)
733+
.extract::<bool>()?
734+
{
735+
python_names::call_method1(
736+
py,
737+
self.closed.bind(py),
738+
python_names::set_result(py),
739+
py.None().bind(py),
740+
)?;
703741
}
704742
self.resolve_drain_waiters(py, None)?;
705743
}

0 commit comments

Comments
 (0)