Skip to content

Commit 778ff1a

Browse files
committed
uucore: splice fast-path for head -c & preliminary for dd, tail, split
1 parent 9adb18c commit 778ff1a

3 files changed

Lines changed: 106 additions & 2 deletions

File tree

src/uu/head/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ memchr = { workspace = true }
2424
thiserror = { workspace = true }
2525
uucore = { workspace = true, features = [
2626
"parser-size",
27+
"pipes",
2728
"ringbuffer",
2829
"lines",
2930
"fs",

src/uu/head/src/head.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ fn wrap_in_stdout_error(err: io::Error) -> io::Error {
166166
)
167167
}
168168

169+
// zero-copy fast-path
170+
#[cfg(any(target_os = "linux", target_os = "android"))]
171+
fn read_n_bytes(input: impl Read + AsFd, n: u64) -> io::Result<u64> {
172+
let out = io::stdout();
173+
uucore::pipes::send_n_bytes(input, out, n).map_err(wrap_in_stdout_error)
174+
}
175+
176+
#[cfg(not(any(target_os = "linux", target_os = "android")))]
169177
fn read_n_bytes(input: impl Read, n: u64) -> io::Result<u64> {
170178
// Read the first `n` bytes from the `input` reader.
171179
let mut reader = input.take(n);
@@ -608,6 +616,7 @@ mod tests {
608616
}
609617

610618
#[test]
619+
#[cfg(not(any(target_os = "linux", target_os = "android")))] // missing trait for AsFd
611620
fn read_early_exit() {
612621
let mut empty = io::BufReader::new(Cursor::new(Vec::new()));
613622
assert!(read_n_bytes(&mut empty, 0).is_ok());

src/uucore/src/lib/features/pipes.rs

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
88
#[cfg(any(target_os = "linux", target_os = "android"))]
99
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
10-
#[cfg(any(target_os = "linux", target_os = "android", test))]
11-
use std::fs::File;
1210
#[cfg(any(target_os = "linux", target_os = "android"))]
1311
use std::os::fd::AsFd;
12+
#[cfg(any(target_os = "linux", target_os = "android", test))]
13+
use std::{fs::File, io::Read};
1414
#[cfg(any(target_os = "linux", target_os = "android"))]
1515
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
1616

@@ -30,6 +30,21 @@ pub fn pipe() -> std::io::Result<(File, File)> {
3030
Ok((File::from(read), File::from(write)))
3131
}
3232

33+
/// return pipe with given size or kernel's default size
34+
///
35+
/// useful to save RAM usage
36+
#[inline]
37+
#[cfg(any(target_os = "linux", target_os = "android"))]
38+
pub fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
39+
let (read, write) = rustix::pipe::pipe()?;
40+
const DEFAULT_SIZE: usize = 64 * 1024;
41+
if s > DEFAULT_SIZE {
42+
let _ = fcntl_setpipe_size(&read, s);
43+
}
44+
45+
Ok((File::from(read), File::from(write)))
46+
}
47+
3348
/// Less noisy wrapper around [`rustix::pipe::splice`].
3449
///
3550
/// Up to `len` bytes are moved from `source` to `target`. Returns the number
@@ -72,6 +87,85 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
7287
.unwrap_or(true)
7388
}
7489

90+
/// splice `n` bytes with safe read/write fallback
91+
/// return actually sent bytes
92+
#[inline]
93+
#[cfg(any(target_os = "linux", target_os = "android"))]
94+
pub fn send_n_bytes(
95+
input: impl Read + AsFd,
96+
mut target: impl std::io::Write + AsFd,
97+
n: u64,
98+
) -> std::io::Result<u64> {
99+
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
100+
let mut n = n;
101+
let mut bytes_written: u64 = 0;
102+
// do not always fallback to write as it needs 2 Ctrl+D on tty
103+
let mut needs_fallback = might_fuse(&input);
104+
if let Ok(b) = splice(&input, &target, n as usize) {
105+
bytes_written = b as u64;
106+
n -= bytes_written;
107+
if n > 0 {
108+
// for throughput. expected that input is already extended if it is coming from splice
109+
let _ = fcntl_setpipe_size(&target, pipe_size);
110+
} else {
111+
// avoid unnecessary syscalls
112+
return Ok(bytes_written);
113+
}
114+
115+
loop {
116+
match splice(&input, &target, n as usize) {
117+
Ok(0) => break,
118+
Ok(s @ 1..) => {
119+
n -= s as u64;
120+
bytes_written += s as u64;
121+
}
122+
_ => {
123+
needs_fallback = true;
124+
break;
125+
}
126+
}
127+
}
128+
} else if let Ok((broker_r, broker_w)) = pipe_with_size(pipe_size) {
129+
// todo: allow reusing broker pipe. needed for using splice to uutils/split in the future
130+
loop {
131+
match splice(&input, &broker_w, n as usize) {
132+
Ok(0) => break,
133+
Ok(s @ 1..) => {
134+
if splice_exact(&broker_r, &target, s).is_ok() {
135+
n -= s as u64;
136+
bytes_written += s as u64;
137+
} else {
138+
let mut drain = Vec::with_capacity(s); // bounded by pipe size
139+
broker_r.take(s as u64).read_to_end(&mut drain)?;
140+
target.write_all(&drain)?;
141+
needs_fallback = true;
142+
break;
143+
}
144+
}
145+
_ => {
146+
needs_fallback = true;
147+
break;
148+
}
149+
}
150+
}
151+
}
152+
153+
if !needs_fallback {
154+
return Ok(bytes_written);
155+
}
156+
let mut reader = input.take(n);
157+
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
158+
loop {
159+
match reader.read(&mut buf)? {
160+
0 => return Ok(bytes_written),
161+
n => {
162+
target.write_all(&buf[..n])?;
163+
bytes_written += n as u64;
164+
}
165+
}
166+
}
167+
}
168+
75169
/// Return verified /dev/null
76170
///
77171
/// `splice` to /dev/null is faster than `read` when we skip or count the non-seekable input

0 commit comments

Comments
 (0)