diff --git a/Cargo.lock b/Cargo.lock index f1905a9c85f..307f3edd1c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4276,6 +4276,7 @@ version = "0.8.0" dependencies = [ "clap", "fluent", + "rustix", "uucore", ] diff --git a/src/uu/tee/Cargo.toml b/src/uu/tee/Cargo.toml index 70246151c10..61fae37fb60 100644 --- a/src/uu/tee/Cargo.toml +++ b/src/uu/tee/Cargo.toml @@ -24,6 +24,10 @@ clap = { workspace = true } uucore = { workspace = true, features = ["libc", "parser", "signals"] } fluent = { workspace = true } +[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] +rustix = { workspace = true, features = ["pipe"] } +uucore = { workspace = true, features = ["pipes"] } + [[bin]] name = "tee" path = "src/main.rs" diff --git a/src/uu/tee/src/tee.rs b/src/uu/tee/src/tee.rs index 792f1c8c2fc..8cb73911068 100644 --- a/src/uu/tee/src/tee.rs +++ b/src/uu/tee/src/tee.rs @@ -149,8 +149,88 @@ struct MultiWriter { impl MultiWriter { /// Copies all bytes from the input buffer to the output buffer /// without buffering which is POSIX requirement. - pub fn copy_unbuffered(&mut self, mut input: R) -> Result<()> { - // todo: support splice() and tee() fast-path at here + pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> { + #[cfg(any(target_os = "linux", target_os = "android"))] + { + use ::rustix::fd::AsFd; + use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact}; + let (pipe_read, pipe_write) = pipe()?; // needed to duplicate input + let (pipe2_read, pipe2_write) = pipe()?; // force-tee() even output is not pipe + let input = input.inner.as_fd(); + let mode = self.output_error_mode; + // improve throughput + let _ = rustix::pipe::fcntl_setpipe_size( + self.writers[0].inner.as_fd(), + MAX_ROOTLESS_PIPE_SIZE, + ); + 'splice: loop { + let mut aborted = None; + match splice(&input, &pipe_write, MAX_ROOTLESS_PIPE_SIZE) { + Ok(0) => return Ok(()), + Err(_) => break 'splice, + Ok(s) => { + let w_len = self.writers.len(); + // len - 1 outputs do not consume input + for other in &mut self.writers[..w_len - 1] { + assert_eq!( + uucore::pipes::tee(&pipe_read, &pipe2_write, s), + Ok(s), + "tee() between internal pipes should not be blocked" + ); + let fd = other.inner.as_fd(); + if splice_exact(&pipe2_read, &fd, s).is_err() { + // fallback with proper error message + debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); + let mut drain = Vec::with_capacity(s); + let mut reader = (&pipe2_read).take(s as u64); + let res = (|| { + reader.read_to_end(&mut drain)?; + other.inner.write_all(&drain)?; + other.inner.flush() + })(); + if let Err(e) = res { + if let Err(e) = + process_error(mode, e, other, &mut self.ignored_errors) + { + aborted.get_or_insert(e); + } + other.name.clear(); //mark as exited + } + } + } + // last one consumes input + if let Some(last) = self.writers.last_mut() { + if splice_exact(&pipe_read, &last.inner.as_fd(), s).is_err() { + // fallback with proper error message + debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); + let mut drain = Vec::with_capacity(s); + let mut reader = (&pipe_read).take(s as u64); + let res = (|| { + reader.read_to_end(&mut drain)?; + last.inner.write_all(&drain)?; + last.inner.flush() + })(); + if let Err(e) = res { + if let Err(e) = + process_error(mode, e, last, &mut self.ignored_errors) + { + aborted.get_or_insert(e); + } + last.name.clear(); //mark as exited + } + } + } + } + } + self.writers.retain(|w| !w.name.is_empty()); + if let Some(e) = aborted { + return Err(e); + } + if self.writers.is_empty() { + return Err(Error::from(ErrorKind::Other)); + } + } + } // The implementation for this function is adopted from the generic buffer copy implementation from // the standard library: // https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297 @@ -158,8 +238,11 @@ impl MultiWriter { // Use buffer size from std implementation // https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44 const BUF_SIZE: usize = 8 * 1024; + #[cfg(not(any(target_os = "linux", target_os = "android")))] let mut buffer = [0u8; BUF_SIZE]; - // fast-path for small input. needs 2+ read to catch end of file + // fast-path for small input on the platform missing splice + // needs 2+ read to catch end of file + #[cfg(not(any(target_os = "linux", target_os = "android")))] for _ in 0..2 { match input.read(&mut buffer) { Ok(0) => return Ok(()), // end of file @@ -289,3 +372,13 @@ impl Read for NamedReader { }) } } + +#[cfg(any(target_os = "linux", target_os = "android"))] +impl rustix::fd::AsFd for Writer { + fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> { + match self { + Self::File(f) => f.as_fd(), + Self::Stdout(s) => s.as_fd(), + } + } +} diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index 292cf4c4162..458611db30c 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -231,3 +231,10 @@ pub fn dev_null() -> Option { None } } + +// Less noisy wrapper around [`rustix::pipe::tee`] +#[inline] +#[cfg(any(target_os = "linux", target_os = "android"))] +pub fn tee(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io::Result { + rustix::pipe::tee(source, target, len, SpliceFlags::empty()) +}