Skip to content

Commit 1afa8ea

Browse files
committed
feat: add scatter_time metric to shuffle write profiling
1 parent: 233d436 · commit: 1afa8ea

4 files changed

Lines changed: 12 additions & 0 deletions

File tree

native/core/src/execution/shuffle/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,9 @@ pub(super) struct ShufflePartitionerMetrics {
2626
/// Time to perform repartitioning
2727
pub(super) repart_time: Time,
2828

29+
/// Time scattering values to per-partition buffers
30+
pub(super) scatter_time: Time,
31+
2932
/// Time encoding batches to IPC format
3033
pub(super) encode_time: Time,
3134

@@ -50,6 +53,7 @@ impl ShufflePartitionerMetrics {
5053
Self {
5154
baseline: BaselineMetrics::new(metrics, partition),
5255
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
56+
scatter_time: MetricBuilder::new(metrics).subset_time("scatter_time", partition),
5357
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
5458
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
5559
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),

native/core/src/execution/shuffle/partitioners/multi_partition.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -415,6 +415,8 @@ impl MultiPartitionShuffleRepartitioner {
415415
// Track memory before scatter
416416
let mem_before: usize = self.partition_buffers.iter().map(|b| b.memory_size()).sum();
417417

418+
let scatter_start = Instant::now();
419+
418420
// Column-oriented scatter: for each column, iterate by partition then by
419421
// rows within that partition. This keeps writes to the same partition buffer
420422
// sequential for better cache locality.
@@ -535,6 +537,8 @@ impl MultiPartitionShuffleRepartitioner {
535537
}
536538
}
537539

540+
self.metrics.scatter_time.add_duration(scatter_start.elapsed());
541+
538542
// Update row counts from partition_starts (O(num_partitions), not O(num_rows))
539543
for p in 0..num_partitions {
540544
let count = (partition_starts[p + 1] - partition_starts[p]) as usize;

spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -248,6 +248,9 @@ object CometMetricNode {
248248
Map(
249249
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"),
250250
"repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"),
251+
"scatter_time" -> SQLMetrics.createNanoTimingMetric(
252+
sc,
253+
"scatter to partition buffers time"),
251254
"encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),
252255
"decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"),
253256
"spill_count" -> SQLMetrics.createMetric(sc, "number of spills"),

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ class CometNativeShuffleWriter[K, V](
7878
val detailedMetrics = Seq(
7979
"elapsed_compute",
8080
"repart_time",
81+
"scatter_time",
8182
"encode_time",
8283
"input_batches",
8384
"spill_count",

0 commit comments

Comments
 (0)