Skip to content

Commit 5cf8eef

Browse files
authored
fix(aggregate): show aliased expr in explain (#21739)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19685. ## Rationale for this change Physical explain output only showed the alias for aliased aggregates. That made it hard to understand the plan, especially when the aggregate had a filter, explicit RESPECT NULLS, or a custom UDAF display. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Show the full aggregate expression in physical explain for user-written aggregate aliases. - Keep internal aliases like count(*) compact in physical explain. - Replace the old hidden metadata approach with an explicit is_internal flag on Alias. - Preserve that flag through planner rewrites, tree rewrites, and proto round-trip. - Add tests for aliased aggregate explain output, including: - normal aliased aggregates - quoted aliases - explicit RESPECT NULLS - custom human display - count(*) - nested internal alias display - Add an upgrade note for the public Alias API change. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? Yes. - Physical explain output is clearer for aliased aggregate expressions. - Alias now has a new is_internal field. This is a public API change for users who build or pattern match Alias directly. The upgrade guide has been updated with the needed changes. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent b17847d commit 5cf8eef

29 files changed

Lines changed: 1339 additions & 185 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 288 additions & 77 deletions
Large diffs are not rendered by default.

datafusion/core/tests/dataframe/mod.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3022,20 +3022,20 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
30223022
assert_snapshot!(
30233023
pretty_format_batches(&df_results).unwrap(),
30243024
@r"
3025-
+---------------+----------------------------------------------------------------------------+
3026-
| plan_type | plan |
3027-
+---------------+----------------------------------------------------------------------------+
3028-
| logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST |
3029-
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
3030-
| | TableScan: t1 projection=[b] |
3031-
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
3032-
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
3033-
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
3034-
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
3035-
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
3036-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3037-
| | |
3038-
+---------------+----------------------------------------------------------------------------+
3025+
+---------------+---------------------------------------------------------------------------------------+
3026+
| plan_type | plan |
3027+
+---------------+---------------------------------------------------------------------------------------+
3028+
| logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST |
3029+
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
3030+
| | TableScan: t1 projection=[b] |
3031+
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
3032+
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
3033+
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(1) as count(*)] |
3034+
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 |
3035+
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(1) as count(*)] |
3036+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
3037+
| | |
3038+
+---------------+---------------------------------------------------------------------------------------+
30393039
"
30403040
);
30413041
Ok(())
@@ -3500,9 +3500,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
35003500
| | HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[a@3, b@4, count(*)@0, __always_true@2] |
35013501
| | CoalescePartitionsExec |
35023502
| | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
3503-
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
3503+
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(1) as count(*)] |
35043504
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
3505-
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
3505+
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(1) as count(*)] |
35063506
| | DataSourceExec: partitions=1, partition_sizes=[1] |
35073507
| | DataSourceExec: partitions=1, partition_sizes=[1] |
35083508
| | |

datafusion/expr/src/expr.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,28 @@ impl Alias {
743743
self.metadata = metadata;
744744
self
745745
}
746+
747+
#[doc(hidden)]
748+
pub fn with_expr(mut self, expr: Expr) -> Self {
749+
self.expr = Box::new(expr);
750+
self
751+
}
752+
753+
#[doc(hidden)]
754+
pub fn try_map_expr(self, f: impl FnOnce(Expr) -> Result<Expr>) -> Result<Expr> {
755+
let Alias {
756+
expr,
757+
relation,
758+
name,
759+
metadata,
760+
} = self;
761+
Ok(Expr::Alias(Alias {
762+
expr: Box::new(f(*expr)?),
763+
relation,
764+
name,
765+
metadata,
766+
}))
767+
}
746768
}
747769

748770
/// Binary expression for [`Expr::BinaryExpr`]
@@ -4155,6 +4177,36 @@ mod test {
41554177
);
41564178
}
41574179

4180+
#[test]
4181+
fn test_unalias_nested_respects_user_metadata() {
4182+
use std::collections::HashMap;
4183+
4184+
let base_expr = col("id");
4185+
4186+
let no_metadata = base_expr.clone().alias("alias");
4187+
assert_eq!(no_metadata.unalias_nested().data, base_expr);
4188+
4189+
let Expr::Alias(empty_metadata_alias) = base_expr.clone().alias("alias") else {
4190+
unreachable!();
4191+
};
4192+
let empty_metadata_alias = Expr::Alias(
4193+
empty_metadata_alias.with_metadata(Some(FieldMetadata::default())),
4194+
);
4195+
assert_eq!(empty_metadata_alias.unalias_nested().data, base_expr);
4196+
4197+
let user_metadata = FieldMetadata::from(HashMap::from([(
4198+
"some_key".to_string(),
4199+
"some_value".to_string(),
4200+
)]));
4201+
4202+
let Expr::Alias(user_alias) = base_expr.clone().alias("alias") else {
4203+
unreachable!();
4204+
};
4205+
let user_alias =
4206+
Expr::Alias(user_alias.with_metadata(Some(user_metadata.clone())));
4207+
assert_eq!(user_alias.clone().unalias_nested().data, user_alias);
4208+
}
4209+
41584210
fn wildcard_options(
41594211
opt_ilike: Option<IlikeSelectItem>,
41604212
opt_exclude: Option<ExcludeSelectItem>,

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,16 @@ impl NamePreserver {
340340

341341
pub fn save(&self, expr: &Expr) -> SavedName {
342342
if self.use_alias {
343-
let (relation, name) = expr.qualified_name();
344-
SavedName::Saved { relation, name }
343+
match expr {
344+
Expr::Alias(alias) => SavedName::Saved {
345+
relation: alias.relation.clone(),
346+
name: alias.name.clone(),
347+
},
348+
_ => {
349+
let (relation, name) = expr.qualified_name();
350+
SavedName::Saved { relation, name }
351+
}
352+
}
345353
} else {
346354
SavedName::None
347355
}

datafusion/expr/src/expr_schema.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,27 @@ mod tests {
10731073
assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
10741074
}
10751075

1076+
#[test]
1077+
fn test_alias_metadata_is_preserved_in_field_metadata() {
1078+
let schema = MockExprSchema::new().with_data_type(DataType::Int32);
1079+
let alias_metadata = FieldMetadata::from(HashMap::from([(
1080+
"some_key".to_string(),
1081+
"some_value".to_string(),
1082+
)]));
1083+
1084+
let Expr::Alias(alias) = col("foo").alias("alias") else {
1085+
unreachable!();
1086+
};
1087+
let expr = Expr::Alias(alias.with_metadata(Some(alias_metadata.clone())));
1088+
1089+
let field = expr.to_field(&schema).unwrap().1;
1090+
assert_eq!(
1091+
field.metadata().get("some_key"),
1092+
Some(&"some_value".to_string())
1093+
);
1094+
assert_eq!(expr.metadata(&schema).unwrap(), alias_metadata);
1095+
}
1096+
10761097
#[test]
10771098
fn test_expr_placeholder() {
10781099
let schema = MockExprSchema::new();

datafusion/optimizer/src/decorrelate_lateral_join.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,12 @@ fn rewrite_internal(join: Join) -> Result<Transformed<LogicalPlan>> {
260260
)],
261261
else_expr: Some(Box::new(col)),
262262
});
263-
proj_exprs.push(case_expr.alias_qualified(qualifier.cloned(), name));
263+
proj_exprs.push(Expr::Alias(expr::Alias {
264+
expr: Box::new(case_expr),
265+
relation: qualifier.cloned(),
266+
name: name.to_string(),
267+
metadata: None,
268+
}));
264269
continue;
265270
}
266271
proj_exprs.push(col);

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -605,9 +605,12 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
605605
if metadata.is_none() && expr.schema_name().to_string() == name {
606606
expr
607607
} else {
608-
Expr::Alias(
609-
Alias::new(expr, relation, name).with_metadata(metadata),
610-
)
608+
Expr::Alias(Alias {
609+
expr: Box::new(expr),
610+
relation,
611+
name,
612+
metadata,
613+
})
611614
}
612615
})
613616
}),

0 commit comments

Comments
 (0)