Skip to content

Commit 4397831

Browse files
committed
refactor: replace rustix::pipe with std::io::pipe
1 parent c170a18 commit 4397831

9 files changed

Lines changed: 59 additions & 63 deletions

File tree

fuzz/uufuzz/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ use rand::RngExt;
1111
use rand::prelude::IndexedRandom;
1212
use rustix::io::dup;
1313
use rustix::io::read;
14-
use rustix::pipe::pipe;
1514
use rustix::stdio::{dup2_stderr, dup2_stdin, dup2_stdout};
1615
use std::env::temp_dir;
1716
use std::ffi::OsString;
1817
use std::fs::File;
18+
use std::io::pipe;
1919
use std::io::{Seek, SeekFrom, Write};
2020
use std::process::{Command, Stdio};
2121
use std::sync::atomic::Ordering;
@@ -92,7 +92,7 @@ where
9292
};
9393

9494
println!("Running test {:?}", &args[0..]);
95-
let (read_pipe_stdout, write_pipe_stdout) = match pipe() {
95+
let (stdout_pipe_reader, stdout_pipe_writer) = match pipe() {
9696
Ok(fds) => fds,
9797
Err(_) => {
9898
return CommandResult {
@@ -102,7 +102,7 @@ where
102102
};
103103
}
104104
};
105-
let (read_pipe_stderr, write_pipe_stderr) = match pipe() {
105+
let (stderr_pipe_reader, stderr_pipe_writer) = match pipe() {
106106
Ok(fds) => fds,
107107
Err(_) => {
108108
return CommandResult {
@@ -114,7 +114,7 @@ where
114114
};
115115

116116
// Redirect stdout and stderr to their respective pipes
117-
if dup2_stdout(&write_pipe_stdout).is_err() || dup2_stderr(&write_pipe_stderr).is_err() {
117+
if dup2_stdout(&stdout_pipe_writer).is_err() || dup2_stderr(&stderr_pipe_writer).is_err() {
118118
return CommandResult {
119119
stdout: "".to_string(),
120120
stderr: "Failed to redirect STDOUT_FILENO or STDERR_FILENO".to_string(),
@@ -156,8 +156,8 @@ where
156156
};
157157

158158
let (uumain_exit_status, captured_stdout, captured_stderr) = thread::scope(|s| {
159-
let out = s.spawn(|| read_from_fd(read_pipe_stdout));
160-
let err = s.spawn(|| read_from_fd(read_pipe_stderr));
159+
let out = s.spawn(|| read_from_fd(stdout_pipe_reader));
160+
let err = s.spawn(|| read_from_fd(stderr_pipe_reader));
161161
#[allow(clippy::unnecessary_to_owned)]
162162
// TODO: clippy wants us to use args.iter().cloned() ?
163163
let status = uumain_function(args.to_owned().into_iter());
@@ -167,8 +167,8 @@ where
167167
io::stdout().flush().unwrap();
168168
io::stderr().flush().unwrap();
169169
// Drop write ends to close them, allowing readers to get EOF
170-
drop(write_pipe_stdout);
171-
drop(write_pipe_stderr);
170+
drop(stdout_pipe_writer);
171+
drop(stderr_pipe_writer);
172172
// Restore stdout/stderr
173173
let _ = dup2_stdout(&original_stdout_fd_owned);
174174
let _ = dup2_stderr(&original_stderr_fd_owned);

src/uu/wc/src/count_fast.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ fn count_bytes_using_splice(fd: &impl AsFd) -> Result<usize, usize> {
5555
}
5656
} else {
5757
// input is not pipe. needs broker to use splice() with additional cost
58-
let (pipe_rd, pipe_wr) = pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).map_err(|_| 0_usize)?;
58+
let (pipe_reader, pipe_writer) =
59+
pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).map_err(|_| 0_usize)?;
5960
loop {
60-
match splice(fd, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
61+
match splice(fd, &pipe_writer, MAX_ROOTLESS_PIPE_SIZE) {
6162
Ok(0) => return Ok(byte_count),
6263
Ok(res) => {
6364
byte_count += res;
64-
splice_exact(&pipe_rd, &null_file, res).map_err(|_| byte_count)?;
65+
splice_exact(&pipe_reader, &null_file, res).map_err(|_| byte_count)?;
6566
}
6667
Err(_) => return Err(byte_count),
6768
}

src/uu/yes/src/yes.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,18 +117,19 @@ pub fn exec(mut bytes: Vec<u8>) -> io::Result<()> {
117117
// improve throughput
118118
let _ = rustix::pipe::fcntl_setpipe_size(&stdout, MAX_ROOTLESS_PIPE_SIZE);
119119
// don't show any error from fast-path and fallback to write for proper message
120-
if let Ok((p_read, mut p_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE)
121-
&& p_write.write_all(bytes).is_ok()
120+
if let Ok((source_reader, mut source_writer)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE)
121+
&& source_writer.write_all(bytes).is_ok()
122122
{
123-
if aligned && tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
124-
while let Ok(1..) = tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE) {}
125-
} else if let Ok((broker_read, broker_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE) {
123+
if aligned && tee(&source_reader, &stdout, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
124+
while let Ok(1..) = tee(&source_reader, &stdout, MAX_ROOTLESS_PIPE_SIZE) {}
125+
} else if let Ok((broker_reader, broker_writer)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE) {
126126
// tee() cannot control offset and write to non-pipe
127-
'hybrid: while let Ok(mut remain) = tee(&p_read, &broker_write, MAX_ROOTLESS_PIPE_SIZE)
127+
'hybrid: while let Ok(mut remain) =
128+
tee(&source_reader, &broker_writer, MAX_ROOTLESS_PIPE_SIZE)
128129
{
129130
debug_assert!(remain == bytes.len(), "splice() should cleanup pipe");
130131
while remain > 0 {
131-
if let Ok(s) = splice(&broker_read, &stdout, remain) {
132+
if let Ok(s) = splice(&broker_reader, &stdout, remain) {
132133
remain -= s;
133134
} else {
134135
// avoid output breakage with reduced remain even if it would not happen

src/uucore/src/lib/features/buf_copy.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,12 @@ mod tests {
5151
fn test_copy_stream() {
5252
let mut dest_file = new_temp_file();
5353

54-
let (pipe_read, pipe_write) = rustix::pipe::pipe().unwrap();
55-
let mut pipe_read: File = pipe_read.into();
56-
let mut pipe_write: File = pipe_write.into();
54+
let (mut pipe_reader, mut pipe_writer) = std::io::pipe().unwrap();
5755
let data = b"Hello, world!";
5856
let thread = thread::spawn(move || {
59-
pipe_write.write_all(data).unwrap();
57+
pipe_writer.write_all(data).unwrap();
6058
});
61-
copy_stream(&mut pipe_read, &mut dest_file).unwrap();
59+
copy_stream(&mut pipe_reader, &mut dest_file).unwrap();
6260
thread.join().unwrap();
6361

6462
// We would have been at the end already, so seek again to the start.

src/uucore/src/lib/features/pipes.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
1010
#[cfg(any(target_os = "linux", target_os = "android"))]
1111
use std::fs::File;
1212
#[cfg(any(target_os = "linux", target_os = "android"))]
13+
use std::io::{PipeReader, PipeWriter};
14+
#[cfg(any(target_os = "linux", target_os = "android"))]
1315
use std::{
1416
io::{Read, Write},
1517
os::fd::AsFd,
@@ -27,17 +29,17 @@ const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;
2729
/// used for resolving the limitation for splice: one of a input or output should be pipe
2830
#[inline]
2931
#[cfg(any(target_os = "linux", target_os = "android"))]
30-
pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(File, File)> {
31-
let (read, write) = rustix::pipe::pipe()?;
32+
pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(PipeReader, PipeWriter)> {
33+
let (pipe_reader, pipe_writer) = std::io::pipe()?;
3234
// guard unnecessary syscall
3335
if s > KERNEL_DEFAULT_PIPE_SIZE {
34-
let r = fcntl_setpipe_size(&read, s);
36+
let r = fcntl_setpipe_size(&pipe_reader, s);
3537
if SIZE_REQUIRED {
3638
r?;
3739
}
3840
}
3941

40-
Ok((File::from(read), File::from(write)))
42+
Ok((pipe_reader, pipe_writer))
4143
}
4244

4345
/// Less noisy wrapper around [`rustix::pipe::splice`].
@@ -116,8 +118,8 @@ where
116118
R: Read + AsFd,
117119
S: AsFd,
118120
{
119-
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
120-
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
121+
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
122+
let Some((pipe_reader, pipe_writer)) = PIPE_CACHE
121123
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
122124
.as_ref()
123125
else {
@@ -130,18 +132,18 @@ where
130132
let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);
131133

132134
loop {
133-
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
135+
match splice(&source, &pipe_writer, MAX_ROOTLESS_PIPE_SIZE) {
134136
Ok(0) => return Ok(false),
135137
Ok(n) => {
136-
if splice_exact(&pipe_rd, dest, n).is_err() {
138+
if splice_exact(&pipe_reader, dest, n).is_err() {
137139
// If the first splice manages to copy to the intermediate
138140
// pipe, but the second splice to stdout fails for some reason
139141
// we can recover by copying the data that we have from the
140142
// intermediate pipe to stdout using unbuffered read/write. Then
141143
// we tell the caller to fall back.
142144
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
143145
let mut drain = Vec::with_capacity(n);
144-
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
146+
pipe_reader.take(n as u64).read_to_end(&mut drain)?;
145147
crate::io::RawWriter(&dest).write_all(&drain)?;
146148
return Ok(true);
147149
}
@@ -160,7 +162,7 @@ pub fn send_n_bytes(
160162
mut target: impl Write + AsFd,
161163
n: u64,
162164
) -> std::io::Result<u64> {
163-
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
165+
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
164166
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
165167
let mut n = n;
166168
let mut bytes_written: u64 = 0;

tests/by-util/test_cat.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@ use uutests::util_name;
2323
#[test]
2424
fn test_cat_broken_pipe_nonzero_and_message() {
2525
use uutests::new_ucmd;
26-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
26+
let (pipe_reader, pipe_writer) = std::io::pipe().expect("Failed to create pipe");
2727
// Close the read end to simulate a broken pipe on stdout
28-
drop(read);
29-
let write: File = write.into();
28+
drop(pipe_reader);
3029
let content = (0..10000).map(|_| "x").collect::<String>();
3130

3231
// On Unix, SIGPIPE should lead to a non-zero exit; ensure process exits and fails
3332
new_ucmd!()
34-
.set_stdout(write)
33+
.set_stdout(pipe_writer)
3534
.pipe_in(content.as_bytes())
3635
.fails();
3736
}

tests/by-util/test_comm.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,8 @@ fn test_comm_anonymous_pipes() {
691691
let scene = TestScenario::new(util_name!());
692692

693693
// Open two anonymous pipes
694-
let (comm1_reader, comm1_writer) = rustix::pipe::pipe().unwrap();
695-
let mut comm1_writer: std::fs::File = comm1_writer.into();
696-
let (comm2_reader, comm2_writer) = rustix::pipe::pipe().unwrap();
697-
let mut comm2_writer: std::fs::File = comm2_writer.into();
694+
let (comm1_reader, mut comm1_writer) = std::io::pipe().unwrap();
695+
let (comm2_reader, mut comm2_writer) = std::io::pipe().unwrap();
698696

699697
// comm reads the data in chunks
700698
// make content large enough, so that at least two chunks are read

tests/by-util/test_paste.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,12 +450,12 @@ fn test_paste_non_utf8_paths() {
450450
}
451451

452452
#[cfg(target_os = "linux")]
453-
fn make_broken_pipe() -> std::fs::File {
454-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
453+
fn make_broken_pipe() -> std::io::PipeWriter {
454+
let (reader, writer) = std::io::pipe().expect("Failed to create pipe");
455455
// Drop the read end so writes fail with EPIPE.
456-
drop(read);
456+
drop(reader);
457457

458-
write.into()
458+
writer
459459
}
460460

461461
#[test]

tests/by-util/test_tee.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -256,26 +256,25 @@ mod linux_only {
256256
use uutests::util::{AtPath, CmdResult, UCommand};
257257

258258
use std::fmt::Write;
259-
use std::fs::File;
260259
use std::process::Stdio;
261260
use std::time::Duration;
262261
use uutests::at_and_ucmd;
263262
use uutests::new_ucmd;
264263

265-
fn make_broken_pipe() -> File {
266-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
264+
fn make_broken_pipe() -> std::io::PipeWriter {
265+
let (reader, writer) = std::io::pipe().expect("Failed to create pipe");
267266
// Drop the read end of the pipe
268-
drop(read);
269-
// Make the write end of the pipe into a Rust File
270-
write.into()
267+
drop(reader);
268+
// Return the write end of the pipe
269+
writer
271270
}
272271

273-
fn make_hanging_read() -> File {
274-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
272+
fn make_hanging_read() -> std::io::PipeReader {
273+
let (reader, writer) = std::io::pipe().expect("Failed to create pipe");
275274
// PURPOSELY leak the write end of the pipe, so the read end hangs.
276-
std::mem::forget(write);
275+
std::mem::forget(writer);
277276
// Return the read end of the pipe
278-
read.into()
277+
reader
279278
}
280279

281280
fn run_tee(proc: &mut UCommand) -> (String, CmdResult) {
@@ -726,14 +725,13 @@ fn test_output_error_flag_without_value_defaults_warn_nopipe() {
726725
#[cfg(all(unix, not(target_os = "freebsd")))]
727726
#[test]
728727
fn test_output_error_presence_only_broken_pipe_unix() {
729-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
728+
let (reader, writer) = std::io::pipe().expect("Failed to create pipe");
730729
// Close the read end to simulate a broken pipe on stdout
731-
drop(read);
732-
let write: std::fs::File = write.into();
730+
drop(reader);
733731
let content = (0..10_000).map(|_| "x").collect::<String>();
734732
let result = new_ucmd!()
735733
.arg("--output-error") // presence-only flag
736-
.set_stdout(write)
734+
.set_stdout(writer)
737735
.pipe_in(content.as_bytes())
738736
.run();
739737

@@ -745,14 +743,13 @@ fn test_output_error_presence_only_broken_pipe_unix() {
745743
#[cfg(all(unix, not(target_os = "freebsd")))]
746744
#[test]
747745
fn test_broken_pipe_early_termination_stdout_only() {
748-
let (read, write) = rustix::pipe::pipe().expect("Failed to create pipe");
746+
let (reader, writer) = std::io::pipe().expect("Failed to create pipe");
749747
// Create a broken stdout
750-
drop(read);
751-
let write: std::fs::File = write.into();
748+
drop(reader);
752749
let content = (0..10_000).map(|_| "x").collect::<String>();
753750
let mut proc = new_ucmd!();
754751
let result = proc
755-
.set_stdout(write)
752+
.set_stdout(writer)
756753
.ignore_stdin_write_error()
757754
.pipe_in(content.as_bytes())
758755
.run();

0 commit comments

Comments
 (0)