Skip to content

Commit d36fcc3

Browse files
committed
tee: add splice() and tee() zero-copy fast-path
1 parent 2e04477 commit d36fcc3

2 files changed

Lines changed: 91 additions & 3 deletions

File tree

src/uu/tee/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ rustix = { workspace = true, features = ["stdio", "fs"] }
3030
tempfile = { workspace = true }
3131
uucore = { workspace = true, features = ["benchmark"] }
3232

33+
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
34+
rustix = { workspace = true, features = ["pipe"] }
35+
uucore = { workspace = true, features = ["pipes"] }
36+
3337
[[bin]]
3438
name = "tee"
3539
path = "src/main.rs"

src/uu/tee/src/tee.rs

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ use uucore::signals::ensure_stdout_not_broken;
2222
#[cfg(unix)]
2323
use uucore::signals::{disable_pipe_errors, ignore_interrupts};
2424

25+
#[cfg(any(target_os = "linux", target_os = "android"))]
26+
use rustix::fd::AsFd;
27+
#[cfg(any(target_os = "linux", target_os = "android"))]
28+
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact};
29+
2530
#[uucore::main]
2631
pub fn uumain(args: impl uucore::Args) -> UResult<()> {
2732
let matches = uucore::clap_localization::handle_clap_result(uu_app(), args)?;
@@ -150,17 +155,86 @@ struct MultiWriter {
150155
impl MultiWriter {
151156
/// Copies all bytes from the input buffer to the output buffer
152157
/// without buffering which is POSIX requirement.
153-
pub fn copy_unbuffered<R: Read>(&mut self, mut input: R) -> Result<()> {
154-
// todo: support splice() and tee() fast-path at here
158+
pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> {
159+
#[cfg(any(target_os = "linux", target_os = "android"))]
160+
macro_rules! splice_or_fallback {
161+
($pipe:expr, $writer:expr, $len:expr) => {
162+
let fd = $writer.inner.as_fd();
163+
if splice_exact($pipe, &fd, $len).is_err() {
164+
debug_assert!($len <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
165+
let mut drain = Vec::with_capacity($len);
166+
let mut reader = ($pipe).take($len as u64);
167+
let res = (|| {
168+
reader.read_to_end(&mut drain)?;
169+
$writer.inner.write_all(&drain)?;
170+
$writer.inner.flush()
171+
})();
172+
if let Err(e) = res {
173+
if let Err(e) = process_error(
174+
self.output_error_mode,
175+
e,
176+
$writer,
177+
&mut self.ignored_errors,
178+
) {
179+
self.aborted.get_or_insert(e);
180+
}
181+
$writer.name.clear(); //mark as exited
182+
}
183+
}
184+
};
185+
}
186+
#[cfg(any(target_os = "linux", target_os = "android"))]
187+
{
188+
let (pipe_read, pipe_write) = pipe()?; // needed to duplicate input
189+
let (pipe2_read, pipe2_write) = pipe()?; // force-tee() even output is not pipe
190+
let input = input.inner.as_fd();
191+
// improve throughput
192+
let _ = rustix::pipe::fcntl_setpipe_size(
193+
self.writers[0].inner.as_fd(),
194+
MAX_ROOTLESS_PIPE_SIZE,
195+
);
196+
'splice: loop {
197+
match splice(&input, &pipe_write, MAX_ROOTLESS_PIPE_SIZE) {
198+
Ok(0) => return Ok(()),
199+
Err(_) => break 'splice,
200+
Ok(s) => {
201+
if let Some((last, others)) = self.writers.split_last_mut() {
202+
for other in others {
203+
// do not consume input
204+
assert_eq!(
205+
uucore::pipes::tee(&pipe_read, &pipe2_write, s),
206+
Ok(s),
207+
"tee() between internal pipes should not be blocked"
208+
);
209+
splice_or_fallback!(&pipe2_read, other, s);
210+
}
211+
// last one consumes input
212+
splice_or_fallback!(&pipe_read, last, s);
213+
} else {
214+
// all writers exited
215+
return Err(Error::from(ErrorKind::Other));
216+
}
217+
}
218+
}
219+
self.writers.retain(|w| !w.name.is_empty());
220+
if let Some(e) = self.aborted.take() {
221+
return Err(e);
222+
}
223+
}
224+
}
155225
// The implementation for this function is adopted from the generic buffer copy implementation from
156226
// the standard library:
157227
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297
158228

159229
// Use buffer size from std implementation
160230
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44
231+
#[allow(clippy::items_after_statements)]
161232
const BUF_SIZE: usize = 8 * 1024;
233+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
162234
let mut buffer = [0u8; BUF_SIZE];
163-
// fast-path for small input. needs 2+ read to catch end of file
235+
// fast-path for small input on the platform missing splice
236+
// needs 2+ read to catch end of file
237+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
164238
for _ in 0..2 {
165239
match input.read(&mut buffer) {
166240
Ok(0) => return Ok(()), // end of file
@@ -290,3 +364,13 @@ impl Read for NamedReader {
290364
})
291365
}
292366
}
367+
368+
#[cfg(any(target_os = "linux", target_os = "android"))]
369+
impl AsFd for Writer {
370+
fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> {
371+
match self {
372+
Self::File(f) => f.as_fd(),
373+
Self::Stdout(s) => s.as_fd(),
374+
}
375+
}
376+
}

0 commit comments

Comments
 (0)