Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/uu/tee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ clap = { workspace = true }
uucore = { workspace = true, features = ["libc", "parser", "signals"] }
fluent = { workspace = true }

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
rustix = { workspace = true, features = ["pipe"] }
uucore = { workspace = true, features = ["pipes"] }

[[bin]]
name = "tee"
path = "src/main.rs"
97 changes: 94 additions & 3 deletions src/uu/tee/src/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,98 @@ struct MultiWriter {
impl MultiWriter {
/// Copies all bytes from the input buffer to the output buffer
/// without buffering which is POSIX requirement.
pub fn copy_unbuffered<R: Read>(&mut self, mut input: R) -> Result<()> {
// todo: support splice() and tee() fast-path at here
pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> {
#[cfg(any(target_os = "linux", target_os = "android"))]
{
use ::rustix::fd::AsFd;
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact};
let (p_r, p_w) = pipe()?; // force-splice() even stdin is not pipe
let (p2_r, p2_w) = pipe()?; // force-tee() even output is not pipe
let input = input.inner.as_fd();
let mode = self.output_error_mode;
// improve throughput
let _ = rustix::pipe::fcntl_setpipe_size(
self.writers[0].inner.as_fd(),
MAX_ROOTLESS_PIPE_SIZE,
);
'splice: loop {
let mut aborted = None;
match splice(&input, &p_w, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(()),
Err(_) => break 'splice,
Ok(s) => {
let w_len = self.writers.len();
// len - 1 outputs do not consume input
for other in &mut self.writers[..w_len - 1] {
assert_eq!(
uucore::pipes::tee(&p_r, &p2_w, s),
Ok(s),
"tee() between internal pipes should not be blocked"
);
let fd = other.inner.as_fd();
if splice_exact(&p2_r, &fd, s).is_err() {
// fallback with proper error message
let mut drain = Vec::with_capacity(s);
let mut reader = (&p2_r).take(s as u64);
let res = (|| {
reader.read_to_end(&mut drain)?;
other.inner.write_all(&drain)?;
other.inner.flush()
})();
if let Err(e) = res {
if let Err(e) =
process_error(mode, e, other, &mut self.ignored_errors)
{
aborted.get_or_insert(e);
}
other.name.clear(); //mark as exited
}
}
}
// last one consumes input
if let Some(last) = self.writers.last_mut() {
if splice_exact(&p_r, &last.inner.as_fd(), s).is_err() {
// fallback with proper error message
let mut drain = Vec::with_capacity(s);
let mut reader = (&p_r).take(s as u64);
let res = (|| {
reader.read_to_end(&mut drain)?;
last.inner.write_all(&drain)?;
last.inner.flush()
})();
if let Err(e) = res {
if let Err(e) =
process_error(mode, e, last, &mut self.ignored_errors)
{
aborted.get_or_insert(e);
}
last.name.clear(); //mark as exited
}
}
}
}
}
self.writers.retain(|w| !w.name.is_empty());
if let Some(e) = aborted {
return Err(e);
}
if self.writers.is_empty() {
return Err(Error::from(ErrorKind::Other));
}
}
}
// The implementation for this function is adopted from the generic buffer copy implementation from
// the standard library:
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297

// Use buffer size from std implementation
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44
const BUF_SIZE: usize = 8 * 1024;
#[cfg(not(any(target_os = "linux", target_os = "android")))]
let mut buffer = [0u8; BUF_SIZE];
// fast-path for small input. needs 2+ read to catch end of file
// fast-path for small input on the platform missing splice
// needs 2+ read to catch end of file
#[cfg(not(any(target_os = "linux", target_os = "android")))]
for _ in 0..2 {
match input.read(&mut buffer) {
Ok(0) => return Ok(()), // end of file
Expand Down Expand Up @@ -289,3 +370,13 @@ impl Read for NamedReader {
})
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
impl rustix::fd::AsFd for Writer {
fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> {
match self {
Self::File(f) => f.as_fd(),
Self::Stdout(s) => s.as_fd(),
}
}
}
7 changes: 7 additions & 0 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,10 @@ pub fn dev_null() -> Option<File> {
None
}
}

// Less noisy wrapper around [`rustix::pipe::tee`]
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn tee(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io::Result<usize> {
rustix::pipe::tee(source, target, len, SpliceFlags::empty())
}
Loading