Skip to content

Commit 3d91d5c

Browse files
committed
pipes.rs: deduplicate some code
1 parent 83b09a4 commit 3d91d5c

1 file changed

Lines changed: 24 additions & 25 deletions

File tree

src/uucore/src/lib/features/pipes.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,7 @@ pub fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Re
106106
/// This should not be used if one of them are pipe to save resources
107107
#[inline]
108108
#[cfg(any(target_os = "linux", target_os = "android"))]
109-
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
110-
where
111-
R: Read + AsFd,
112-
S: AsFd,
113-
{
109+
pub fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<bool> {
114110
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
115111
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
116112
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
@@ -134,10 +130,7 @@ where
134130
// we can recover by copying the data that we have from the
135131
// intermediate pipe to stdout using unbuffered read/write. Then
136132
// we tell the caller to fall back.
137-
debug_assert!(n <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
138-
let mut drain = Vec::with_capacity(n);
139-
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
140-
crate::io::RawWriter(&dest).write_all(&drain)?;
133+
send_n_rw(pipe_rd, dest, n as u64)?;
141134
return Ok(true);
142135
}
143136
}
@@ -150,11 +143,7 @@ where
150143
/// return actually sent bytes
151144
#[inline]
152145
#[cfg(any(target_os = "linux", target_os = "android"))]
153-
pub fn send_n_bytes(
154-
input: impl Read + AsFd,
155-
mut target: impl Write + AsFd,
156-
n: u64,
157-
) -> std::io::Result<u64> {
146+
pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Result<u64> {
158147
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
159148
let pipe_size = MAX_ROOTLESS_PIPE_SIZE.min(n as usize);
160149
let mut n = n;
@@ -203,10 +192,8 @@ pub fn send_n_bytes(
203192
}
204193
} else {
205194
debug_assert!(s <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
206-
// drain pipe before fallback to raw write
207-
let mut drain = Vec::with_capacity(s);
208-
broker_r.take(s as u64).read_to_end(&mut drain)?;
209-
crate::io::RawWriter(&target).write_all(&drain)?;
195+
// cleanup before fallback to raw write
196+
send_n_rw(broker_r, &target, s as u64)?;
210197
break true;
211198
}
212199
}
@@ -220,17 +207,29 @@ pub fn send_n_bytes(
220207
if !fallback {
221208
return Ok(bytes_written);
222209
}
223-
let mut reader = input.take(n);
224-
let mut buf = vec![0u8; (32 * 1024).min(n as usize)]; //use heap to avoid early allocation
225-
loop {
210+
Ok(bytes_written + send_n_rw(&input, &target, n)?)
211+
}
212+
213+
/// read n bytes from src and write to dest
214+
/// return sent bytes
215+
///
216+
// we cannot include this to splice_exact at a fallback since we need to catch I/O error
217+
#[cfg(any(target_os = "linux", target_os = "android"))]
218+
pub fn send_n_rw(src: &impl AsFd, dest: &impl AsFd, n: u64) -> std::io::Result<u64> {
219+
let mut sent = 0;
220+
let mut reader = crate::io::RawReader(src).take(n);
221+
let mut dest = crate::io::RawWriter(dest);
222+
let mut buf = [0u8; 1024 * 32]; //rustix::io::read can avoid 0-fill, but it generates wrong error...
223+
while sent < n {
226224
match reader.read(&mut buf)? {
227-
0 => return Ok(bytes_written),
228-
n => {
229-
target.write_all(&buf[..n])?;
230-
bytes_written += n as u64;
225+
0 => return Ok(sent),
226+
written => {
227+
dest.write_all(&buf[..written])?;
228+
sent += written as u64;
231229
}
232230
}
233231
}
232+
Ok(sent)
234233
}
235234

236235
/// Return verified /dev/null

0 commit comments

Comments
 (0)