Skip to content

Commit 2274f5e

Browse files
respond to review
1 parent af6737e commit 2274f5e

3 files changed

Lines changed: 20 additions & 5 deletions

File tree

quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ use datafusion::error::Result as DFResult;
2222
use datafusion::physical_expr::expressions::Column;
2323
use datafusion::physical_expr::{LexOrdering, Partitioning};
2424
use datafusion::physical_optimizer::PhysicalOptimizerRule;
25-
use datafusion::physical_plan::ExecutionPlan;
2625
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
2726
use datafusion::physical_plan::repartition::RepartitionExec;
2827
use datafusion::physical_plan::sorts::sort::SortExec;
2928
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
29+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3030
use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN;
3131

3232
/// Replaces the inner sorted-series hash repartition in rollup plans with a
@@ -93,10 +93,17 @@ fn rewrite_sorted_series_final_aggregate(
9393
}
9494

9595
let ordering = sort.expr().clone();
96-
let partition_sort: Arc<dyn ExecutionPlan> = Arc::new(
97-
SortExec::new(ordering.clone(), Arc::clone(repartition.input()))
98-
.with_preserve_partitioning(true),
99-
);
96+
let repartition_input = Arc::clone(repartition.input());
97+
let partition_sort: Arc<dyn ExecutionPlan> = if repartition_input
98+
.equivalence_properties()
99+
.ordering_satisfy(ordering.clone())?
100+
{
101+
repartition_input
102+
} else {
103+
Arc::new(
104+
SortExec::new(ordering.clone(), repartition_input).with_preserve_partitioning(true),
105+
)
106+
};
100107
let merged: Arc<dyn ExecutionPlan> =
101108
Arc::new(SortPreservingMergeExec::new(ordering, partition_sort));
102109

quickwit/quickwit-datafusion/tests/distributed.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ async fn test_distributed_tasks_not_shuffles() {
203203
plan_str.contains("SortPreservingMergeExec: [sorted_series"),
204204
"expected sorted-series worker streams to be merge-sorted:\n{plan_str}"
205205
);
206+
assert!(
207+
!plan_str.contains("SortExec: expr=[sorted_series"),
208+
"expected sorted-series worker streams to use preserved ordering without an explicit sort:\n{plan_str}"
209+
);
206210
assert!(
207211
!plan_str.contains("NetworkShuffleExec"),
208212
"expected no network shuffle for sorted-series finalization:\n{plan_str}"

quickwit/quickwit-datafusion/tests/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,10 @@ async fn test_rollup_nested_aggregation() {
663663
plan_str.contains("SortPreservingMergeExec: [sorted_series"),
664664
"expected sorted_series partials to be merge-sorted before finalization:\n{plan_str}"
665665
);
666+
assert!(
667+
!plan_str.contains("SortExec: expr=[sorted_series"),
668+
"expected sorted_series partials to use preserved ordering without an explicit sort:\n{plan_str}"
669+
);
666670
assert!(
667671
!plan_str.contains("Hash([sorted_series"),
668672
"expected no hash repartition by sorted_series:\n{plan_str}"

0 commit comments

Comments
 (0)