Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
SendableRecordBatchStream, Statistics, check_if_same_properties,
};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::utils::collect_columns;
use parking_lot::Mutex;
use std::collections::HashSet;
Expand Down Expand Up @@ -713,6 +714,37 @@ impl AggregateExec {
&self.cache
}

/// Clone this exec with a new output schema that only renames fields.
///
/// The returned node is identical to `self` except that its output schema
/// (and everything derived from it: equivalence properties and hash
/// partitioning column names) uses the field names of `schema`. Metrics are
/// reset, since this is a brand-new plan node.
pub fn with_output_schema(&self, schema: SchemaRef) -> Result<Self> {
    // Rewrite the cached equivalence properties so ordering/equality info
    // refers to the renamed fields.
    let renamed_eq_properties = self
        .cache
        .eq_properties
        .clone()
        .with_new_schema(Arc::clone(&schema))?;
    // Hash partitioning expressions carry column names; remap them by ordinal.
    let renamed_partitioning = remap_partitioning(&self.cache.partitioning, &schema);
    let new_cache = PlanProperties::new(
        renamed_eq_properties,
        renamed_partitioning,
        self.cache.emission_type,
        self.cache.boundedness,
    );
    Ok(Self {
        schema,
        cache: Arc::new(new_cache),
        mode: self.mode,
        group_by: Arc::clone(&self.group_by),
        aggr_expr: Arc::clone(&self.aggr_expr),
        filter_expr: Arc::clone(&self.filter_expr),
        limit_options: self.limit_options,
        input: Arc::clone(&self.input),
        input_schema: Arc::clone(&self.input_schema),
        input_order_mode: self.input_order_mode.clone(),
        required_input_ordering: self.required_input_ordering.clone(),
        dynamic_filter: self.dynamic_filter.clone(),
        // Fresh node, fresh metrics.
        metrics: ExecutionPlanMetricsSet::new(),
    })
}

/// Create a new hash aggregate execution plan
pub fn try_new(
mode: AggregateMode,
Expand Down Expand Up @@ -1598,6 +1630,35 @@ impl ExecutionPlan for AggregateExec {
}
}

/// Remap `Column` references in a [`Partitioning`] to use new schema field names.
///
/// Only *top-level* `Column` expressions inside `Partitioning::Hash` are
/// rewritten (matched by ordinal position in `new_schema`); expressions that
/// merely contain columns, columns whose index falls outside `new_schema`,
/// and non-hash partitioning schemes are returned unchanged.
fn remap_partitioning(
    partitioning: &Partitioning,
    new_schema: &SchemaRef,
) -> Partitioning {
    match partitioning {
        Partitioning::Hash(exprs, n) => {
            let remapped: Vec<Arc<dyn PhysicalExpr>> = exprs
                .iter()
                .map(|expr| {
                    // Rebind a bare column to the (possibly renamed) field at
                    // the same ordinal; `get` makes the bounds check explicit
                    // and panic-free.
                    expr.as_any()
                        .downcast_ref::<Column>()
                        .and_then(|col| {
                            new_schema
                                .fields()
                                .get(col.index())
                                .map(|field| (col.index(), field))
                        })
                        .map(|(idx, field)| {
                            Arc::new(Column::new(field.name(), idx))
                                as Arc<dyn PhysicalExpr>
                        })
                        .unwrap_or_else(|| Arc::clone(expr))
                })
                .collect();
            Partitioning::Hash(remapped, *n)
        }
        other => other.clone(),
    }
}

/// Creates the output schema for an [`AggregateExec`] containing the group by columns followed
/// by the aggregate columns.
fn create_schema(
Expand Down
60 changes: 59 additions & 1 deletion datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
};
use crate::aggregates::{AggregateExec, AggregateMode};
use crate::column_rewriter::PhysicalColumnRewriter;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
Expand All @@ -39,7 +40,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Expand Down Expand Up @@ -733,6 +734,10 @@ pub fn remove_unnecessary_projections(
if is_projection_removable(projection) {
return Ok(Transformed::yes(Arc::clone(projection.input())));
}
// Try to absorb rename-only projections into the child:
if let Some(new_child) = try_absorb_rename_projection(projection)? {
return Ok(Transformed::yes(new_child));
}
// If it does, check if we can push it under its child(ren):
projection
.input()
Expand All @@ -757,6 +762,59 @@ fn is_projection_removable(projection: &ProjectionExec) -> bool {
}) && exprs.len() == projection.input().schema().fields().len()
}

/// If a projection only renames columns, try to absorb the rename into
/// the child operator, eliminating the projection entirely.
///
/// Currently only `AggregateExec` children in a mode that produces final
/// output (`Final`, `FinalPartitioned`, `Single`, `SinglePartitioned`) can
/// absorb a rename. Returns `Ok(None)` when the projection is not a pure
/// rename or the child cannot absorb it.
fn try_absorb_rename_projection(
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let input = projection.input();

    // Check the child first: if it cannot absorb a rename there is no point
    // inspecting the projection or building a renamed schema.
    let Some(agg) = input.downcast_ref::<AggregateExec>() else {
        return Ok(None);
    };
    if !matches!(
        agg.mode(),
        AggregateMode::Final
            | AggregateMode::FinalPartitioned
            | AggregateMode::Single
            | AggregateMode::SinglePartitioned
    ) {
        return Ok(None);
    }

    let exprs = projection.expr();
    let old_schema = input.schema();

    // A pure rename projects every input column, in order, as a bare `Column`
    // (aliases may differ — that is the rename being absorbed).
    let is_rename_only = exprs.len() == old_schema.fields().len()
        && exprs.iter().enumerate().all(|(idx, proj_expr)| {
            proj_expr
                .expr
                .as_any()
                .downcast_ref::<Column>()
                .is_some_and(|col| col.index() == idx)
        });
    if !is_rename_only {
        return Ok(None);
    }

    // Rebuild the child's output schema with the projection's aliases,
    // keeping each field's type/nullability and the schema metadata intact.
    let new_fields: Vec<Arc<arrow::datatypes::Field>> = exprs
        .iter()
        .enumerate()
        .map(|(idx, proj_expr)| {
            let field = old_schema.field(idx);
            Arc::new(field.as_ref().clone().with_name(&proj_expr.alias))
        })
        .collect();
    let new_schema = Arc::new(Schema::new_with_metadata(
        new_fields,
        old_schema.metadata().clone(),
    ));

    let new_agg: AggregateExec = agg.with_output_schema(new_schema)?;
    Ok(Some(Arc::new(new_agg)))
}

/// Given the expression set of a projection, checks if the projection causes
/// any renaming or constructs a non-`Column` physical expression.
pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
Expand Down
33 changes: 15 additions & 18 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ logical_plan
02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true


query TT
Expand All @@ -61,12 +60,11 @@ logical_plan
02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

query TT
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
Expand All @@ -78,12 +76,11 @@ logical_plan
02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(101)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

query II
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
Expand Down
26 changes: 12 additions & 14 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1646,14 +1646,13 @@ logical_plan
03)----Aggregate: groupBy=[[t.c AS alias1]], aggr=[[]]
04)------TableScan: t projection=[c]
physical_plan
01)ProjectionExec: expr=[median(alias1)@0 as median(DISTINCT t.c)]
02)--AggregateExec: mode=Final, gby=[], aggr=[median(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[median(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
07)------------AggregateExec: mode=Partial, gby=[c@0 as alias1], aggr=[]
08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
01)AggregateExec: mode=Final, gby=[], aggr=[median(alias1)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[median(alias1)]
04)------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
05)--------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
06)----------AggregateExec: mode=Partial, gby=[c@0 as alias1], aggr=[]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
drop table t;
Expand Down Expand Up @@ -7898,12 +7897,11 @@ logical_plan
03)----Aggregate: groupBy=[[]], aggr=[[count(aggregate_test_100.c5)]]
04)------TableScan: aggregate_test_100 projection=[c5]
physical_plan
01)ProjectionExec: expr=[count(aggregate_test_100.c5)@0 as count_c5]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(aggregate_test_100.c5)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true
01)AggregateExec: mode=Final, gby=[], aggr=[count(aggregate_test_100.c5)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true

statement count 0
drop table aggregate_test_100;
Expand Down
25 changes: 11 additions & 14 deletions datafusion/sqllogictest/test_files/aggregate_repartition.slt
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,11 @@ logical_plan
02)--Aggregate: groupBy=[[dim_csv.env]], aggr=[[count(Int64(1))]]
03)----TableScan: dim_csv projection=[env]
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
04)------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
01)AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
02)--RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
03)----AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true

# Test 2: EXPLAIN query for Parquet table with GROUP BY

Expand All @@ -92,11 +91,10 @@ logical_plan
02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
03)----TableScan: dim_parquet projection=[env]
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
04)------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
01)AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
02)--RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
03)----AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet

# Verify the queries actually work and return the same results
query TI rowsort
Expand Down Expand Up @@ -125,9 +123,8 @@ logical_plan
02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
03)----TableScan: dim_parquet projection=[env]
physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
01)AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet

# Config reset

Expand Down
17 changes: 8 additions & 9 deletions datafusion/sqllogictest/test_files/aggregates_simplify.slt
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,14 @@ logical_plan
04)------Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1
05)--------TableScan: sum_simplify_t projection=[column1]
physical_plan
01)ProjectionExec: expr=[sum(alias1)@0 as sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2)@1 as sum(sum_simplify_t.column1 + Int64(1))]
02)--AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2]
06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2]
08)--------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1]
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
01)AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)]
04)------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2]
05)--------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
06)----------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2]
07)------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1]
08)--------------DataSourceExec: partitions=1, partition_sizes=[1]

# FILTER clauses with different aggregate arguments
query II
Expand Down
Loading
Loading