Skip to content

Commit 2a2a060

Browse files
Dandandanclaude
andauthored
Resolve MIN/MAX from Parquet metadata for Single-mode aggregates and CAST projections (#21651)
## Which issue does this PR close? Related to improving ClickBench performance (metadata-only query resolution) ## Rationale for this change ClickBench Q6 (`SELECT MIN("EventDate"), MAX("EventDate") FROM hits`) was doing a full column scan despite the answer being available in Parquet row group statistics. Two issues prevented the `AggregateStatistics` optimizer from firing: 1. **`take_optimizable` missed `Single` mode** — it only matched the `Final → Partial` pair, not single-partition scans. 2. **Statistics lost through CAST projections** — `project_statistics` returned `unknown` for any non-Column/Literal expression, discarding Parquet min/max through casts like `CAST(CAST(EventDate AS Int32) AS Date32)`. This now avoids scanning any columns, going from ~6ms to ~1.5ms ``` │ QQuery 6 │ 5.12 / 6.29 ±0.83 / 7.65 ms │ 1.26 / 1.43 ±0.26 / 1.93 ms │ +4.39x faster │ ``` ## What changes are included in this PR? - **`aggregate_statistics.rs`**: `take_optimizable` now also matches `Single`/`SinglePartitioned` aggregates. - **`projection.rs`**: Added `project_column_statistics_through_expr()` which propagates min/max statistics through `CastExpr`. Result: Q6 now resolves entirely from Parquet metadata (zero I/O). ## Are these changes tested? Yes — existing tests pass, ClickBench sqllogictest updated with new expected plan for Q6. ## Are there any user-facing changes? Scalar `MIN`/`MAX` aggregates over CAST projections now resolve from file metadata when statistics are available, avoiding unnecessary I/O. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9a3d96a commit 2a2a060

4 files changed

Lines changed: 137 additions & 21 deletions

File tree

datafusion/physical-expr/src/projection.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::ops::Deref;
2121
use std::sync::Arc;
2222

2323
use crate::PhysicalExpr;
24-
use crate::expressions::{Column, Literal};
24+
use crate::expressions::{CastExpr, Column, Literal};
2525
use crate::scalar_function::ScalarFunctionExpr;
2626
use crate::utils::collect_columns;
2727

@@ -714,9 +714,10 @@ impl ProjectionExprs {
714714
}
715715
}
716716
} else {
717-
// TODO stats: estimate more statistics from expressions
718-
// (expressions should compute their statistics themselves)
719-
ColumnStatistics::new_unknown()
717+
project_column_statistics_through_expr(
718+
expr.as_ref(),
719+
&stats.column_statistics,
720+
)
720721
};
721722
column_statistics.push(col_stats);
722723
}
@@ -726,6 +727,39 @@ impl ProjectionExprs {
726727
}
727728
}
728729

730+
/// Propagate column statistics through CAST projections. Other expressions
731+
/// return unknown — generalizing via [`PhysicalExpr::evaluate_bounds`] is
732+
/// unsafe for aggregate folding since many impls (e.g. `sin`) return a fixed
733+
/// envelope rather than tight bounds on the actual inputs.
734+
fn project_column_statistics_through_expr(
735+
expr: &dyn PhysicalExpr,
736+
column_stats: &[ColumnStatistics],
737+
) -> ColumnStatistics {
738+
if let Some(col) = expr.downcast_ref::<Column>() {
739+
return column_stats[col.index()].clone();
740+
}
741+
let Some(cast_expr) = expr.downcast_ref::<CastExpr>() else {
742+
return ColumnStatistics::new_unknown();
743+
};
744+
let inner_stats =
745+
project_column_statistics_through_expr(cast_expr.expr.as_ref(), column_stats);
746+
let target_type = cast_expr.cast_type();
747+
ColumnStatistics {
748+
min_value: inner_stats
749+
.min_value
750+
.cast_to(target_type)
751+
.unwrap_or(Precision::Absent),
752+
max_value: inner_stats
753+
.max_value
754+
.cast_to(target_type)
755+
.unwrap_or(Precision::Absent),
756+
null_count: inner_stats.null_count,
757+
distinct_count: inner_stats.distinct_count,
758+
sum_value: Precision::Absent,
759+
byte_size: Precision::Absent,
760+
}
761+
}
762+
729763
impl<'a> IntoIterator for &'a ProjectionExprs {
730764
type Item = &'a ProjectionExpr;
731765
type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
@@ -1256,7 +1290,7 @@ pub(crate) mod tests {
12561290

12571291
use super::*;
12581292
use crate::equivalence::{EquivalenceProperties, convert_to_orderings};
1259-
use crate::expressions::{BinaryExpr, col};
1293+
use crate::expressions::{BinaryExpr, CastExpr, col};
12601294
use crate::utils::tests::TestScalarUDF;
12611295
use crate::{PhysicalExprRef, ScalarFunctionExpr};
12621296

@@ -2791,6 +2825,38 @@ pub(crate) mod tests {
27912825
Ok(())
27922826
}
27932827

2828+
#[test]
2829+
fn test_project_statistics_with_cast() -> Result<()> {
2830+
let input_stats = get_stats();
2831+
let input_schema = get_schema();
2832+
2833+
// SELECT CAST(col0 AS Int32) AS casted
2834+
let projection = ProjectionExprs::new(vec![ProjectionExpr {
2835+
expr: Arc::new(CastExpr::new(
2836+
Arc::new(Column::new("col0", 0)),
2837+
DataType::Int32,
2838+
None,
2839+
)),
2840+
alias: "casted".to_string(),
2841+
}]);
2842+
2843+
let output_stats = projection.project_statistics(
2844+
input_stats,
2845+
&projection.project_schema(&input_schema)?,
2846+
)?;
2847+
2848+
assert_eq!(
2849+
output_stats.column_statistics[0].min_value,
2850+
Precision::Exact(ScalarValue::Int32(Some(-4)))
2851+
);
2852+
assert_eq!(
2853+
output_stats.column_statistics[0].max_value,
2854+
Precision::Exact(ScalarValue::Int32(Some(21)))
2855+
);
2856+
2857+
Ok(())
2858+
}
2859+
27942860
#[test]
27952861
fn test_project_statistics_primitive_width_only() -> Result<()> {
27962862
let input_stats = get_stats();

datafusion/physical-optimizer/src/aggregate_statistics.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use datafusion_common::Result;
2020
use datafusion_common::config::ConfigOptions;
2121
use datafusion_common::scalar::ScalarValue;
2222
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23-
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateInputMode};
23+
use datafusion_physical_plan::aggregates::{
24+
AggregateExec, AggregateInputMode, AggregateMode,
25+
};
2426
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
2527
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
2628
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
@@ -49,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
4951
plan: Arc<dyn ExecutionPlan>,
5052
config: &ConfigOptions,
5153
) -> Result<Arc<dyn ExecutionPlan>> {
52-
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
54+
if let Some(partial_agg_exec) = take_optimizable(&plan) {
5355
let partial_agg_exec = partial_agg_exec
5456
.downcast_ref::<AggregateExec>()
5557
.expect("take_optimizable() ensures that this is a AggregateExec");
@@ -106,19 +108,26 @@ impl PhysicalOptimizerRule for AggregateStatistics {
106108
}
107109
}
108110

109-
/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:
110-
/// - its child (with possible intermediate layers) is a partial `AggregateExec` node
111-
/// - they both have no grouping expression
112-
///
113-
/// If this is the case, return a ref to the partial `AggregateExec`, else `None`.
114-
/// We would have preferred to return a casted ref to AggregateExec but the recursion requires
115-
/// the `ExecutionPlan.children()` method that returns an owned reference.
116-
fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
117-
if let Some(final_agg_exec) = node.downcast_ref::<AggregateExec>()
118-
&& final_agg_exec.mode().input_mode() == AggregateInputMode::Partial
119-
&& final_agg_exec.group_expr().is_empty()
111+
/// Returns an `AggregateExec` whose statistics can replace the aggregate with
112+
/// literal values: either a `Single`/`SinglePartitioned` aggregate, or a
113+
/// `Final` aggregate wrapping a `Partial`. Must have no GROUP BY and no
114+
/// filters.
115+
fn take_optimizable(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
116+
let agg_exec = plan.downcast_ref::<AggregateExec>()?;
117+
118+
if matches!(
119+
agg_exec.mode(),
120+
AggregateMode::Single | AggregateMode::SinglePartitioned
121+
) && agg_exec.group_expr().is_empty()
122+
&& agg_exec.filter_expr().iter().all(|e| e.is_none())
123+
{
124+
return Some(Arc::clone(plan));
125+
}
126+
127+
if agg_exec.mode().input_mode() == AggregateInputMode::Partial
128+
&& agg_exec.group_expr().is_empty()
120129
{
121-
let mut child = Arc::clone(final_agg_exec.input());
130+
let mut child = Arc::clone(agg_exec.input());
122131
loop {
123132
if let Some(partial_agg_exec) = child.downcast_ref::<AggregateExec>()
124133
&& partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8977,3 +8977,44 @@ GROUP BY id ORDER BY id;
89778977

89788978
statement ok
89798979
DROP TABLE first_last_value_str_tests;
8980+
8981+
# MIN/MAX over a CAST projection folds from parquet statistics via the
8982+
# aggregate-statistics optimizer. Non-CAST projections (multi-column or
8983+
# self-referenced expressions) must keep returning correct results — guard
8984+
# against future regressions if the optimizer is broadened.
8985+
statement ok
8986+
SET datafusion.execution.target_partitions = 1;
8987+
8988+
statement ok
8989+
CREATE EXTERNAL TABLE hits_raw
8990+
STORED AS PARQUET
8991+
LOCATION '../core/tests/data/clickbench_hits_10.parquet';
8992+
8993+
query II
8994+
SELECT MIN(CAST("EventDate" AS BIGINT)), MAX(CAST("EventDate" AS BIGINT)) FROM hits_raw;
8995+
----
8996+
15901 15901
8997+
8998+
query II
8999+
SELECT MIN(delta), MAX(delta)
9000+
FROM (
9001+
SELECT "UserID" - CAST("ClientIP" AS BIGINT) AS delta
9002+
FROM hits_raw
9003+
);
9004+
----
9005+
-2461439044872611287 7418527518698834918
9006+
9007+
query II
9008+
SELECT MIN(zero), MAX(zero)
9009+
FROM (
9010+
SELECT "UserID" - "UserID" AS zero
9011+
FROM hits_raw
9012+
);
9013+
----
9014+
0 0
9015+
9016+
statement ok
9017+
SET datafusion.execution.target_partitions = 4;
9018+
9019+
statement ok
9020+
DROP TABLE hits_raw;

datafusion/sqllogictest/test_files/clickbench.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ logical_plan
192192
03)----Projection: CAST(CAST(hits_raw.EventDate AS Int32) AS Date32) AS EventDate
193193
04)------TableScan: hits_raw projection=[EventDate]
194194
physical_plan
195-
01)AggregateExec: mode=Single, gby=[], aggr=[min(hits.EventDate), max(hits.EventDate)]
196-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(CAST(EventDate@5 AS Int32) AS Date32) as EventDate], file_type=parquet
195+
01)ProjectionExec: expr=[2013-07-15 as min(hits.EventDate), 2013-07-15 as max(hits.EventDate)]
196+
02)--PlaceholderRowExec
197197

198198
query DD
199199
SELECT MIN("EventDate"), MAX("EventDate") FROM hits;

0 commit comments

Comments
 (0)