Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions src/uu/cat/src/splice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
// file that was distributed with this source code.
use super::{CatResult, FdReadable, InputHandle};

use std::io::{Read, Write};
use std::io::Write;
use std::os::{fd::AsFd, unix::io::AsRawFd};

use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, might_fuse, pipe, splice, splice_exact};
use uucore::pipes::{MAX_ROOTLESS_PIPE_SIZE, might_fuse, splice};

/// This function is called from `write_fast()` on Linux and Android. The
/// function `splice()` is used to move data between two file descriptors
Expand All @@ -21,8 +21,6 @@ pub(super) fn write_fast_using_splice<R: FdReadable, S: AsRawFd + AsFd + Write>(
handle: &InputHandle<R>,
write_fd: &mut S,
) -> CatResult<bool> {
use std::{fs::File, sync::OnceLock};
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
if splice(&handle.reader, &write_fd, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
// fcntl improves throughput
// todo: avoid fcntl overhead for small input, but don't fcntl inside of the loop
Expand All @@ -34,28 +32,10 @@ pub(super) fn write_fast_using_splice<R: FdReadable, S: AsRawFd + AsFd + Write>(
Err(_) => return Ok(true),
}
}
} else if let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() {
// both of in/output are not pipe. needs broker to use splice() with additional costs
loop {
match splice(&handle.reader, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(might_fuse(&handle.reader)),
Ok(n) => {
if splice_exact(&pipe_rd, write_fd, n).is_err() {
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using normal read/write. Then
// we tell the caller to fall back.
let mut drain = Vec::with_capacity(n); // bounded by pipe size
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
write_fd.write_all(&drain)?;
return Ok(true);
}
}
Err(_) => return Ok(true),
}
}
} else {
Ok(true)
Ok(
uucore::pipes::splice_unbounded_broker(&handle.reader, write_fd)?
|| might_fuse(&handle.reader),
)
}
}
48 changes: 3 additions & 45 deletions src/uucore/src/lib/features/buf_copy/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

//! Buffer-based copying implementation for Linux and Android.

use crate::{
error::UResult,
pipes::{MAX_ROOTLESS_PIPE_SIZE, pipe, splice, splice_exact},
};
use crate::error::UResult;

/// Buffer-based copying utilities for unix (excluding Linux).
use std::{
Expand Down Expand Up @@ -53,7 +50,8 @@ where
{
// If we're on Linux or Android, try to use the splice() system call
// for faster writing. If it works, we're done.
if !splice_write(src, dest)? {
// todo: bypass broker pipe this if input or output is pipe. We use this mostly for stream.
if !crate::pipes::splice_unbounded_broker(src, dest)? {
return Ok(());
}

Expand All @@ -68,43 +66,3 @@ where
dest.flush()?;
Ok(())
}

/// Write from source `handle` into destination `write_fd` using Linux-specific
/// `splice` system call.
///
/// # Arguments
/// - `source` - source handle
/// - `dest` - destination handle
#[inline]
pub(crate) fn splice_write<R, S>(source: &R, dest: &mut S) -> UResult<bool>
where
R: Read + AsFd + AsRawFd,
S: AsRawFd + AsFd + Write,
{
let (pipe_rd, pipe_wr) = pipe()?; // todo: bypass this if input or output is pipe. We use this mostly for stream.
// improve throughput
// no need to increase pipe size of input fd since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
let _ = rustix::pipe::fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);

loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(false),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using normal read/write. Then
// we tell the caller to fall back.
let mut drain = Vec::with_capacity(n); // bounded by pipe size
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
dest.write_all(&drain)?;
return Ok(true);
}
}
Err(_) => return Ok(true),
}
}
}
43 changes: 43 additions & 0 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,48 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const
}

/// force-splice source to dest even both of them are not pipe
/// return true if we need read/write fallback
///
/// This should not be used if one of them are pipe to save resources
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn splice_unbounded_broker<R, S>(source: &R, dest: &mut S) -> std::io::Result<bool>
where
R: Read + AsFd,
S: AsFd + std::io::Write,
{
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() else {
return Ok(true);
};
// improve throughput
// no need to increase pipe size of input fd since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);

loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(false),
Ok(n) => {
if splice_exact(&pipe_rd, dest, n).is_err() {
// If the first splice manages to copy to the intermediate
// pipe, but the second splice to stdout fails for some reason
// we can recover by copying the data that we have from the
// intermediate pipe to stdout using normal read/write. Then
// we tell the caller to fall back.
let mut drain = Vec::with_capacity(n); // bounded by pipe size
pipe_rd.take(n as u64).read_to_end(&mut drain)?;
dest.write_all(&drain)?;
return Ok(true);
}
}
Err(_) => return Ok(true),
}
}
}

/// splice `n` bytes with safe read/write fallback
/// return actually sent bytes
#[inline]
Expand Down Expand Up @@ -129,6 +171,7 @@ pub fn send_n_bytes(
.get_or_init(|| pipe_with_size(pipe_size).ok())
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
match splice(&input, &broker_w, n as usize) {
Ok(0) => break might_fuse(&input),
Expand Down
Loading