Skip to content

Commit 503cf6f

Browse files
committed
feat: add standalone shuffle benchmark tool (#3752)
1 parent b9d0b9c commit 503cf6f

2 files changed

Lines changed: 6 additions & 105 deletions

File tree

native/shuffle/README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ of the [Apache DataFusion Comet] subproject.
2626

2727
## Shuffle Benchmark Tool
2828

29-
A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write and read
29+
A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write
3030
performance outside of Spark. It streams input data directly from Parquet files.
3131

3232
### Basic usage
@@ -51,12 +51,10 @@ cargo run --release --features shuffle-bench --bin shuffle_bench -- \
5151
| `--zstd-level` | `1` | Zstd compression level (1–22) |
5252
| `--batch-size` | `8192` | Batch size for reading Parquet data |
5353
| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded |
54-
| `--max-buffered-batches` | `0` | Max batches to buffer before spilling (0 = memory-pool-only) |
5554
| `--write-buffer-size` | `1048576` | Write buffer size in bytes |
5655
| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) |
5756
| `--iterations` | `1` | Number of timed iterations |
5857
| `--warmup` | `0` | Number of warmup iterations before timing |
59-
| `--read-back` | `false` | Also benchmark reading back the shuffle output |
6058
| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files |
6159

6260
### Profiling with flamegraph

native/shuffle/src/bin/shuffle_bench.rs

Lines changed: 5 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Standalone shuffle benchmark tool for profiling Comet shuffle write and read
19-
//! outside of Spark. Streams input directly from Parquet files.
18+
//! Standalone shuffle benchmark tool for profiling Comet shuffle write
19+
//! performance outside of Spark. Streams input directly from Parquet files.
2020
//!
2121
//! # Usage
2222
//!
@@ -25,8 +25,7 @@
2525
//! --input /data/tpch-sf100/lineitem/ \
2626
//! --partitions 200 \
2727
//! --codec zstd --zstd-level 1 \
28-
//! --hash-columns 0,3 \
29-
//! --read-back
28+
//! --hash-columns 0,3
3029
//! ```
3130
//!
3231
//! Profile with flamegraph:
@@ -46,9 +45,7 @@ use datafusion::physical_plan::common::collect;
4645
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
4746
use datafusion::physical_plan::ExecutionPlan;
4847
use datafusion::prelude::{ParquetReadOptions, SessionContext};
49-
use datafusion_comet_shuffle::{
50-
read_ipc_compressed, CometPartitioning, CompressionCodec, ShuffleWriterExec,
51-
};
48+
use datafusion_comet_shuffle::{CometPartitioning, CompressionCodec, ShuffleWriterExec};
5249
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
5350
use std::fs;
5451
use std::path::{Path, PathBuf};
@@ -58,7 +55,7 @@ use std::time::Instant;
5855
#[derive(Parser, Debug)]
5956
#[command(
6057
name = "shuffle_bench",
61-
about = "Standalone benchmark for Comet shuffle write and read performance"
58+
about = "Standalone benchmark for Comet shuffle write performance"
6259
)]
6360
struct Args {
6461
/// Path to input Parquet file or directory of Parquet files
@@ -93,10 +90,6 @@ struct Args {
9390
#[arg(long)]
9491
memory_limit: Option<usize>,
9592

96-
/// Also benchmark reading back the shuffle output
97-
#[arg(long, default_value_t = false)]
98-
read_back: bool,
99-
10093
/// Number of iterations to run
10194
#[arg(long, default_value_t = 1)]
10295
iterations: usize,
@@ -169,7 +162,6 @@ fn main() {
169162

170163
let total_iters = args.warmup + args.iterations;
171164
let mut write_times = Vec::with_capacity(args.iterations);
172-
let mut read_times = Vec::with_capacity(args.iterations);
173165
let mut data_file_sizes = Vec::with_capacity(args.iterations);
174166
let mut last_metrics: Option<MetricsSet> = None;
175167
let mut last_input_metrics: Option<MetricsSet> = None;
@@ -216,22 +208,7 @@ fn main() {
216208
print!(" output: {}", format_bytes(data_size as usize));
217209
}
218210

219-
if args.read_back && args.concurrent_tasks <= 1 {
220-
let read_elapsed = run_shuffle_read(
221-
data_file.to_str().unwrap(),
222-
index_file.to_str().unwrap(),
223-
args.partitions,
224-
);
225-
if !is_warmup {
226-
read_times.push(read_elapsed);
227-
}
228-
print!(" read: {:.3}s", read_elapsed);
229-
}
230211
println!();
231-
232-
// Remove output files after each iteration to avoid filling disk
233-
let _ = fs::remove_file(&data_file);
234-
let _ = fs::remove_file(&index_file);
235212
}
236213

237214
if args.iterations > 0 {
@@ -264,24 +241,6 @@ fn main() {
264241
);
265242
}
266243

267-
if !read_times.is_empty() {
268-
let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64;
269-
let avg_read = read_times.iter().sum::<f64>() / read_times.len() as f64;
270-
let read_throughput_bytes = avg_data_size as f64 / avg_read;
271-
272-
println!("Read:");
273-
println!(" avg time: {:.3}s", avg_read);
274-
if read_times.len() > 1 {
275-
let min = read_times.iter().cloned().fold(f64::INFINITY, f64::min);
276-
let max = read_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
277-
println!(" min/max: {:.3}s / {:.3}s", min, max);
278-
}
279-
println!(
280-
" throughput: {}/s (from compressed)",
281-
format_bytes(read_throughput_bytes as usize)
282-
);
283-
}
284-
285244
if let Some(ref metrics) = last_input_metrics {
286245
println!();
287246
println!("Input Metrics (last iteration):");
@@ -330,15 +289,6 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) {
330289
if let Some(nanos) = get_metric("write_time") {
331290
println!(" write time: {}", fmt_time(nanos));
332291
}
333-
if let Some(nanos) = get_metric("interleave_time") {
334-
println!(" interleave time: {}", fmt_time(nanos));
335-
}
336-
if let Some(nanos) = get_metric("coalesce_time") {
337-
println!(" coalesce time: {}", fmt_time(nanos));
338-
}
339-
if let Some(nanos) = get_metric("memcopy_time") {
340-
println!(" memcopy time: {}", fmt_time(nanos));
341-
}
342292

343293
if let Some(spill_count) = get_metric("spill_count") {
344294
if spill_count > 0 {
@@ -633,53 +583,6 @@ fn run_concurrent_shuffle_writes(
633583
})
634584
}
635585

636-
fn run_shuffle_read(data_file: &str, index_file: &str, num_partitions: usize) -> f64 {
637-
let start = Instant::now();
638-
639-
let index_bytes = fs::read(index_file).expect("Failed to read index file");
640-
let num_offsets = index_bytes.len() / 8;
641-
let offsets: Vec<i64> = (0..num_offsets)
642-
.map(|i| {
643-
let bytes: [u8; 8] = index_bytes[i * 8..(i + 1) * 8].try_into().unwrap();
644-
i64::from_le_bytes(bytes)
645-
})
646-
.collect();
647-
648-
let data_bytes = fs::read(data_file).expect("Failed to read data file");
649-
650-
let mut total_rows = 0usize;
651-
let mut total_batches = 0usize;
652-
653-
for p in 0..num_partitions.min(offsets.len().saturating_sub(1)) {
654-
let start_offset = offsets[p] as usize;
655-
let end_offset = offsets[p + 1] as usize;
656-
657-
if start_offset >= end_offset {
658-
continue;
659-
}
660-
661-
let mut offset = start_offset;
662-
while offset < end_offset {
663-
let ipc_length =
664-
u64::from_le_bytes(data_bytes[offset..offset + 8].try_into().unwrap()) as usize;
665-
let block_data = &data_bytes[offset + 16..offset + 8 + ipc_length];
666-
let batch = read_ipc_compressed(block_data).expect("Failed to decode shuffle block");
667-
total_rows += batch.num_rows();
668-
total_batches += 1;
669-
offset += 8 + ipc_length;
670-
}
671-
}
672-
673-
let elapsed = start.elapsed().as_secs_f64();
674-
eprintln!(
675-
" read back {} rows in {} batches from {} partitions",
676-
format_number(total_rows),
677-
total_batches,
678-
num_partitions
679-
);
680-
elapsed
681-
}
682-
683586
fn build_partitioning(
684587
scheme: &str,
685588
num_partitions: usize,

0 commit comments

Comments (0)