Skip to content

Commit 645bbad

Browse files
Pipe: rename and fix docs.
1 parent 4fe21ea commit 645bbad

File tree

1 file changed

+50
-47
lines changed

1 file changed

+50
-47
lines changed

src/sync/pipe.rs

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::{cmp, io};
77
use std::{collections::VecDeque, pin::Pin};
88
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
99

10-
/// Unidirectional in-memory stream of bytes implementing `AsyncRead` and `AsyncWrite`.
11-
/// Non-thread-safe equivalent of [`tokio::io::SimplexStream`](https://docs.rs/tokio/latest/tokio/io/struct.SimplexStream.html).
10+
/// Unidirectional in-memory pipe implementing `AsyncRead` and `AsyncWrite`.
11+
/// A more efficient version of [`tokio::io::SimplexStream`](https://docs.rs/tokio/latest/tokio/io/struct.SimplexStream.html)
12+
/// optimized for single-threaded use cases.
1213
#[derive(Debug)]
1314
pub struct Pipe {
1415
buffer: VecDeque<u8>,
@@ -19,7 +20,7 @@ pub struct Pipe {
1920
}
2021

2122
impl Pipe {
22-
/// Creates a new `Pipe` with the specified maximum buffer size.
23+
/// Create a new `Pipe` with a fixed-size pre-allocated buffer of `max_buf_size` bytes.
2324
pub fn new(max_buf_size: usize) -> Self {
2425
Self {
2526
buffer: VecDeque::with_capacity(max_buf_size),
@@ -30,6 +31,12 @@ impl Pipe {
3031
}
3132
}
3233

34+
/// Split the pipe into non-[`Send`] owned readable and writable ends.
35+
pub fn into_split(self) -> (ReadEnd, WriteEnd) {
36+
let pipe = Rc::new(UnsafeCell::new(self));
37+
(ReadEnd(pipe.clone()), WriteEnd(pipe))
38+
}
39+
3340
fn close_write(&mut self) {
3441
self.is_closed = true;
3542
if let Some(waker) = self.read_waker.take() {
@@ -173,21 +180,13 @@ impl AsyncWrite for Pipe {
173180
}
174181
}
175182

176-
/// Creates a unidirectional in-memory pipe with the specified maximum buffer size.
177-
/// Returns the readable and writable halves of the pipe.
178-
/// Non-thread-safe equivalent of [`tokio::io::simplex`](https://docs.rs/tokio/latest/tokio/io/fn.simplex.html).
179-
pub fn pipe(max_buf_size: usize) -> (Reader, Writer) {
180-
let pipe = Rc::new(UnsafeCell::new(Pipe::new(max_buf_size)));
181-
(Reader(pipe.clone()), Writer(pipe))
182-
}
183-
184-
/// The readable half of a value returned from [`pipe`].
185-
pub struct Reader(Rc<UnsafeCell<Pipe>>);
183+
/// The readable end of a [`Pipe`]. Not thread-safe.
184+
pub struct ReadEnd(Rc<UnsafeCell<Pipe>>);
186185

187-
/// The writable half of a value returned from [`pipe`].
188-
pub struct Writer(Rc<UnsafeCell<Pipe>>);
186+
/// The writable end of a [`Pipe`]. Not thread-safe.
187+
pub struct WriteEnd(Rc<UnsafeCell<Pipe>>);
189188

190-
impl AsyncRead for Reader {
189+
impl AsyncRead for ReadEnd {
191190
fn poll_read(
192191
mut self: Pin<&mut Self>,
193192
cx: &mut Context<'_>,
@@ -198,14 +197,14 @@ impl AsyncRead for Reader {
198197
}
199198
}
200199

201-
impl Drop for Reader {
200+
impl Drop for ReadEnd {
202201
fn drop(&mut self) {
203202
// SAFETY: exclusive access is guaranteed by the single-threaded context
204203
unsafe { self.0.with_unchecked(|pipe| pipe.close_read()) }
205204
}
206205
}
207206

208-
impl AsyncWrite for Writer {
207+
impl AsyncWrite for WriteEnd {
209208
fn poll_write(
210209
mut self: Pin<&mut Self>,
211210
cx: &mut Context<'_>,
@@ -242,66 +241,70 @@ impl AsyncWrite for Writer {
242241
}
243242
}
244243

245-
impl Drop for Writer {
244+
impl Drop for WriteEnd {
246245
fn drop(&mut self) {
247246
// SAFETY: exclusive access is guaranteed by the single-threaded context
248247
unsafe { self.0.with_unchecked(|pipe| pipe.close_write()) }
249248
}
250249
}
251250

252-
/// Create a pair of connected [`DuplexPipe`]s with the given capacity.
253-
pub fn duplex_pipe(max_buf_size: usize) -> (DuplexPipe, DuplexPipe) {
254-
let (read1, write1) = pipe(max_buf_size);
255-
let (read2, write2) = pipe(max_buf_size);
256-
(DuplexPipe(read1, write2), DuplexPipe(read2, write1))
251+
/// Create a bi-directional in-memory stream of bytes using two [`Pipe`]s in opposite directions.
252+
/// Non-thread-safe equivalent of [`tokio::io::duplex`](https://docs.rs/tokio/latest/tokio/io/fn.duplex.html).
253+
/// # Returns
254+
/// A tuple containing two connected [`DuplexEnd`]s. Each end can be used for both reading and writing.
255+
/// Data written to one end can be read from the other end and vice versa.
256+
pub fn duplex_pipe(max_buf_size: usize) -> (DuplexEnd, DuplexEnd) {
257+
let (read1, write1) = Pipe::new(max_buf_size).into_split();
258+
let (read2, write2) = Pipe::new(max_buf_size).into_split();
259+
(DuplexEnd(read1, write2), DuplexEnd(read2, write1))
257260
}
258261

259262
/// Bidirectional in-memory stream of bytes implementing `AsyncRead` and `AsyncWrite`.
260263
/// Non-thread-safe equivalent of [`tokio::io::DuplexStream`](https://docs.rs/tokio/latest/tokio/io/struct.DuplexStream.html).
261-
pub struct DuplexPipe(Reader, Writer);
264+
pub struct DuplexEnd(ReadEnd, WriteEnd);
262265

263-
impl DuplexPipe {
264-
/// Splits the `DuplexPipe` into owned readable and writable halves.
265-
pub fn into_split(self) -> (Reader, Writer) {
266-
let DuplexPipe(read, write) = self;
266+
impl DuplexEnd {
267+
/// Splits the [`DuplexEnd`] into owned readable and writable halves.
268+
pub fn into_split(self) -> (ReadEnd, WriteEnd) {
269+
let DuplexEnd(read, write) = self;
267270
(read, write)
268271
}
269272

270-
/// Splits the `DuplexPipe` into mutable references to the readable and writable halves.
271-
pub fn split(&mut self) -> (&mut Reader, &mut Writer) {
272-
let DuplexPipe(read, write) = self;
273+
/// Splits the [`DuplexEnd`] into mutable references to the readable and writable halves.
274+
pub fn split(&mut self) -> (&mut ReadEnd, &mut WriteEnd) {
275+
let DuplexEnd(read, write) = self;
273276
(read, write)
274277
}
275278
}
276279

277-
impl AsyncRead for DuplexPipe {
280+
impl AsyncRead for DuplexEnd {
278281
fn poll_read(
279282
self: Pin<&mut Self>,
280283
cx: &mut Context<'_>,
281284
buf: &mut ReadBuf<'_>,
282285
) -> Poll<io::Result<()>> {
283-
let DuplexPipe(read, _write) = self.get_mut();
286+
let DuplexEnd(read, _write) = self.get_mut();
284287
Pin::new(read).poll_read(cx, buf)
285288
}
286289
}
287290

288-
impl AsyncWrite for DuplexPipe {
291+
impl AsyncWrite for DuplexEnd {
289292
fn poll_write(
290293
self: Pin<&mut Self>,
291294
cx: &mut Context<'_>,
292295
buf: &[u8],
293296
) -> Poll<Result<usize, io::Error>> {
294-
let DuplexPipe(_read, write) = self.get_mut();
297+
let DuplexEnd(_read, write) = self.get_mut();
295298
Pin::new(write).poll_write(cx, buf)
296299
}
297300

298301
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
299-
let DuplexPipe(_read, write) = self.get_mut();
302+
let DuplexEnd(_read, write) = self.get_mut();
300303
Pin::new(write).poll_flush(cx)
301304
}
302305

303306
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
304-
let DuplexPipe(_read, write) = self.get_mut();
307+
let DuplexEnd(_read, write) = self.get_mut();
305308
Pin::new(write).poll_shutdown(cx)
306309
}
307310
}
@@ -314,7 +317,7 @@ mod tests {
314317

315318
#[test]
316319
fn test_write_then_read() {
317-
let (mut reader, mut writer) = pipe(1024);
320+
let (mut reader, mut writer) = Pipe::new(1024).into_split();
318321

319322
let data = b"Hello, world!";
320323
let mut write_task = spawn(writer.write_all(data));
@@ -332,7 +335,7 @@ mod tests {
332335

333336
#[test]
334337
fn test_reader_notifies_writer() {
335-
let (mut reader, mut writer) = pipe(7);
338+
let (mut reader, mut writer) = Pipe::new(7).into_split();
336339

337340
let data = b"Hello, world!";
338341
let mut write_task = spawn(writer.write_all(data));
@@ -359,7 +362,7 @@ mod tests {
359362

360363
#[test]
361364
fn test_writer_notifies_reader() {
362-
let (mut reader, mut writer) = pipe(1024);
365+
let (mut reader, mut writer) = Pipe::new(1024).into_split();
363366

364367
let mut buf = Vec::new();
365368
let mut read_task = spawn(reader.read_buf(&mut buf));
@@ -380,7 +383,7 @@ mod tests {
380383

381384
#[test]
382385
fn test_partial_read() {
383-
let (mut reader, mut writer) = pipe(1024);
386+
let (mut reader, mut writer) = Pipe::new(1024).into_split();
384387

385388
let data = b"Hello, world!";
386389
let mut write_task = spawn(writer.write_all(data));
@@ -405,7 +408,7 @@ mod tests {
405408

406409
#[test]
407410
fn test_drop_writer() {
408-
let (mut reader, mut writer) = pipe(1024);
411+
let (mut reader, mut writer) = Pipe::new(1024).into_split();
409412
assert_ready!(spawn(writer.write_all(b"Hello, world!")).poll()).unwrap();
410413

411414
drop(writer);
@@ -419,7 +422,7 @@ mod tests {
419422

420423
#[test]
421424
fn test_drop_writer_notify_reader() {
422-
let (mut reader, writer) = pipe(1024);
425+
let (mut reader, writer) = Pipe::new(1024).into_split();
423426

424427
let mut buf = Vec::new();
425428
let mut read_task = spawn(reader.read_buf(&mut buf));
@@ -435,7 +438,7 @@ mod tests {
435438

436439
#[test]
437440
fn test_drop_reader() {
438-
let (reader, mut writer) = pipe(1024);
441+
let (reader, mut writer) = Pipe::new(1024).into_split();
439442

440443
drop(reader);
441444
let data = b"Hello, world!";
@@ -447,7 +450,7 @@ mod tests {
447450

448451
#[test]
449452
fn test_drop_reader_notify_writer() {
450-
let (reader, mut writer) = pipe(5);
453+
let (reader, mut writer) = Pipe::new(5).into_split();
451454

452455
let data = b"Hello, world!";
453456
let mut write_task = spawn(writer.write_all(data));
@@ -463,7 +466,7 @@ mod tests {
463466

464467
#[test]
465468
fn test_non_contiguous_internal_buffer() {
466-
let (mut reader, mut writer) = pipe(4);
469+
let (mut reader, mut writer) = Pipe::new(4).into_split();
467470

468471
assert_ready!(spawn(writer.write_all(b"1234")).poll()).unwrap();
469472

0 commit comments

Comments
 (0)