Skip to content

Commit 8db29e1

Browse files
committed
tee: add splice() and tee() zero-copy fast-path
1 parent 39c3646 commit 8db29e1

4 files changed

Lines changed: 106 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/uu/tee/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ clap = { workspace = true }
2424
uucore = { workspace = true, features = ["libc", "parser", "signals"] }
2525
fluent = { workspace = true }
2626

27+
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
28+
rustix = { workspace = true, features = ["pipe"] }
29+
uucore = { workspace = true, features = ["pipes"] }
30+
2731
[[bin]]
2832
name = "tee"
2933
path = "src/main.rs"

src/uu/tee/src/tee.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,98 @@ struct MultiWriter {
149149
impl MultiWriter {
150150
/// Copies all bytes from the input buffer to the output buffer
151151
/// without buffering which is POSIX requirement.
152-
pub fn copy_unbuffered<R: Read>(&mut self, mut input: R) -> Result<()> {
153-
// todo: support splice() and tee() fast-path at here
152+
pub fn copy_unbuffered(&mut self, mut input: NamedReader) -> Result<()> {
153+
#[cfg(any(target_os = "linux", target_os = "android"))]
154+
{
155+
use ::rustix::fd::AsFd;
156+
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact};
157+
let (p_r, p_w) = pipe()?; // force-splice() even stdin is not pipe
158+
let (p2_r, p2_w) = pipe()?; // force-tee() even output is not pipe
159+
let input = input.inner.as_fd();
160+
let mode = self.output_error_mode;
161+
// improve throughput
162+
let _ = rustix::pipe::fcntl_setpipe_size(
163+
self.writers[0].inner.as_fd(),
164+
MAX_ROOTLESS_PIPE_SIZE,
165+
);
166+
'splice: loop {
167+
let mut aborted = None;
168+
match splice(&input, &p_w, MAX_ROOTLESS_PIPE_SIZE) {
169+
Ok(0) => return Ok(()),
170+
Err(_) => break 'splice,
171+
Ok(s) => {
172+
let w_len = self.writers.len();
173+
// len - 1 outputs do not consume input
174+
for other in &mut self.writers[..w_len - 1] {
175+
assert_eq!(
176+
uucore::pipes::tee(&p_r, &p2_w, s),
177+
Ok(s),
178+
"tee() between internal pipes should not be blocked"
179+
);
180+
let fd = other.inner.as_fd();
181+
if splice_exact(&p2_r, &fd, s).is_err() {
182+
// fallback with proper error message
183+
let mut drain = Vec::with_capacity(s);
184+
let mut reader = (&p2_r).take(s as u64);
185+
let res = (|| {
186+
reader.read_to_end(&mut drain)?;
187+
other.inner.write_all(&drain)?;
188+
other.inner.flush()
189+
})();
190+
if let Err(e) = res {
191+
if let Err(e) =
192+
process_error(mode, e, other, &mut self.ignored_errors)
193+
{
194+
aborted.get_or_insert(e);
195+
}
196+
other.name.clear(); //mark as exited
197+
}
198+
}
199+
}
200+
// last one consumes input
201+
if let Some(last) = self.writers.last_mut() {
202+
if splice_exact(&p_r, &last.inner.as_fd(), s).is_err() {
203+
// fallback with proper error message
204+
let mut drain = Vec::with_capacity(s);
205+
let mut reader = (&p_r).take(s as u64);
206+
let res = (|| {
207+
reader.read_to_end(&mut drain)?;
208+
last.inner.write_all(&drain)?;
209+
last.inner.flush()
210+
})();
211+
if let Err(e) = res {
212+
if let Err(e) =
213+
process_error(mode, e, last, &mut self.ignored_errors)
214+
{
215+
aborted.get_or_insert(e);
216+
}
217+
last.name.clear(); //mark as exited
218+
}
219+
}
220+
}
221+
}
222+
}
223+
self.writers.retain(|w| !w.name.is_empty());
224+
if let Some(e) = aborted {
225+
return Err(e);
226+
}
227+
if self.writers.is_empty() {
228+
return Err(Error::from(ErrorKind::Other));
229+
}
230+
}
231+
}
154232
// The implementation for this function is adopted from the generic buffer copy implementation from
155233
// the standard library:
156234
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/io/copy.rs#L271-L297
157235

158236
// Use buffer size from std implementation
159237
// https://github.com/rust-lang/rust/blob/2feb91181882e525e698c4543063f4d0296fcf91/library/std/src/sys/io/mod.rs#L44
160238
const BUF_SIZE: usize = 8 * 1024;
239+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
161240
let mut buffer = [0u8; BUF_SIZE];
162-
// fast-path for small input. needs 2+ read to catch end of file
241+
// fast-path for small input on the platform missing splice
242+
// needs 2+ read to catch end of file
243+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
163244
for _ in 0..2 {
164245
match input.read(&mut buffer) {
165246
Ok(0) => return Ok(()), // end of file
@@ -291,3 +372,13 @@ impl Read for NamedReader {
291372
})
292373
}
293374
}
375+
376+
#[cfg(any(target_os = "linux", target_os = "android"))]
377+
impl rustix::fd::AsFd for Writer {
378+
fn as_fd(&self) -> rustix::fd::BorrowedFd<'_> {
379+
match self {
380+
Self::File(f) => f.as_fd(),
381+
Self::Stdout(s) => s.as_fd(),
382+
}
383+
}
384+
}

src/uucore/src/lib/features/pipes.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,10 @@ pub fn dev_null() -> Option<File> {
188188
None
189189
}
190190
}
191+
192+
// Less noisy wrapper around [`rustix::pipe::tee`]
193+
#[inline]
194+
#[cfg(any(target_os = "linux", target_os = "android"))]
195+
pub fn tee(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io::Result<usize> {
196+
rustix::pipe::tee(source, target, len, SpliceFlags::empty())
197+
}

0 commit comments

Comments
 (0)