Skip to content

Commit 3212d90

Browse files
Pipe: more efficient truncate_front + more docs
1 parent f4776b9 commit 3212d90

3 files changed

Lines changed: 66 additions & 18 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod prelude {
1414
pub use crate::sync::condvar as local_condvar;
1515
pub use crate::sync::error as local_sync_error;
1616
pub use crate::sync::oneshot as local_oneshot;
17+
pub use crate::sync::pipe as local_pipe;
1718
pub use crate::sync::semaphore as local_semaphore;
1819
pub use crate::sync::unbounded as local_unbounded;
1920
pub use crate::{

src/split.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! Utilities for splitting `AsyncRead + AsyncWrite` types into separate read and write halves.
2+
13
use std::cell::RefCell;
24
use std::io;
35
use std::pin::Pin;

src/sync/pipe.rs

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl Pipe {
5252
let (head, tail) = self.buffer.as_slices();
5353
let bytes_copied = copy_slice(buf, head) + copy_slice(buf, tail);
5454
if bytes_copied > 0 {
55-
truncate_front(&mut self.buffer, bytes_copied);
55+
self.buffer.drain(..bytes_copied); // FIXME: replace with truncate_front when stabilized
5656
if let Some(waker) = self.write_waker.take() {
5757
waker.wake();
5858
}
@@ -128,13 +128,6 @@ fn copy_slice(dest: &mut ReadBuf, src: &[u8]) -> usize {
128128
bytes_to_copy
129129
}
130130

131-
fn truncate_front(deq: &mut VecDeque<u8>, count: usize) {
132-
assert!(deq.len() >= count);
133-
let keep = deq.len() - count;
134-
deq.rotate_left(count);
135-
deq.truncate(keep);
136-
}
137-
138131
impl AsyncRead for Pipe {
139132
fn poll_read(
140133
self: Pin<&mut Self>,
@@ -182,18 +175,18 @@ impl AsyncWrite for Pipe {
182175
/// Creates a unidirectional in-memory pipe with the specified maximum buffer size.
183176
/// Returns the readable and writable halves of the pipe.
184177
/// Non-thread-safe equivalent of [`tokio::io::simplex`](https://docs.rs/tokio/latest/tokio/io/fn.simplex.html).
185-
pub fn pipe(max_buf_size: usize) -> (PipeReader, PipeWriter) {
178+
pub fn pipe(max_buf_size: usize) -> (Reader, Writer) {
186179
let pipe = Rc::new(UnsafeCell::new(Pipe::new(max_buf_size)));
187-
(PipeReader(pipe.clone()), PipeWriter(pipe))
180+
(Reader(pipe.clone()), Writer(pipe))
188181
}
189182

190183
/// The readable half of a value returned from [`pipe`].
191-
pub struct PipeReader(Rc<UnsafeCell<Pipe>>);
184+
pub struct Reader(Rc<UnsafeCell<Pipe>>);
192185

193186
/// The writable half of a value returned from [`pipe`].
194-
pub struct PipeWriter(Rc<UnsafeCell<Pipe>>);
187+
pub struct Writer(Rc<UnsafeCell<Pipe>>);
195188

196-
impl AsyncRead for PipeReader {
189+
impl AsyncRead for Reader {
197190
fn poll_read(
198191
mut self: Pin<&mut Self>,
199192
cx: &mut Context<'_>,
@@ -204,14 +197,14 @@ impl AsyncRead for PipeReader {
204197
}
205198
}
206199

207-
impl Drop for PipeReader {
200+
impl Drop for Reader {
208201
fn drop(&mut self) {
209202
// SAFETY: exclusive access is guaranteed by the single-threaded context
210203
unsafe { self.0.with_unchecked(|pipe| pipe.close_read()) }
211204
}
212205
}
213206

214-
impl AsyncWrite for PipeWriter {
207+
impl AsyncWrite for Writer {
215208
fn poll_write(
216209
mut self: Pin<&mut Self>,
217210
cx: &mut Context<'_>,
@@ -248,7 +241,7 @@ impl AsyncWrite for PipeWriter {
248241
}
249242
}
250243

251-
impl Drop for PipeWriter {
244+
impl Drop for Writer {
252245
fn drop(&mut self) {
253246
// SAFETY: exclusive access is guaranteed by the single-threaded context
254247
unsafe { self.0.with_unchecked(|pipe| pipe.close_write()) }
@@ -327,17 +320,43 @@ mod tests {
327320
assert_eq!(&buf[..], data);
328321
}
329322

323+
#[test]
324+
fn test_partial_read() {
325+
let (mut reader, mut writer) = pipe(1024);
326+
327+
let data = b"Hello, world!";
328+
let mut write_task = spawn(writer.write_all(data));
329+
let write_ret = assert_ready!(write_task.poll());
330+
assert!(write_ret.is_ok());
331+
drop(write_task);
332+
333+
let mut buf = [0u8; 7];
334+
335+
let mut read_task = spawn(reader.read_exact(&mut buf));
336+
let read_ret = assert_ready!(read_task.poll());
337+
assert!(read_ret.is_ok());
338+
drop(read_task);
339+
assert_eq!(&buf[..], b"Hello, ");
340+
341+
let mut buf_ref = &mut buf[..];
342+
let mut read_task = spawn(reader.read_buf(&mut buf_ref));
343+
let read_ret = assert_ready!(read_task.poll());
344+
assert!(read_ret.is_ok());
345+
assert_eq!(&buf[..], b"world! ");
346+
}
347+
330348
#[test]
331349
fn test_drop_writer() {
332-
let (mut reader, writer) = pipe(1024);
350+
let (mut reader, mut writer) = pipe(1024);
351+
assert_ready!(spawn(writer.write_all(b"Hello, world!")).poll()).unwrap();
333352

334353
drop(writer);
335354
let mut buf = Vec::new();
336355
let mut read_eof_task = spawn(reader.read_to_end(&mut buf));
337356
let read_eof_ret = assert_ready!(read_eof_task.poll());
338357
assert!(read_eof_ret.is_ok());
339358
drop(read_eof_task);
340-
assert!(buf.is_empty());
359+
assert_eq!(&buf[..], b"Hello, world!");
341360
}
342361

343362
#[test]
@@ -383,4 +402,30 @@ mod tests {
383402
let err = write_ret.err().unwrap();
384403
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
385404
}
405+
406+
#[test]
407+
fn test_non_contiguous_internal_buffer() {
408+
let (mut reader, mut writer) = pipe(4);
409+
410+
assert_ready!(spawn(writer.write_all(b"1234")).poll()).unwrap();
411+
412+
let mut buf = [0u8; 2];
413+
assert_ready!(spawn(reader.read_exact(&mut buf)).poll()).unwrap();
414+
assert_eq!(&buf[..], b"12");
415+
416+
assert_ready!(spawn(writer.write_all(b"56")).poll()).unwrap();
417+
418+
unsafe {
419+
reader.0.with_unchecked(|pipe| {
420+
let (head, tail) = pipe.buffer.as_slices();
421+
assert!(!head.is_empty());
422+
assert!(!tail.is_empty());
423+
});
424+
}
425+
426+
let mut buf = Vec::new();
427+
let read_ret = assert_ready!(spawn(reader.read_buf(&mut buf)).poll());
428+
assert!(read_ret.is_ok());
429+
assert_eq!(&buf[..], b"3456");
430+
}
386431
}

0 commit comments

Comments
 (0)