Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions native/core/src/execution/shuffle/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

pub(super) struct ShufflePartitionerMetrics {
/// metrics
pub(super) baseline: BaselineMetrics,

/// Time to perform repartitioning
pub(super) repart_time: Time,

/// Time encoding batches to IPC format
pub(super) encode_time: Time,

/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
pub(super) write_time: Time,

/// Number of input batches
pub(super) input_batches: Count,

/// count of spills during the execution of the operator
pub(super) spill_count: Count,

/// total spilled bytes during the execution of the operator
pub(super) spilled_bytes: Count,

/// The original size of spilled data. Different to `spilled_bytes` because of compression.
pub(super) data_size: Count,
}

impl ShufflePartitionerMetrics {
pub(super) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
data_size: MetricBuilder::new(metrics).counter("data_size", partition),
}
}
}
1 change: 1 addition & 0 deletions native/core/src/execution/shuffle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub(crate) mod codec;
mod comet_partitioning;
mod metrics;
mod shuffle_writer;
pub mod spark_unsafe;

Expand Down
179 changes: 65 additions & 114 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Defines the External shuffle repartition plan.

use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter};
use crate::execution::tracing::{with_trace, with_trace_async};
use arrow::compute::interleave_record_batch;
Expand All @@ -35,9 +36,7 @@ use datafusion::{
runtime_env::RuntimeEnv,
},
physical_plan::{
metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet, Time},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Statistics,
Expand Down Expand Up @@ -185,7 +184,7 @@ impl ExecutionPlan for ShuffleWriterExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, Arc::clone(&context))?;
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
let metrics = ShufflePartitionerMetrics::new(&self.metrics, 0);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
Expand Down Expand Up @@ -216,7 +215,7 @@ async fn external_shuffle(
output_data_file: String,
output_index_file: String,
partitioning: CometPartitioning,
metrics: ShuffleRepartitionerMetrics,
metrics: ShufflePartitionerMetrics,
context: Arc<TaskContext>,
codec: CompressionCodec,
tracing_enabled: bool,
Expand Down Expand Up @@ -268,47 +267,6 @@ async fn external_shuffle(
.await
}

struct ShuffleRepartitionerMetrics {
/// metrics
baseline: BaselineMetrics,

/// Time to perform repartitioning
repart_time: Time,

/// Time encoding batches to IPC format
encode_time: Time,

/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
write_time: Time,

/// Number of input batches
input_batches: Count,

/// count of spills during the execution of the operator
spill_count: Count,

/// total spilled bytes during the execution of the operator
spilled_bytes: Count,

/// The original size of spilled data. Different to `spilled_bytes` because of compression.
data_size: Count,
}

impl ShuffleRepartitionerMetrics {
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
data_size: MetricBuilder::new(metrics).counter("data_size", partition),
}
}
}

#[async_trait::async_trait]
trait ShufflePartitioner: Send + Sync {
/// Insert a batch into the partitioner
Expand All @@ -328,7 +286,7 @@ struct MultiPartitionShuffleRepartitioner {
/// Partitioning scheme to use
partitioning: CometPartitioning,
runtime: Arc<RuntimeEnv>,
metrics: ShuffleRepartitionerMetrics,
metrics: ShufflePartitionerMetrics,
/// Reused scratch space for computing partition indices
scratch: ScratchSpace,
/// The configured batch size
Expand Down Expand Up @@ -356,6 +314,54 @@ struct ScratchSpace {
partition_starts: Vec<u32>,
}

impl ScratchSpace {
fn map_partition_ids_to_starts_and_indices(
&mut self,
num_output_partitions: usize,
num_rows: usize,
) {
let partition_ids = &mut self.partition_ids[..num_rows];

// count each partition size, while leaving the last extra element as 0
let partition_counters = &mut self.partition_starts;
partition_counters.resize(num_output_partitions + 1, 0);
partition_counters.fill(0);
partition_ids
.iter()
.for_each(|partition_id| partition_counters[*partition_id as usize] += 1);

// accumulate partition counters into partition ends
// e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
let partition_ends = partition_counters;
let mut accum = 0;
partition_ends.iter_mut().for_each(|v| {
*v += accum;
accum = *v;
});

// calculate partition row indices and partition starts
// e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices
// and partition_starts arrays:
//
// partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
// partition_starts: [0, 1, 4, 6, 7]
//
// partition_starts conceptually splits partition_row_indices into smaller slices.
// Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
// row indices of the input batch that are partitioned into partition K. For example,
// first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc.
let partition_row_indices = &mut self.partition_row_indices;
partition_row_indices.resize(num_rows, 0);
for (index, partition_id) in partition_ids.iter().enumerate().rev() {
partition_ends[*partition_id as usize] -= 1;
let end = partition_ends[*partition_id as usize];
partition_row_indices[end as usize] = index as u32;
}

// after calculating, partition ends become partition starts
}
}

impl MultiPartitionShuffleRepartitioner {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
Expand All @@ -364,7 +370,7 @@ impl MultiPartitionShuffleRepartitioner {
output_index_file: String,
schema: SchemaRef,
partitioning: CometPartitioning,
metrics: ShuffleRepartitionerMetrics,
metrics: ShufflePartitionerMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
codec: CompressionCodec,
Expand Down Expand Up @@ -432,52 +438,6 @@ impl MultiPartitionShuffleRepartitioner {
return Ok(());
}

fn map_partition_ids_to_starts_and_indices(
scratch: &mut ScratchSpace,
num_output_partitions: usize,
num_rows: usize,
) {
let partition_ids = &mut scratch.partition_ids[..num_rows];

// count each partition size, while leaving the last extra element as 0
let partition_counters = &mut scratch.partition_starts;
partition_counters.resize(num_output_partitions + 1, 0);
partition_counters.fill(0);
partition_ids
.iter()
.for_each(|partition_id| partition_counters[*partition_id as usize] += 1);

// accumulate partition counters into partition ends
// e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7]
let partition_ends = partition_counters;
let mut accum = 0;
partition_ends.iter_mut().for_each(|v| {
*v += accum;
accum = *v;
});

// calculate partition row indices and partition starts
// e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices
// and partition_starts arrays:
//
// partition_row_indices: [6, 1, 2, 3, 4, 5, 0]
// partition_starts: [0, 1, 4, 6, 7]
//
// partition_starts conceptually splits partition_row_indices into smaller slices.
// Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the
// row indices of the input batch that are partitioned into partition K. For example,
// first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc.
let partition_row_indices = &mut scratch.partition_row_indices;
partition_row_indices.resize(num_rows, 0);
for (index, partition_id) in partition_ids.iter().enumerate().rev() {
partition_ends[*partition_id as usize] -= 1;
let end = partition_ends[*partition_id as usize];
partition_row_indices[end as usize] = index as u32;
}

// after calculating, partition ends become partition starts
}

if input.num_rows() > self.batch_size {
return Err(DataFusionError::Internal(
"Input batch size exceeds configured batch size. Call `insert_batch` instead."
Expand Down Expand Up @@ -524,11 +484,8 @@ impl MultiPartitionShuffleRepartitioner {

// We now have partition ids for every input row, map that to partition starts
// and partition indices to eventually right these rows to partition buffers.
map_partition_ids_to_starts_and_indices(
&mut scratch,
*num_output_partitions,
num_rows,
);
scratch
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);

timer.stop();
Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
Expand Down Expand Up @@ -580,11 +537,8 @@ impl MultiPartitionShuffleRepartitioner {

// We now have partition ids for every input row, map that to partition starts
// and partition indices to eventually right these rows to partition buffers.
map_partition_ids_to_starts_and_indices(
&mut scratch,
*num_output_partitions,
num_rows,
);
scratch
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);

timer.stop();
Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
Expand Down Expand Up @@ -642,11 +596,8 @@ impl MultiPartitionShuffleRepartitioner {

// We now have partition ids for every input row, map that to partition starts
// and partition indices to eventually write these rows to partition buffers.
map_partition_ids_to_starts_and_indices(
&mut scratch,
*num_output_partitions,
num_rows,
);
scratch
.map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows);

timer.stop();
Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
Expand Down Expand Up @@ -923,7 +874,7 @@ struct SinglePartitionShufflePartitioner {
/// Number of rows in the concatenating batches
num_buffered_rows: usize,
/// Metrics for the repartitioner
metrics: ShuffleRepartitionerMetrics,
metrics: ShufflePartitionerMetrics,
/// The configured batch size
batch_size: usize,
}
Expand All @@ -933,7 +884,7 @@ impl SinglePartitionShufflePartitioner {
output_data_path: String,
output_index_path: String,
schema: SchemaRef,
metrics: ShuffleRepartitionerMetrics,
metrics: ShufflePartitionerMetrics,
batch_size: usize,
codec: CompressionCodec,
write_buffer_size: usize,
Expand Down Expand Up @@ -1200,7 +1151,7 @@ impl PartitionWriter {
&mut self,
iter: &mut PartitionedBatchIterator,
runtime: &RuntimeEnv,
metrics: &ShuffleRepartitionerMetrics,
metrics: &ShufflePartitionerMetrics,
write_buffer_size: usize,
) -> Result<usize> {
if let Some(batch) = iter.next() {
Expand Down Expand Up @@ -1393,7 +1344,7 @@ mod test {
}

#[tokio::test]
async fn shuffle_repartitioner_memory() {
async fn shuffle_partitioner_memory() {
let batch = create_batch(900);
assert_eq!(8316, batch.get_array_memory_size()); // Not stable across Arrow versions

Expand All @@ -1407,7 +1358,7 @@ mod test {
"/tmp/index.out".to_string(),
batch.schema(),
CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
ShuffleRepartitionerMetrics::new(&metrics_set, 0),
ShufflePartitionerMetrics::new(&metrics_set, 0),
runtime_env,
1024,
CompressionCodec::Lz4Frame,
Expand Down
Loading