Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/uu/cat/src/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ fn print_fast<R: FdReadable>(handle: &mut InputHandle<R>) -> CatResult<()> {
{
// If we're on Linux or Android, try to use the splice() system call
// for faster writing. If it works, we're done.
if !splice::write_fast_using_splice(handle, &mut stdout)? {
return Ok(());
match splice::write_fast_using_splice(handle, &mut stdout)? {
uucore::pipes::SpliceState::Ended => return Ok(()),
uucore::pipes::SpliceState::Fallback => {}
}
}
// If we're not on Linux or Android, or the splice() call failed,
Expand Down
16 changes: 11 additions & 5 deletions src/uu/cat/src/splice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, might_fuse, splice};
/// without copying between kernel and user spaces. This results in a large
/// speedup.
///
/// The `bool` in the result value indicates if we need to fall back to normal
/// copying or not. False means we don't have to.
/// The `SpliceState` in the result value indicates if we need to fall back to
/// normal copying or not. `SpliceState::Ended` means we don't have to.
#[inline]
pub(super) fn write_fast_using_splice<R: FdReadable, S: AsFd>(
handle: &InputHandle<R>,
write_fd: &mut S,
) -> CatResult<bool> {
let res = match splice(&handle.reader, &write_fd, MAX_ROOTLESS_PIPE_SIZE) {
) -> CatResult<uucore::pipes::SpliceState> {
let splice_state = match splice(&handle.reader, &write_fd, MAX_ROOTLESS_PIPE_SIZE) {
Ok(_) => uucore::pipes::splice_unbounded(&handle.reader, write_fd)?,
// both of in/output are not pipe
_ => uucore::pipes::splice_unbounded_broker(&handle.reader, write_fd)?,
};
Ok(res || might_fuse(&handle.reader))
let final_state = match splice_state {
uucore::pipes::SpliceState::Ended if might_fuse(&handle.reader) => {
uucore::pipes::SpliceState::Fallback
}
state => state,
};
Ok(final_state)
}
9 changes: 7 additions & 2 deletions src/uu/tail/src/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,13 @@ fn print_target_section<
} else {
// zero-copy fast-path
#[cfg(any(target_os = "linux", target_os = "android"))]
if uucore::pipes::splice_unbounded_broker(file, &mut stdout)? {
io::copy(file, &mut stdout)?;
{
match uucore::pipes::splice_unbounded_broker(file, &mut stdout)? {
uucore::pipes::SpliceState::Ended => return Ok(()),
uucore::pipes::SpliceState::Fallback => {
io::copy(file, &mut stdout)?;
}
}
}
#[cfg(not(any(target_os = "linux", target_os = "android")))]
io::copy(file, &mut stdout)?;
Expand Down
26 changes: 13 additions & 13 deletions src/uucore/src/lib/features/buf_copy/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ where
// If we're on Linux or Android, try to use the splice() system call
// for faster writing. If it works, we're done.
// todo: bypass broker pipe this if input or output is pipe. We use this mostly for stream.
if !crate::pipes::splice_unbounded_broker(src, dest)? {
return Ok(());
match crate::pipes::splice_unbounded_broker(src, dest)? {
crate::pipes::SpliceState::Ended => Ok(()),
crate::pipes::SpliceState::Fallback => {
std::io::copy(src, dest)?;

// If the splice() call failed and there has been some data written to
// stdout via while loop above AND there will be second splice() call
// that will succeed, data pushed through splice will be output before
// the data buffered in stdout.lock. Therefore additional explicit flush
// is required here.
dest.flush()?;
Ok(())
}
}

// If the splice() call failed, fall back on slower writing.
std::io::copy(src, dest)?;

// If the splice() call failed and there has been some data written to
// stdout via while loop above AND there will be second splice() call
// that will succeed, data pushed through splice will be output before
// the data buffered in stdout.lock. Therefore additional explicit flush
// is required here.
dest.flush()?;
Ok(())
}
28 changes: 17 additions & 11 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(File, File)
Ok((File::from(read), File::from(write)))
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub enum SpliceState {
Ended,
Fallback,
}

/// Less noisy wrapper around [`rustix::pipe::splice`].
///
/// Up to `len` bytes are moved from `source` to `target`. Returns the number
Expand Down Expand Up @@ -81,11 +87,11 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
}

/// splice all of source to dest
/// return true if we need read/write fallback
/// fails if one of in/output should be pipe
/// return `SpliceState` indicating if we need read/write fallback
/// fallback if one of in/output should be pipe
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
pub fn splice_unbounded<R, S>(source: &R, dest: &mut S) -> std::io::Result<SpliceState>
where
R: Read + AsFd,
S: AsFd,
Expand All @@ -99,19 +105,19 @@ where
loop {
match splice(&source, &dest, MAX_ROOTLESS_PIPE_SIZE) {
Ok(1..) => {}
Ok(0) => return Ok(false),
Err(_) => return Ok(true),
Ok(0) => return Ok(SpliceState::Ended),
Err(_) => return Ok(SpliceState::Fallback),
}
}
}

/// force-splice source to dest even both of them are not pipe
/// return true if we need read/write fallback
/// return `SpliceState` indicating if we need read/write fallback
///
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<SpliceState>
where
R: Read + AsFd,
S: AsFd,
Expand All @@ -121,7 +127,7 @@ where
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
.as_ref()
else {
return Ok(true);
return Ok(SpliceState::Fallback);
};
// improve throughput
// no need to increase pipe size of input fd since
Expand All @@ -131,7 +137,7 @@ where

loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(false),
Ok(0) => return Ok(SpliceState::Ended),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
// If the first splice manages to copy to the intermediate
Expand All @@ -143,10 +149,10 @@ where
let mut drain = Vec::with_capacity(n);
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
crate::io::RawWriter(&dest).write_all(&drain)?;
return Ok(true);
return Ok(SpliceState::Fallback);
}
}
Err(_) => return Ok(true),
Err(_) => return Ok(SpliceState::Fallback),
}
}
}
Expand Down
Loading