Skip to content

Commit e408b5a

Browse files
authored
uucore::pipes: merge pipe_with_size to pipe and catch fcntl err (#12285)
1 parent ab328d1 commit e408b5a

5 files changed

Lines changed: 27 additions & 31 deletions

File tree

src/uu/wc/src/count_fast.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn count_bytes_using_splice(fd: &impl AsFd) -> Result<usize, usize> {
5555
}
5656
} else {
5757
// input is not pipe. needs broker to use splice() with additional cost
58-
let (pipe_rd, pipe_wr) = pipe().map_err(|_| 0_usize)?;
58+
let (pipe_rd, pipe_wr) = pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).map_err(|_| 0_usize)?;
5959
loop {
6060
match splice(fd, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
6161
Ok(0) => return Ok(byte_count),

src/uu/yes/src/yes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ pub fn exec(mut bytes: Vec<u8>) -> io::Result<()> {
117117
// improve throughput
118118
let _ = rustix::pipe::fcntl_setpipe_size(&stdout, MAX_ROOTLESS_PIPE_SIZE);
119119
// don't show any error from fast-path and fallback to write for proper message
120-
if let Ok((p_read, mut p_write)) = pipe()
120+
if let Ok((p_read, mut p_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE)
121121
&& p_write.write_all(bytes).is_ok()
122122
{
123123
if aligned && tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
124124
while let Ok(1..) = tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE) {}
125-
} else if let Ok((broker_read, broker_write)) = pipe() {
125+
} else if let Ok((broker_read, broker_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE) {
126126
// tee() cannot control offset and write to non-pipe
127127
'hybrid: while let Ok(mut remain) = tee(&p_read, &broker_write, MAX_ROOTLESS_PIPE_SIZE)
128128
{

src/uucore/src/lib/features/buf_copy.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ mod tests {
2727

2828
#[cfg(target_os = "linux")]
2929
use {
30-
crate::pipes,
3130
std::fs::OpenOptions,
3231
std::{
3332
io::{Seek, SeekFrom},
@@ -50,11 +49,13 @@ mod tests {
5049
}
5150

5251
#[test]
53-
#[cfg(target_os = "linux")]
52+
#[cfg(unix)]
5453
fn test_copy_stream() {
5554
let mut dest_file = new_temp_file();
5655

57-
let (mut pipe_read, mut pipe_write) = pipes::pipe().unwrap();
56+
let (pipe_read, pipe_write) = rustix::pipe::pipe().unwrap();
57+
let mut pipe_read: File = pipe_read.into();
58+
let mut pipe_write: File = pipe_write.into();
5859
let data = b"Hello, world!";
5960
let thread = thread::spawn(move || {
6061
pipe_write.write_all(data).unwrap();
@@ -72,8 +73,8 @@ mod tests {
7273
}
7374

7475
#[test]
75-
#[cfg(not(target_os = "linux"))]
76-
// Test for non-linux platforms. We use regular files instead.
76+
#[cfg(not(unix))]
77+
// Test for non-unix platforms. We use regular files instead.
7778
fn test_copy_stream() {
7879
let temp_dir = tempdir().unwrap();
7980
let src_path = temp_dir.path().join("src.txt");

src/uucore/src/lib/features/pipes.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,21 @@ pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
2020
#[cfg(any(target_os = "linux", target_os = "android"))]
2121
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;
2222

23-
/// A wrapper around [`rustix::pipe::pipe`] that ensures the pipe is cleaned up.
23+
/// return pipe larger than given size
24+
/// SIZE_REQUIRED should be true if you want to fail when changing pipe size failed
25+
/// e.g. writing size to pipe should not hang
2426
///
25-
/// Returns two `File` objects: everything written to the second can be read
26-
/// from the first.
2727
/// used for resolving the limitation for splice: one of a input or output should be pipe
2828
#[inline]
2929
#[cfg(any(target_os = "linux", target_os = "android"))]
30-
pub fn pipe() -> std::io::Result<(File, File)> {
31-
let (read, write) = rustix::pipe::pipe()?;
32-
// improve performance for splice
33-
let _ = fcntl_setpipe_size(&read, MAX_ROOTLESS_PIPE_SIZE);
34-
35-
Ok((File::from(read), File::from(write)))
36-
}
37-
38-
/// return pipe larger than given size and kernel's default size
39-
///
40-
/// useful to save RAM usage
41-
#[inline]
42-
#[cfg(any(target_os = "linux", target_os = "android"))]
43-
fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
30+
pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(File, File)> {
4431
let (read, write) = rustix::pipe::pipe()?;
32+
// guard unnecessary syscall
4533
if s > KERNEL_DEFAULT_PIPE_SIZE {
46-
let _ = fcntl_setpipe_size(&read, s);
34+
let r = fcntl_setpipe_size(&read, s);
35+
if SIZE_REQUIRED {
36+
r?;
37+
}
4738
}
4839

4940
Ok((File::from(read), File::from(write)))
@@ -126,7 +117,10 @@ where
126117
S: AsFd,
127118
{
128119
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
129-
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() else {
120+
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
121+
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
122+
.as_ref()
123+
else {
130124
return Ok(true);
131125
};
132126
// improve throughput
@@ -197,7 +191,7 @@ pub fn send_n_bytes(
197191
}
198192
}
199193
} else if let Some((broker_r, broker_w)) = PIPE_CACHE
200-
.get_or_init(|| pipe_with_size(pipe_size).ok())
194+
.get_or_init(|| pipe::<false>(pipe_size).ok())
201195
.as_ref()
202196
{
203197
// todo: create fn splice_bounded_broker

tests/by-util/test_comm.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -687,13 +687,14 @@ fn test_output_lossy_utf8() {
687687
#[cfg_attr(wasi_runner, ignore = "WASI sandbox: host paths not visible")]
688688
fn test_comm_anonymous_pipes() {
689689
use std::{io::Write, os::fd::AsRawFd, process};
690-
use uucore::pipes::pipe;
691690

692691
let scene = TestScenario::new(util_name!());
693692

694693
// Open two anonymous pipes
695-
let (comm1_reader, mut comm1_writer) = pipe().unwrap();
696-
let (comm2_reader, mut comm2_writer) = pipe().unwrap();
694+
let (comm1_reader, comm1_writer) = rustix::pipe::pipe().unwrap();
695+
let mut comm1_writer: std::fs::File = comm1_writer.into();
696+
let (comm2_reader, comm2_writer) = rustix::pipe::pipe().unwrap();
697+
let mut comm2_writer: std::fs::File = comm2_writer.into();
697698

698699
// comm reads the data in chunks
699700
// make content large enough, so that at least two chunks are read

0 commit comments

Comments
 (0)