Skip to content

Commit 5a800d2

Browse files
committed
pipes.rs: don't mix raw write and buffered write
1 parent 9bf907d commit 5a800d2

3 files changed

Lines changed: 16 additions & 11 deletions

File tree

src/uucore/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ parser-num = ["extendedbigdecimal", "num-traits"]
170170
parser-size = ["parser-num", "procfs"]
171171
parser-glob = ["glob"]
172172
parser = ["parser-num", "parser-size", "parser-glob"]
173-
pipes = []
173+
pipes = ["fs"]
174174
process = ["libc"]
175175
proc-info = ["tty", "walkdir"]
176176
quoting-style = ["i18n-common"]

src/uucore/src/lib/features/pipes.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
1010
#[cfg(any(target_os = "linux", target_os = "android"))]
1111
use std::fs::File;
1212
#[cfg(any(target_os = "linux", target_os = "android"))]
13-
use std::{io::Read, os::fd::AsFd, sync::OnceLock};
13+
use std::{
14+
io::{Read, Write},
15+
os::fd::AsFd,
16+
sync::OnceLock,
17+
};
1418
#[cfg(any(target_os = "linux", target_os = "android"))]
1519
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
1620
#[cfg(any(target_os = "linux", target_os = "android"))]
@@ -93,7 +97,7 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
9397
pub fn splice_unbounded<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
9498
where
9599
R: Read + AsFd,
96-
S: AsFd + std::io::Write,
100+
S: AsFd + Write,
97101
{
98102
// improve throughput
99103
// todo: avoid fcntl overhead for small input, but don't fcntl inside of the loop
@@ -119,7 +123,7 @@ where
119123
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
120124
where
121125
R: Read + AsFd,
122-
S: AsFd + std::io::Write,
126+
S: AsFd + Write,
123127
{
124128
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
125129
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() else {
@@ -139,12 +143,12 @@ where
139143
// If the first splice manages to copy to the intermediate
140144
// pipe, but the second splice to stdout fails for some reason
141145
// we can recover by copying the data that we have from the
142-
// intermediate pipe to stdout using normal read/write. Then
146+
// intermediate pipe to stdout using unbuffered read/write. Then
143147
// we tell the caller to fall back.
144148
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
145149
let mut drain = Vec::with_capacity(n);
146150
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
147-
dest.write_all(&drain)?;
151+
crate::io::RawWriter(&dest).write_all(&drain)?;
148152
return Ok(true);
149153
}
150154
}
@@ -159,7 +163,7 @@ where
159163
#[cfg(any(target_os = "linux", target_os = "android"))]
160164
pub fn send_n_bytes(
161165
input: impl Read + AsFd,
162-
mut target: impl std::io::Write + AsFd,
166+
mut target: impl Write + AsFd,
163167
n: u64,
164168
) -> std::io::Result<u64> {
165169
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
@@ -210,9 +214,10 @@ pub fn send_n_bytes(
210214
}
211215
} else {
212216
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
217+
// drain pipe before fallback to raw write
213218
let mut drain = Vec::with_capacity(s);
214219
broker_r.take(s as u64).read_to_end(&mut drain)?;
215-
target.write_all(&drain)?;
220+
crate::io::RawWriter(&target).write_all(&drain)?;
216221
break true;
217222
}
218223
}

src/uucore/src/lib/mods/io.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ type NativeType = OwnedHandle;
3030
#[cfg(not(windows))]
3131
type NativeType = OwnedFd;
3232

33-
// create stdout without buffering
33+
// create writer without buffering
3434
#[cfg(any(unix, target_os = "wasi"))]
35-
pub struct RawWriter(pub io::Stdout);
35+
pub struct RawWriter<T: AsFd>(pub T);
3636
#[cfg(any(unix, target_os = "wasi"))]
37-
impl io::Write for RawWriter {
37+
impl<T: AsFd> io::Write for RawWriter<T> {
3838
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
3939
rustix::io::write(&self.0, b).map_err(Into::into)
4040
}

0 commit comments

Comments
 (0)