Skip to content

Commit df7ae35

Browse files
oech3cakebakerpre-commit-ci[bot]
authored
uucore: splice fast-path for head -c & preliminary for dd, tail, split (#11844)
* uucore: splice fast-path for head -c & preliminary for dd, tail, split * pipes.rs: reduce nest by early return Co-authored-by: Daniel Hofstetter <daniel.hofstetter@42dh.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: Daniel Hofstetter <daniel.hofstetter@42dh.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 50957e1 commit df7ae35

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

src/uu/head/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ memchr = { workspace = true }
2424
thiserror = { workspace = true }
2525
uucore = { workspace = true, features = [
2626
"parser-size",
27+
"pipes",
2728
"ringbuffer",
2829
"lines",
2930
"fs",

src/uu/head/src/head.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ fn wrap_in_stdout_error(err: io::Error) -> io::Error {
166166
)
167167
}
168168

169+
// zero-copy fast-path
170+
#[cfg(any(target_os = "linux", target_os = "android"))]
171+
fn read_n_bytes(input: impl Read + AsFd, n: u64) -> io::Result<u64> {
172+
let out = io::stdout();
173+
uucore::pipes::send_n_bytes(input, out, n).map_err(wrap_in_stdout_error)
174+
}
175+
176+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
169177
fn read_n_bytes(input: impl Read, n: u64) -> io::Result<u64> {
170178
// Read the first `n` bytes from the `input` reader.
171179
let mut reader = input.take(n);
@@ -606,6 +614,7 @@ mod tests {
606614
}
607615

608616
#[test]
617+
#[cfg(not(any(target_os = "linux", target_os = "android")))] // missing trait for AsFd
609618
fn read_early_exit() {
610619
let mut empty = io::BufReader::new(Cursor::new(Vec::new()));
611620
assert!(read_n_bytes(&mut empty, 0).is_ok());

src/uucore/src/lib/features/pipes.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
1010
#[cfg(any(target_os = "linux", target_os = "android", test))]
1111
use std::fs::File;
1212
#[cfg(any(target_os = "linux", target_os = "android"))]
13-
use std::os::fd::AsFd;
13+
use std::{io::Read, os::fd::AsFd, sync::OnceLock};
1414
#[cfg(any(target_os = "linux", target_os = "android"))]
1515
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
16+
#[cfg(any(target_os = "linux", target_os = "android"))]
17+
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;
1618

1719
/// A wrapper around [`rustix::pipe::pipe`] that ensures the pipe is cleaned up.
1820
///
@@ -30,6 +32,20 @@ pub fn pipe() -> std::io::Result<(File, File)> {
3032
Ok((File::from(read), File::from(write)))
3133
}
3234

35+
/// return pipe larger than given size and kernel's default size
36+
///
37+
/// useful to save RAM usage
38+
#[inline]
39+
#[cfg(any(target_os = "linux", target_os = "android"))]
40+
fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
41+
let (read, write) = rustix::pipe::pipe()?;
42+
if s > KERNEL_DEFAULT_PIPE_SIZE {
43+
let _ = fcntl_setpipe_size(&read, s);
44+
}
45+
46+
Ok((File::from(read), File::from(write)))
47+
}
48+
3349
/// Less noisy wrapper around [`rustix::pipe::splice`].
3450
///
3551
/// Up to `len` bytes are moved from `source` to `target`. Returns the number
@@ -70,6 +86,90 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
7086
rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const
7187
}
7288

89+
/// splice `n` bytes with safe read/write fallback
90+
/// return actually sent bytes
91+
#[inline]
92+
#[cfg(any(target_os = "linux", target_os = "android"))]
93+
pub fn send_n_bytes(
94+
input: impl Read + AsFd,
95+
mut target: impl std::io::Write + AsFd,
96+
n: u64,
97+
) -> std::io::Result<u64> {
98+
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
99+
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
100+
let mut n = n;
101+
let mut bytes_written: u64 = 0;
102+
// do not always fallback to write as it needs 2 Ctrl+D to exit process on tty
103+
let fallback = if let Ok(b) = splice(&input, &target, n as usize) {
104+
bytes_written = b as u64;
105+
n -= bytes_written;
106+
if n == 0 {
107+
// avoid unnecessary syscalls
108+
return Ok(bytes_written);
109+
}
110+
111+
// improve throughput or save RAM usage
112+
// expected that input is already extended if it is coming from splice
113+
// we can use pipe_size * N with some case e.g. head -c N inputs, but we need N splice call anyway
114+
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
115+
let _ = fcntl_setpipe_size(&target, pipe_size);
116+
}
117+
118+
loop {
119+
match splice(&input, &target, n as usize) {
120+
Ok(0) => break might_fuse(&input),
121+
Ok(s @ 1..) => {
122+
n -= s as u64;
123+
bytes_written += s as u64;
124+
}
125+
_ => break true,
126+
}
127+
}
128+
} else if let Some((broker_r, broker_w)) = PIPE_CACHE
129+
.get_or_init(|| pipe_with_size(pipe_size).ok())
130+
.as_ref()
131+
{
132+
loop {
133+
match splice(&input, &broker_w, n as usize) {
134+
Ok(0) => break might_fuse(&input),
135+
Ok(s @ 1..) => {
136+
if splice_exact(&broker_r, &target, s).is_ok() {
137+
n -= s as u64;
138+
bytes_written += s as u64;
139+
if n == 0 {
140+
// avoid unnecessary splice for small input
141+
break false;
142+
}
143+
} else {
144+
let mut drain = Vec::with_capacity(s); // bounded by pipe size
145+
broker_r.take(s as u64).read_to_end(&mut drain)?;
146+
target.write_all(&drain)?;
147+
break true;
148+
}
149+
}
150+
_ => break true,
151+
}
152+
}
153+
} else {
154+
true
155+
};
156+
157+
if !fallback {
158+
return Ok(bytes_written);
159+
}
160+
let mut reader = input.take(n);
161+
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
162+
loop {
163+
match reader.read(&mut buf)? {
164+
0 => return Ok(bytes_written),
165+
n => {
166+
target.write_all(&buf[..n])?;
167+
bytes_written += n as u64;
168+
}
169+
}
170+
}
171+
}
172+
73173
/// Return verified /dev/null
74174
///
75175
/// `splice` to /dev/null is faster than `read` when we skip or count the non-seekable input

0 commit comments

Comments
 (0)