Skip to content

Commit d302350

Browse files
authored
Fix metrics for repartition when preserve_order=true (apache#20924)
## Which issue does this PR close? Found this when working on apache#20875 <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change Metric reporting was previously incorrect for `RepartitionExec` if preserve order was set to true. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Create new metrics before creating `PerPartitionStream` ## Are these changes tested? Yes and confirmed that this fails on main: ``` thread 'repartition::test::test_preserve_order_output_rows_not_double_counted' (12487869) panicked at datafusion/physical-plan/src/repartition/mod.rs:3007:9: assertion `left == right` failed: metrics output_rows (8) should match actual rows collected (4), not double-count left: 8 right: 4 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace ``` <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7d9f6ea commit d302350

1 file changed

Lines changed: 64 additions & 8 deletions

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,12 @@ impl ExecutionPlan for RepartitionExec {
12991299
if preserve_order {
13001300
// Store streams from all the input partitions:
13011301
// Each input partition gets its own spill reader to maintain proper FIFO ordering
1302+
//
1303+
// Pass None for metrics here — these intermediate streams feed into
1304+
// StreamingMerge which is the actual output. Only the merge's
1305+
// BaselineMetrics should contribute to the operator's reported
1306+
// output_rows. Without this, every row would be counted twice
1307+
// (once by PerPartitionStream, once by StreamingMerge).
13021308
let input_streams = rx
13031309
.into_iter()
13041310
.zip(spill_readers)
@@ -1311,7 +1317,7 @@ impl ExecutionPlan for RepartitionExec {
13111317
Arc::clone(&reservation),
13121318
spill_stream,
13131319
1, // Each receiver handles one input partition
1314-
BaselineMetrics::new(&metrics, partition),
1320+
None,
13151321
)) as SendableRecordBatchStream
13161322
})
13171323
.collect::<Vec<_>>();
@@ -1349,7 +1355,7 @@ impl ExecutionPlan for RepartitionExec {
13491355
reservation,
13501356
spill_stream,
13511357
num_input_partitions,
1352-
BaselineMetrics::new(&metrics, partition),
1358+
Some(BaselineMetrics::new(&metrics, partition)),
13531359
)) as SendableRecordBatchStream)
13541360
}
13551361
})
@@ -1862,8 +1868,8 @@ struct PerPartitionStream {
18621868
/// each sending None when complete. We must wait for all of them.
18631869
remaining_partitions: usize,
18641870

1865-
/// Execution metrics
1866-
baseline_metrics: BaselineMetrics,
1871+
/// Execution metrics (None in preserve-order mode where StreamingMerge owns the metrics)
1872+
baseline_metrics: Option<BaselineMetrics>,
18671873
}
18681874

18691875
impl PerPartitionStream {
@@ -1874,7 +1880,7 @@ impl PerPartitionStream {
18741880
reservation: SharedMemoryReservation,
18751881
spill_stream: SendableRecordBatchStream,
18761882
num_input_partitions: usize,
1877-
baseline_metrics: BaselineMetrics,
1883+
baseline_metrics: Option<BaselineMetrics>,
18781884
) -> Self {
18791885
Self {
18801886
schema,
@@ -1893,8 +1899,11 @@ impl PerPartitionStream {
18931899
cx: &mut Context<'_>,
18941900
) -> Poll<Option<Result<RecordBatch>>> {
18951901
use futures::StreamExt;
1896-
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
1897-
let _timer = cloned_time.timer();
1902+
let elapsed = self
1903+
.baseline_metrics
1904+
.as_ref()
1905+
.map(|m| m.elapsed_compute().clone());
1906+
let _timer = elapsed.as_ref().map(|t| t.timer());
18981907

18991908
loop {
19001909
match self.state {
@@ -1980,7 +1989,11 @@ impl Stream for PerPartitionStream {
19801989
cx: &mut Context<'_>,
19811990
) -> Poll<Option<Self::Item>> {
19821991
let poll = self.poll_next_inner(cx);
1983-
self.baseline_metrics.record_poll(poll)
1992+
if let Some(metrics) = &self.baseline_metrics {
1993+
metrics.record_poll(poll)
1994+
} else {
1995+
poll
1996+
}
19841997
}
19851998
}
19861999

@@ -3294,4 +3307,47 @@ mod test {
32943307
let exec = Arc::new(exec);
32953308
Arc::new(TestMemoryExec::update_cache(&exec))
32963309
}
3310+
3311+
/// preserve_order repartition should not double-count
3312+
/// output rows.
3313+
#[tokio::test]
3314+
async fn test_preserve_order_output_rows_not_double_counted() -> Result<()> {
3315+
use datafusion_execution::TaskContext;
3316+
3317+
// Two sorted input partitions, 2 rows each (4 total)
3318+
let batch1 = record_batch!(("c0", UInt32, [1, 3])).unwrap();
3319+
let batch2 = record_batch!(("c0", UInt32, [2, 4])).unwrap();
3320+
let schema = batch1.schema();
3321+
let sort_exprs = sort_exprs(&schema);
3322+
3323+
let input_partitions = vec![vec![batch1], vec![batch2]];
3324+
let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?
3325+
.try_with_sort_information(vec![sort_exprs.clone(), sort_exprs])?;
3326+
let exec = Arc::new(exec);
3327+
let exec = Arc::new(TestMemoryExec::update_cache(&exec));
3328+
3329+
let exec = RepartitionExec::try_new(exec, Partitioning::RoundRobinBatch(3))?
3330+
.with_preserve_order();
3331+
3332+
let task_ctx = Arc::new(TaskContext::default());
3333+
let mut total_rows = 0;
3334+
for i in 0..exec.partitioning().partition_count() {
3335+
let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
3336+
while let Some(result) = stream.next().await {
3337+
total_rows += result?.num_rows();
3338+
}
3339+
}
3340+
3341+
assert_eq!(total_rows, 4, "actual rows collected should be 4");
3342+
3343+
let metrics = exec.metrics().unwrap();
3344+
let reported_output_rows = metrics.output_rows().unwrap();
3345+
assert_eq!(
3346+
reported_output_rows, total_rows,
3347+
"metrics output_rows ({reported_output_rows}) should match \
3348+
actual rows collected ({total_rows}), not double-count"
3349+
);
3350+
3351+
Ok(())
3352+
}
32973353
}

0 commit comments

Comments
 (0)