Skip to content

Commit c3d2005

Browse files
Merge pull request #82 from aaronriekenberg/pipe_mode
Add pipe mode, block size.
2 parents f4814b0 + add88b8 commit c3d2005

17 files changed

Lines changed: 559 additions & 38 deletions

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ repository = "https://github.com/aaronriekenberg/rust-parallel"
1515

1616
[dependencies]
1717
anyhow = "1"
18+
bytesize = "2"
1819
clap = { version = "4", features = ["derive"] }
1920
indicatif = "0.18"
2021
itertools = "0.14"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Listed in [Awesome Rust - utilities](https://github.com/rust-unofficial/awesome-
1919
Similar interface to [GNU Parallel](https://www.gnu.org/software/parallel/parallel_examples.html) or [xargs](https://man7.org/linux/man-pages/man1/xargs.1.html) plus useful features:
2020
* More than 10x faster than GNU Parallel [in benchmarks](https://github.com/aaronriekenberg/rust-parallel/wiki/Benchmarks)
2121
* Run commands from [stdin](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#commands-from-stdin), [input files](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#reading-multiple-inputs), or [`:::` arguments](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#commands-from-arguments)
22+
* [Pipe mode](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#pipe-mode) to read blocks from stdin and pass to parallel commands automatically.
2223
* Automatic parallelism to all cpus, or [configure manually](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#parallelism)
2324
* Transform inputs with [variables](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#automatic-variables) or [regular expressions](https://github.com/aaronriekenberg/rust-parallel/wiki/Manual#regular-expression)
2425
* Prevent [output interleaving](https://github.com/aaronriekenberg/rust-parallel/wiki/Output-Interleaving) and maintain input order with `-k`/`--keep-order`

scripts/generate_manual.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ echo '
1515
1. [Commands from stdin](#commands-from-stdin)
1616
1. [Command and initial arguments on command line](#command-and-initial-arguments-on-command-line)
1717
1. [Reading multiple inputs](#reading-multiple-inputs)
18+
1. [Pipe Mode](#pipe-mode)
1819
1. [Parallelism](#parallelism)
1920
1. [Keep Output Order](#keep-output-order)
2021
1. [Dry run](#dry-run)
@@ -142,6 +143,30 @@ rm -f test
142143

143144
echo '```'
144145

146+
echo '## Pipe Mode
147+
148+
The `--pipe` option can be used to enable pipe mode.
149+
150+
In pipe mode input from stdin is split into blocks and each block is passed to a separate instance of the command via stdin. Command instances are run in parallel.
151+
152+
The default block size is 1 MiB, which can be changed with the `--block-size` option.
153+
154+
Here we use `--pipe` to run `wc -l`
155+
'
156+
157+
echo '```'
158+
echo '$ cat /usr/share/dict/words | rust-parallel --pipe wc -l'
159+
cat /usr/share/dict/words | $RUST_PARALLEL --pipe wc -l
160+
echo '```'
161+
162+
echo 'Here we use pipe mode with with a smaller block size of 500 KiB:'
163+
164+
echo '```'
165+
echo '$ cat /usr/share/dict/words | rust-parallel --pipe --block-size=500KiB wc -l'
166+
cat /usr/share/dict/words | $RUST_PARALLEL --pipe --block-size=500KiB wc -l
167+
echo '```'
168+
169+
145170
echo '
146171
## Parallelism
147172

src/command.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ impl Command {
3434
cmd = ?self.command_and_args.command_path,
3535
args = ?self.command_and_args.args,
3636
line = %self.input_line_number,
37+
stdin = %self.command_and_args.stdin,
3738
child_pid,
3839
),
3940
level = "debug")]
@@ -42,13 +43,17 @@ impl Command {
4243

4344
let command_metrics = &context.command_metrics;
4445

45-
let OwnedCommandAndArgs { command_path, args } = &self.command_and_args;
46+
let OwnedCommandAndArgs {
47+
command_path,
48+
args,
49+
stdin,
50+
} = &self.command_and_args;
4651

4752
command_metrics.increment_commands_run();
4853

4954
let child_process = match context
5055
.child_process_factory
51-
.spawn(command_path, args)
56+
.spawn(command_path, args, stdin.clone())
5257
.await
5358
{
5459
Err(e) => {
@@ -91,8 +96,8 @@ impl std::fmt::Display for Command {
9196
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9297
write!(
9398
f,
94-
"cmd={:?},args={:?},line={}",
95-
self.command_and_args.command_path, self.command_and_args.args, self.input_line_number,
99+
"{},line={}",
100+
self.command_and_args, self.input_line_number
96101
)
97102
}
98103
}
@@ -177,10 +182,7 @@ impl CommandService {
177182
return Ok(());
178183
};
179184

180-
let command_and_args = OwnedCommandAndArgs {
181-
command_path,
182-
args: command_and_args.args,
183-
};
185+
let command_and_args = command_and_args.with_command_path(command_path);
184186

185187
self.spawn_command(command_and_args, input_line_number)
186188
.await?;

src/command_line_args.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
use bytesize::ByteSize;
2+
13
use clap::{Parser, ValueEnum};
24

35
use tokio::sync::OnceCell;
46

57
use tracing::debug;
68

9+
use std::str::FromStr;
10+
711
pub const COMMANDS_FROM_ARGS_SEPARATOR: &str = ":::";
812

913
/// Execute commands in parallel
@@ -81,6 +85,14 @@ pub struct CommandLineArgs {
8185
#[arg(short, long)]
8286
pub keep_order: bool,
8387

88+
/// Use pipe input mode
89+
#[arg(long)]
90+
pub pipe: bool,
91+
92+
/// Block size for pipe input mode, defaults to 1MiB
93+
#[arg(long, default_value = "1MiB", value_parser = Self::parse_block_size_bytes)]
94+
pub block_size: usize,
95+
8496
/// Path to shell to use for shell mode
8597
#[arg(long, default_value = Self::default_shell())]
8698
pub shell_path: String,
@@ -138,6 +150,19 @@ impl CommandLineArgs {
138150
}
139151
}
140152

153+
fn parse_block_size_bytes(s: &str) -> Result<usize, String> {
154+
let bytes =
155+
ByteSize::from_str(s).map_err(|e| format!("could not parse size `{s}`: {e}"))?;
156+
157+
if bytes.0 == 0 {
158+
return Err("size must be greater than 0".to_string());
159+
}
160+
161+
let bytes =
162+
usize::try_from(bytes.0).map_err(|e| format!("size `{s}` is too large: {e}"))?;
163+
Ok(bytes)
164+
}
165+
141166
fn default_shell() -> &'static str {
142167
if cfg!(unix) {
143168
"/bin/bash"

src/common.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,36 @@
1-
use std::{collections::VecDeque, path::PathBuf};
1+
use bytesize::ByteSize;
22

3-
#[derive(Debug, Eq, PartialEq)]
3+
use std::{collections::VecDeque, path::PathBuf, sync::Arc};
4+
5+
#[derive(Clone, Debug, Eq, PartialEq, Default)]
6+
pub struct StdinData(pub Option<Arc<String>>);
7+
8+
impl std::fmt::Display for StdinData {
9+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
10+
match &self.0 {
11+
Some(s) => {
12+
let size_string = ByteSize::b(s.len().try_into().unwrap_or_default()).to_string();
13+
write!(f, "{size_string}")
14+
}
15+
None => write!(f, "None"),
16+
}
17+
}
18+
}
19+
20+
#[derive(Debug, Eq, PartialEq, Default)]
421
pub struct OwnedCommandAndArgs {
522
pub command_path: PathBuf,
623
pub args: Vec<String>,
24+
pub stdin: StdinData,
725
}
826

927
impl std::fmt::Display for OwnedCommandAndArgs {
1028
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
11-
write!(f, "cmd={:?},args={:?}", self.command_path, self.args)
29+
write!(
30+
f,
31+
"cmd={:?},args={:?},stdin={}",
32+
self.command_path, self.args, self.stdin
33+
)
1234
}
1335
}
1436

@@ -29,6 +51,7 @@ impl TryFrom<VecDeque<String>> for OwnedCommandAndArgs {
2951
Ok(Self {
3052
command_path: PathBuf::from(command),
3153
args: deque.into(),
54+
stdin: StdinData(None),
3255
})
3356
}
3457
}
@@ -40,3 +63,15 @@ impl TryFrom<Vec<String>> for OwnedCommandAndArgs {
4063
Self::try_from(VecDeque::from(vec))
4164
}
4265
}
66+
67+
impl OwnedCommandAndArgs {
68+
pub fn with_command_path(mut self, command_path: PathBuf) -> Self {
69+
self.command_path = command_path;
70+
self
71+
}
72+
73+
pub fn with_stdin(mut self, stdin: StdinData) -> Self {
74+
self.stdin = stdin;
75+
self
76+
}
77+
}

src/input.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,25 @@ impl std::fmt::Display for Input {
4646
}
4747
}
4848

49+
#[derive(Debug)]
50+
pub enum LineNumberOrRange {
51+
Single(usize),
52+
Range(usize, usize),
53+
}
54+
55+
impl std::fmt::Display for LineNumberOrRange {
56+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57+
match self {
58+
Self::Single(line) => write!(f, "{line}"),
59+
Self::Range(start, end) => write!(f, "{start}-{end}"),
60+
}
61+
}
62+
}
63+
4964
#[derive(Debug)]
5065
pub struct InputLineNumber {
5166
pub input: Input,
52-
pub line_number: usize,
67+
pub line_number: LineNumberOrRange,
5368
}
5469

5570
impl std::fmt::Display for InputLineNumber {
@@ -59,18 +74,22 @@ impl std::fmt::Display for InputLineNumber {
5974
}
6075

6176
enum InputList {
62-
BufferedInputList(Vec<BufferedInput>),
77+
Buffered(Vec<BufferedInput>),
6378

6479
CommandLineArgs,
80+
81+
Pipe,
6582
}
6683

6784
fn build_input_list(command_line_args: &'static CommandLineArgs) -> InputList {
68-
if command_line_args.commands_from_args_mode() {
85+
if command_line_args.pipe {
86+
InputList::Pipe
87+
} else if command_line_args.commands_from_args_mode() {
6988
InputList::CommandLineArgs
7089
} else if command_line_args.input_file.is_empty() {
71-
InputList::BufferedInputList(vec![BufferedInput::Stdin])
90+
InputList::Buffered(vec![BufferedInput::Stdin])
7291
} else {
73-
InputList::BufferedInputList(
92+
InputList::Buffered(
7493
command_line_args
7594
.input_file
7695
.iter()

src/input/buffered_reader.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader, Split};
44

55
use crate::command_line_args::CommandLineArgs;
66

7-
use super::{BufferedInput, Input, InputLineNumber};
7+
use super::{BufferedInput, Input};
88

99
type AsyncBufReadBox = Box<dyn AsyncBufRead + Unpin + Send>;
1010

@@ -54,20 +54,19 @@ impl BufferedInputReader {
5454
}
5555
}
5656

57-
pub async fn next_segment(&mut self) -> anyhow::Result<Option<(InputLineNumber, Vec<u8>)>> {
57+
pub async fn next_segment(&mut self) -> anyhow::Result<Option<(Input, usize, Vec<u8>)>> {
5858
let segment = self.split.next_segment().await?;
5959

6060
match segment {
6161
None => Ok(None),
6262
Some(segment) => {
6363
self.next_line_number += 1;
6464

65-
let input_line_number = InputLineNumber {
66-
input: Input::Buffered(self.buffered_input),
67-
line_number: self.next_line_number,
68-
};
69-
70-
Ok(Some((input_line_number, segment)))
65+
Ok(Some((
66+
Input::Buffered(self.buffered_input),
67+
self.next_line_number,
68+
segment,
69+
)))
7170
}
7271
}
7372
}

0 commit comments

Comments
 (0)