Skip to content

Commit fe7e262

Browse files
oech3cakebaker
authored andcommitted
uucore: dedup code of unbounded splice with broker
1 parent b3469ae commit fe7e262

3 files changed

Lines changed: 52 additions & 71 deletions

File tree

src/uu/cat/src/splice.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
// file that was distributed with this source code.
55
use super::{CatResult, FdReadable, InputHandle};
66

7-
use std::io::{Read, Write};
7+
use std::io::Write;
88
use std::os::{fd::AsFd, unix::io::AsRawFd};
99

10-
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, might_fuse, pipe, splice, splice_exact};
10+
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, might_fuse, splice};
1111

1212
/// This function is called from `write_fast()` on Linux and Android. The
1313
/// function `splice()` is used to move data between two file descriptors
@@ -21,8 +21,6 @@ pub(super) fn write_fast_using_splice<R: FdReadable, S: AsRawFd + AsFd + Write>(
2121
handle: &InputHandle<R>,
2222
write_fd: &mut S,
2323
) -> CatResult<bool> {
24-
use std::{fs::File, sync::OnceLock};
25-
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
2624
if splice(&handle.reader, &write_fd, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
2725
// fcntl improves throughput
2826
// todo: avoid fcntl overhead for small input, but don't fcntl inside of the loop
@@ -34,28 +32,10 @@ pub(super) fn write_fast_using_splice<R: FdReadable, S: AsRawFd + AsFd + Write>(
3432
Err(_) => return Ok(true),
3533
}
3634
}
37-
} else if let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() {
38-
// both of in/output are not pipe. needs broker to use splice() with additional costs
39-
loop {
40-
match splice(&handle.reader, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
41-
Ok(0) => return Ok(might_fuse(&handle.reader)),
42-
Ok(n) => {
43-
if splice_exact(&pipe_rd, write_fd, n).is_err() {
44-
// If the first splice manages to copy to the intermediate
45-
// pipe, but the second splice to stdout fails for some reason
46-
// we can recover by copying the data that we have from the
47-
// intermediate pipe to stdout using normal read/write. Then
48-
// we tell the caller to fall back.
49-
let mut drain = Vec::with_capacity(n); // bounded by pipe size
50-
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
51-
write_fd.write_all(&drain)?;
52-
return Ok(true);
53-
}
54-
}
55-
Err(_) => return Ok(true),
56-
}
57-
}
5835
} else {
59-
Ok(true)
36+
Ok(
37+
uucore::pipes::splice_unbounded_broker(&handle.reader, write_fd)?
38+
|| might_fuse(&handle.reader),
39+
)
6040
}
6141
}

src/uucore/src/lib/features/buf_copy/linux.rs

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55

66
//! Buffer-based copying implementation for Linux and Android.
77
8-
use crate::{
9-
error::UResult,
10-
pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact},
11-
};
8+
use crate::error::UResult;
129

1310
/// Buffer-based copying utilities for unix (excluding Linux).
1411
use std::{
@@ -53,7 +50,8 @@ where
5350
{
5451
// If we're on Linux or Android, try to use the splice() system call
5552
// for faster writing. If it works, we're done.
56-
if !splice_write(src, dest)? {
53+
// todo: bypass broker pipe this if input or output is pipe. We use this mostly for stream.
54+
if !crate::pipes::splice_unbounded_broker(src, dest)? {
5755
return Ok(());
5856
}
5957

@@ -68,43 +66,3 @@ where
6866
dest.flush()?;
6967
Ok(())
7068
}
71-
72-
/// Write from source `handle` into destination `write_fd` using Linux-specific
73-
/// `splice` system call.
74-
///
75-
/// # Arguments
76-
/// - `source` - source handle
77-
/// - `dest` - destination handle
78-
#[inline]
79-
pub(crate) fn splice_write<R, S>(source: &R, dest: &mut S) -> UResult<bool>
80-
where
81-
R: Read + AsFd + AsRawFd,
82-
S: AsRawFd + AsFd + Write,
83-
{
84-
let (pipe_rd, pipe_wr) = pipe()?; // todo: bypass this if input or output is pipe. We use this mostly for stream.
85-
// improve throughput
86-
// no need to increase pipe size of input fd since
87-
// - sender with splice probably increased size already
88-
// - sender without splice is bottleneck
89-
let _ = rustix::pipe::fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);
90-
91-
loop {
92-
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
93-
Ok(0) => return Ok(false),
94-
Ok(n) => {
95-
if splice_exact(&pipe_rd, dest, n).is_err() {
96-
// If the first splice manages to copy to the intermediate
97-
// pipe, but the second splice to stdout fails for some reason
98-
// we can recover by copying the data that we have from the
99-
// intermediate pipe to stdout using normal read/write. Then
100-
// we tell the caller to fall back.
101-
let mut drain = Vec::with_capacity(n); // bounded by pipe size
102-
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
103-
dest.write_all(&drain)?;
104-
return Ok(true);
105-
}
106-
}
107-
Err(_) => return Ok(true),
108-
}
109-
}
110-
}

src/uucore/src/lib/features/pipes.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,48 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
8686
rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const
8787
}
8888

89+
/// force-splice source to dest even both of them are not pipe
90+
/// return true if we need read/write fallback
91+
///
92+
/// This should not be used if one of them are pipe to save resources
93+
#[inline]
94+
#[cfg(any(target_os = "linux", target_os = "android"))]
95+
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
96+
where
97+
R: Read + AsFd,
98+
S: AsFd + std::io::Write,
99+
{
100+
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
101+
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() else {
102+
return Ok(true);
103+
};
104+
// improve throughput
105+
// no need to increase pipe size of input fd since
106+
// - sender with splice probably increased size already
107+
// - sender without splice is bottleneck
108+
let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);
109+
110+
loop {
111+
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
112+
Ok(0) => return Ok(false),
113+
Ok(n) => {
114+
if splice_exact(&pipe_rd, dest, n).is_err() {
115+
// If the first splice manages to copy to the intermediate
116+
// pipe, but the second splice to stdout fails for some reason
117+
// we can recover by copying the data that we have from the
118+
// intermediate pipe to stdout using normal read/write. Then
119+
// we tell the caller to fall back.
120+
let mut drain = Vec::with_capacity(n); // bounded by pipe size
121+
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
122+
dest.write_all(&drain)?;
123+
return Ok(true);
124+
}
125+
}
126+
Err(_) => return Ok(true),
127+
}
128+
}
129+
}
130+
89131
/// splice `n` bytes with safe read/write fallback
90132
/// return actually sent bytes
91133
#[inline]
@@ -129,6 +171,7 @@ pub fn send_n_bytes(
129171
.get_or_init(|| pipe_with_size(pipe_size).ok())
130172
.as_ref()
131173
{
174+
// todo: create fn splice_bounded_broker
132175
loop {
133176
match splice(&input, &broker_w, n as usize) {
134177
Ok(0) => break might_fuse(&input),

0 commit comments

Comments
 (0)