Skip to content

Commit 8060118

Browse files
Add split() and Pipe
1 parent d5d14b9 commit 8060118

7 files changed

Lines changed: 466 additions & 2 deletions

File tree

.github/workflows/ci.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@ jobs:
6363
steps:
6464
- uses: actions/checkout@v4
6565
- uses: dtolnay/rust-toolchain@stable
66-
- run: cargo test --doc
66+
- run: cargo test --doc
67+
- run: cargo doc --lib --no-deps --all-features --document-private-items
68+
env:
69+
RUSTDOCFLAGS: -Dwarnings
70+
RUSTFLAGS: -Dwarnings

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"rust-analyzer.runnables.extraTestBinaryArgs": [
2020
"--nocapture"
2121
],
22+
"rust-analyzer.cargo.features": "all",
2223
"gitlens.codeLens.enabled": false,
2324
"editor.semanticTokenColorCustomizations": {
2425
"rules": {

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ tokio-time = ["dep:tokio"]
1515
[dependencies]
1616
futures = "0.3.31"
1717
log = "0.4.27"
18-
tokio = { version = "1.45.1", optional = true, features = ["time"] }
18+
tokio = { version = "1.48.0", optional = true, features = ["time"] }
1919

2020
[dev-dependencies]
2121
static_assertions = "1.1.0"
2222
tokio-test = "0.4.4"
23+
tokio = { version = "1.48.0", features = ["time", "io-util"] }
2324

2425
[profile.dev]
2526
opt-level = 3

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
pub mod sealed;
22
pub mod shared;
3+
#[cfg(feature = "tokio-time")]
4+
pub mod split;
35
pub mod stopwatch;
46
pub mod sync;
57
mod time;

src/split.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::cell::RefCell;
2+
use std::io;
3+
use std::pin::Pin;
4+
use std::rc::Rc;
5+
use std::task::{Context, Poll};
6+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
7+
8+
/// The readable half of a value returned from [`split`].
9+
pub struct ReadHalf<T: AsyncRead>(Rc<RefCell<T>>);
10+
11+
/// The writable half of a value returned from [`split`].
12+
pub struct WriteHalf<T: AsyncWrite>(Rc<RefCell<T>>);
13+
14+
/// Splits a single value implementing `AsyncRead + AsyncWrite` into separate `AsyncRead` and `AsyncWrite` handles.
15+
/// Non-thread-safe equivalent of [`tokio::io::split`](https://docs.rs/tokio/latest/tokio/io/fn.split.html) without the overhead of a mutex.
16+
pub fn split<T: AsyncRead + AsyncWrite>(value: T) -> (ReadHalf<T>, WriteHalf<T>) {
17+
let shared = Rc::new(RefCell::new(value));
18+
(ReadHalf(shared.clone()), WriteHalf(shared))
19+
}
20+
21+
fn with_pin<T, R>(half: &RefCell<T>, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
22+
let mut guard = half.borrow_mut();
23+
24+
// SAFETY: we do not move the stream
25+
let stream = unsafe { Pin::new_unchecked(&mut *guard) };
26+
27+
f(stream)
28+
}
29+
30+
impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
31+
fn poll_read(
32+
self: Pin<&mut Self>,
33+
cx: &mut Context<'_>,
34+
buf: &mut ReadBuf<'_>,
35+
) -> Poll<io::Result<()>> {
36+
with_pin(&self.0, |inner| inner.poll_read(cx, buf))
37+
}
38+
}
39+
40+
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
41+
fn poll_write(
42+
self: Pin<&mut Self>,
43+
cx: &mut Context<'_>,
44+
buf: &[u8],
45+
) -> Poll<Result<usize, io::Error>> {
46+
with_pin(&self.0, |inner| inner.poll_write(cx, buf))
47+
}
48+
49+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
50+
with_pin(&self.0, |inner| inner.poll_flush(cx))
51+
}
52+
53+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
54+
with_pin(&self.0, |inner| inner.poll_shutdown(cx))
55+
}
56+
57+
fn poll_write_vectored(
58+
self: Pin<&mut Self>,
59+
cx: &mut Context<'_>,
60+
bufs: &[io::IoSlice<'_>],
61+
) -> Poll<Result<usize, io::Error>> {
62+
with_pin(&self.0, |inner| inner.poll_write_vectored(cx, bufs))
63+
}
64+
65+
fn is_write_vectored(&self) -> bool {
66+
self.0.borrow().is_write_vectored()
67+
}
68+
}

src/sync/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ pub mod bounded;
44
pub mod condvar;
55
pub mod error;
66
pub mod oneshot;
7+
#[cfg(feature = "tokio-time")]
8+
pub mod pipe;
79
pub mod semaphore;
810
mod shared_state;
911
pub mod unbounded;

0 commit comments

Comments
 (0)