Skip to content

Commit ad46fde

Browse files
committed
fixes
1 parent 57efa5c commit ad46fde

9 files changed

Lines changed: 1390 additions & 1261 deletions

File tree

Cargo.lock

Lines changed: 1353 additions & 1230 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ impl StatisticsCacheFunc {
639639
}
640640

641641
impl TableFunctionImpl for StatisticsCacheFunc {
642-
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
642+
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
643643
if !exprs.is_empty() {
644644
return plan_err!("statistics_cache should have no arguments");
645645
}
@@ -774,7 +774,7 @@ impl ListFilesCacheFunc {
774774
}
775775

776776
impl TableFunctionImpl for ListFilesCacheFunc {
777-
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
777+
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
778778
if !exprs.is_empty() {
779779
return plan_err!("list_files_cache should have no arguments");
780780
}

datafusion/core/src/physical_planner.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessar
8787
use datafusion_expr::utils::{expr_to_columns, split_conjunction};
8888
use datafusion_expr::{
8989
Analyze, BinaryExpr, Cast, DescribeTable, DmlStatement, Explain, ExplainFormat,
90-
Extension, FetchType, Filter, JoinType, LogicalPlanBuilder, Operator, RecursiveQuery,
90+
Extension, FetchType, Filter, JoinType, LogicalPlanBuilder, RecursiveQuery,
9191
SkipType, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
9292
};
9393
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
@@ -2212,7 +2212,8 @@ fn extract_dml_filters(
22122212
| LogicalPlan::Repartition(_)
22132213
| LogicalPlan::Aggregate(_)
22142214
| LogicalPlan::Window(_)
2215-
| LogicalPlan::Subquery(_) => {
2215+
| LogicalPlan::Subquery(_)
2216+
| LogicalPlan::Pivot(_) => {
22162217
// Filter information may appear in child nodes; continue traversal
22172218
// to extract filters from Filter/TableScan nodes deeper in the plan
22182219
}
@@ -2567,26 +2568,26 @@ pub fn create_aggregate_expr_and_maybe_filter(
25672568
pub fn transform_pivot_to_aggregate(
25682569
input: Arc<LogicalPlan>,
25692570
aggregate_expr: &Expr,
2570-
pivot_column: &datafusion_common::Column,
2571+
pivot_column: &Column,
25712572
pivot_values: Vec<ScalarValue>,
25722573
default_on_null_expr: Option<&Expr>,
25732574
) -> Result<LogicalPlan> {
25742575
let df_schema = input.schema();
25752576

2576-
let all_columns: Vec<datafusion_common::Column> = df_schema.columns();
2577+
let all_columns: Vec<Column> = df_schema.columns();
25772578

25782579
// Filter to include only columns we want for GROUP BY
25792580
// (exclude pivot column and aggregate expression columns)
25802581
let group_by_columns: Vec<Expr> = all_columns
25812582
.into_iter()
2582-
.filter(|col: &datafusion_common::Column| {
2583+
.filter(|col: &Column| {
25832584
col.name != pivot_column.name
25842585
&& !aggregate_expr
25852586
.column_refs()
25862587
.iter()
25872588
.any(|agg_col| agg_col.name == col.name)
25882589
})
2589-
.map(|col: datafusion_common::Column| Expr::Column(col))
2590+
.map(|col: Column| Expr::Column(col))
25902591
.collect();
25912592

25922593
let builder = LogicalPlanBuilder::from(Arc::unwrap_or_clone(Arc::clone(&input)));
@@ -2660,7 +2661,7 @@ pub fn transform_pivot_to_aggregate(
26602661
.any(|v| field.name() == v.to_string().trim_matches('\''))
26612662
{
26622663
projection_exprs.push(Expr::Column(
2663-
datafusion_common::Column::from_name(field.name()),
2664+
Column::from_name(field.name()),
26642665
));
26652666
}
26662667
}
@@ -2669,7 +2670,7 @@ pub fn transform_pivot_to_aggregate(
26692670
for value in &pivot_values {
26702671
let field_name = value.to_string().trim_matches('\'').to_string();
26712672
let aggregate_col =
2672-
Expr::Column(datafusion_common::Column::from_name(&field_name));
2673+
Expr::Column(Column::from_name(&field_name));
26732674

26742675
// Create COALESCE expression using CASE: CASE WHEN col IS NULL THEN default_value ELSE col END
26752676
let coalesce_expr = Expr::Case(datafusion_expr::expr::Case {
@@ -3103,7 +3104,7 @@ impl DefaultPhysicalPlanner {
31033104
{
31043105
if !physical_exprs.iter().any(|(_, name)| name == field.name()) {
31053106
all_exprs.push((
3106-
Arc::new(Column::new(field.name(), i))
3107+
Arc::new(datafusion_physical_expr::expressions::Column::new(field.name(), i))
31073108
as Arc<dyn PhysicalExpr>,
31083109
field.name().clone(),
31093110
));

datafusion/functions/src/datetime/to_date.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ impl ScalarUDFImpl for ToDateFunc {
174174
}
175175
Float16
176176
| Float32
177-
| Float64
178177
| Decimal32(_, _)
179178
| Decimal64(_, _)
180179
| Decimal128(_, _)

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2016,7 +2016,7 @@ mod test {
20162016
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?);
20172017
assert_type_coercion_error(
20182018
plan,
2019-
"Cannot infer common argument type for comparison operation Int64 IS DISTINCT FROM Boolean",
2019+
"Cannot infer common argument type for comparison operation Float64 IS DISTINCT FROM Boolean",
20202020
)?;
20212021

20222022
// is not true

datafusion/optimizer/src/decorrelate_predicate_subquery.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
8282
.into_iter()
8383
.partition(has_subquery);
8484

85-
if with_subqueries.is_empty() {
86-
return internal_err!(
87-
"can not find expected subqueries in DecorrelatePredicateSubquery"
88-
);
89-
}
90-
9185
assert_or_internal_err!(
9286
!with_subqueries.is_empty(),
9387
"can not find expected subqueries in DecorrelatePredicateSubquery"
@@ -145,6 +139,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
145139
join.join_type,
146140
join.join_constraint,
147141
join.null_equality,
142+
join.null_aware,
148143
)?;
149144
return Ok(Transformed::yes(LogicalPlan::Join(new_join)));
150145
}
@@ -546,8 +541,12 @@ fn build_join(
546541

547542
right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
548543

544+
// Deduplicate by schema index to avoid duplicate projection expressions
545+
// when the same column is referenced both qualified and unqualified
546+
let mut seen_indices = std::collections::HashSet::new();
549547
let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
550548
.into_iter()
549+
.filter(|(idx, _)| seen_indices.insert(*idx))
551550
.map(|(_, c)| Expr::Column(c))
552551
.collect();
553552

@@ -839,9 +838,10 @@ mod tests {
839838
LeftMark Join: Filter: a = __correlated_sq_1.ua [a:Int32;N, mark:Boolean]
840839
Projection: column1 AS a [a:Int32;N]
841840
Values: (Int32(1)), (Int32(2)) [column1:Int32;N]
842-
SubqueryAlias: __correlated_sq_1 [ua:Int32;N]
843-
Projection: column1 AS ua [ua:Int32;N]
844-
Values: (Int32(2)) [column1:Int32;N]
841+
Projection: __correlated_sq_1.ua [ua:Int32;N]
842+
SubqueryAlias: __correlated_sq_1 [ua:Int32;N]
843+
Projection: column1 AS ua [ua:Int32;N]
844+
Values: (Int32(2)) [column1:Int32;N]
845845
"
846846
)
847847
}

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -981,21 +981,22 @@ impl AsLogicalPlan for LogicalPlanNode {
981981
dml_node.dml_type().into(),
982982
Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
983983
)))
984+
}
984985
LogicalPlanType::Pivot(pivot) => {
985986
let aggregate_expr = pivot
986987
.aggregate_expr
987988
.as_ref()
988989
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
989990
.transpose()?
990991
.ok_or_else(|| {
991-
DataFusionError::Internal("aggregate_expr required".to_string())
992+
internal_datafusion_err!("aggregate_expr required")
992993
})?;
993994
let pivot_column = pivot
994995
.pivot_column
995996
.as_ref()
996997
.map(|col| col.clone().into())
997998
.ok_or_else(|| {
998-
DataFusionError::Internal("pivot_column required".to_string())
999+
internal_datafusion_err!("pivot_column required")
9991000
})?;
10001001
let pivot_values = pivot
10011002
.pivot_values

datafusion/sql/src/parser.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,15 @@ impl<'a> DFParser<'a> {
549549
Token::Word(w) => {
550550
match w.keyword {
551551
Keyword::CREATE => {
552-
if let Token::Word(w) = self.parser.peek_nth_token(2).token {
553-
// use native parser for CREATE EXTERNAL VOLUME
554-
if w.keyword == Keyword::VOLUME {
555-
return self.parse_and_handle_statement();
552+
// use native parser for CREATE EXTERNAL VOLUME
553+
// and CREATE OR REPLACE EXTERNAL VOLUME
554+
for offset in [2, 4] {
555+
if let Token::Word(w) =
556+
self.parser.peek_nth_token(offset).token
557+
{
558+
if w.keyword == Keyword::VOLUME {
559+
return self.parse_and_handle_statement();
560+
}
556561
}
557562
}
558563
self.parser.next_token(); // CREATE

datafusion/sql/src/relation/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
338338
return plan_err!("PIVOT value column is required");
339339
}
340340

341-
let column_name = value_column.last().unwrap().value.clone();
341+
let column_name = value_column.last().unwrap().to_string();
342342
let pivot_column = Column::new(None::<&str>, column_name);
343343

344344
let default_on_null_expr = default_on_null
@@ -454,7 +454,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
454454
let base_plan = self.create_relation(*table, planner_context)?;
455455
let base_schema = base_plan.schema();
456456

457-
let value_column = value.value.clone();
457+
let value_column = value.to_string();
458458
let name_column = name.value.clone();
459459

460460
let mut unpivot_column_indices = Vec::new();
@@ -463,7 +463,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
463463
let mut common_type = None;
464464

465465
for column_ident in &columns {
466-
let column_name = column_ident.value.clone();
466+
let column_name = column_ident.expr.to_string();
467467

468468
let idx = if let Some(i) =
469469
base_schema.index_of_column_by_name(None, &column_name)

0 commit comments

Comments
 (0)