Skip to content

Commit cd5514f

Browse files
add smoke tests and change explain docs
1 parent df3e5d2 commit cd5514f

2 files changed

Lines changed: 29 additions & 12 deletions

File tree

  • datafusion/physical-plan/src/repartition
  • docs/source/user-guide/sql

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3007,13 +3007,31 @@ mod test {
30073007

30083008
// Verify metrics are available
30093009
let metrics = exec.metrics().unwrap();
3010-
// Just verify the metrics can be retrieved (spilling may or may not occur)
3011-
let spill_count = metrics.spill_count().unwrap_or(0);
3010+
// Verify spilling occurred and the repartition metrics were recorded.
3011+
let spill_count = metrics.spill_count().unwrap();
30123012
assert!(spill_count > 0);
3013-
let spilled_bytes = metrics.spilled_bytes().unwrap_or(0);
3013+
let spilled_bytes = metrics.spilled_bytes().unwrap();
30143014
assert!(spilled_bytes > 0);
3015-
let spilled_rows = metrics.spilled_rows().unwrap_or(0);
3015+
let spilled_rows = metrics.spilled_rows().unwrap();
30163016
assert!(spilled_rows > 0);
3017+
let fetch_time = metrics.sum_by_name("fetch_time").unwrap().as_usize();
3018+
assert!(fetch_time > 0);
3019+
let repartition_time =
3020+
metrics.sum_by_name("repartition_time").unwrap().as_usize();
3021+
assert!(repartition_time > 0);
3022+
let route_time = metrics.sum_by_name("route_time").unwrap().as_usize();
3023+
assert!(route_time > 0);
3024+
let batch_build_time =
3025+
metrics.sum_by_name("batch_build_time").unwrap().as_usize();
3026+
assert!(batch_build_time > 0);
3027+
assert!(repartition_time >= route_time + batch_build_time);
3028+
let channel_wait_time =
3029+
metrics.sum_by_name("channel_wait_time").unwrap().as_usize();
3030+
assert!(channel_wait_time > 0);
3031+
let spill_write_time =
3032+
metrics.sum_by_name("spill_write_time").unwrap().as_usize();
3033+
assert!(spill_write_time > 0);
3034+
assert!(metrics.sum_by_name("spill_read_wait_time").is_some());
30173035

30183036
Ok(())
30193037
}

docs/source/user-guide/sql/explain.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,13 @@ EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b;
236236
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
237237
| plan_type | plan |
238238
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
239-
| Plan with Metrics | CoalescePartitionsExec, metrics=[] |
240-
| | ProjectionExec: expr=[SUM(table.x)@1 as SUM(x)], metrics=[] |
241-
| | HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2] |
242-
| | CoalesceBatchesExec: target_batch_size=4096, metrics=[] |
243-
| | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] |
244-
| | HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2] |
245-
| | RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012] |
246-
| | DataSourceExec: file_groups={1 group: [[/tmp/table.csv]]}, has_header=false, metrics=[] |
239+
| Plan with Metrics | ProjectionExec: expr=[sum(table.x)@1 as sum(table.x)], metrics=[output_rows=2, elapsed_compute=7.30µs, output_bytes=64.0 B, output_batches=2, expr_0_eval_time=764ns] |
240+
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(table.x)], metrics=[output_rows=2, elapsed_compute=699.21µs, output_bytes=1088.0 B, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=17.60 K, aggregate_arguments_time=6.55µs, aggregation_time=6.97µs, emitting_time=8.47µs, time_calculating_group_ids=5.89µs] |
241+
| | RepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16, metrics=[output_rows=2, elapsed_compute=135.12µs, output_bytes=192.0 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batch_build_time=29.35µs, channel_wait_time=4.34µs, fetch_time=14.53ms, repartition_time=49.60µs, route_time=1.97µs, spill_read_wait_time=16ns, spill_write_time=256ns] |
242+
| | AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(table.x)], metrics=[output_rows=2, elapsed_compute=242.71µs, output_bytes=544.0 B, output_batches=1, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=5.28 K, aggregate_arguments_time=31.31µs, aggregation_time=19.39µs, emitting_time=9.64µs, time_calculating_group_ids=10.77µs, reduction_factor=20% (2/10)] |
243+
| | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[output_rows=10, elapsed_compute=50.25µs, output_bytes=64.0 KB, output_batches=1, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batch_build_time=1ns, channel_wait_time=3.43µs, fetch_time=792.21µs, repartition_time=292ns, route_time=1ns, spill_read_wait_time=16ns, spill_write_time=16ns] |
244+
| | DataSourceExec: file_groups={1 group: [[tmp/datafusion-explain/table.csv]]}, projection=[x, b], file_type=csv, has_header=true, metrics=[output_rows=10, elapsed_compute=582.33µs, output_bytes=128.0 B, output_batches=1, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, time_elapsed_opening=140.33µs, time_elapsed_processing=721.33µs, time_elapsed_scanning_total=613.13µs, time_elapsed_scanning_until_data=588.17µs] |
245+
| | |
247246
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
248247
```
249248

0 commit comments

Comments
 (0)