Skip to content

Commit 7a69ada

Browse files
committed
DEBUG more
1 parent be369dc commit 7a69ada

5 files changed

Lines changed: 272 additions & 133 deletions

File tree

datafusion/common/src/dfschema.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ use std::fmt::{Display, Formatter};
2323
use std::hash::Hash;
2424
use std::sync::Arc;
2525

26+
27+
2628
use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
2729
use crate::{
28-
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29-
SchemaError, TableReference,
30+
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies, SchemaError, TableReference
3031
};
3132

3233
use arrow::compute::can_cast_types;
@@ -144,7 +145,6 @@ impl DFSchema {
144145
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145146
metadata: HashMap<String, String>,
146147
) -> Result<Self> {
147-
148148
let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
149149
qualified_fields.clone().into_iter().unzip();
150150

@@ -206,12 +206,25 @@ impl DFSchema {
206206
Ok(dfschema)
207207
}
208208

209+
/// Qualify each field of the schema on the fly:
210+
pub fn with_field_specific_qualified_schema(
211+
&self,
212+
qualifiers: Vec<Option<TableReference>>,
213+
) -> Result<Self> {
214+
if qualifiers.len() != self.fields().len() {
215+
return Err(DataFusionError::Plan("Number of qualifiers must match number of fields".to_string()));
216+
}
217+
Ok(DFSchema {
218+
inner: Arc::clone(&self.inner),
219+
field_qualifiers: qualifiers,
220+
functional_dependencies: self.functional_dependencies.clone(),
221+
})
222+
}
223+
209224
/// Check if the schema have some fields with the same name
210225
pub fn check_names(&self) -> Result<()> {
211226
let mut qualified_names = BTreeSet::new();
212227
let mut unqualified_names = BTreeSet::new();
213-
println!("Fields: {:?}", self.inner.fields().iter().map(|f| f.name()));
214-
println!("Fields qualifiers: {:?}", self.field_qualifiers);
215228

216229
for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
217230
if let Some(qualifier) = qualifier {
@@ -500,7 +513,8 @@ impl DFSchema {
500513

501514
/// Find the field with the given name
502515
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
503-
self.qualified_field_with_unqualified_name(name)
516+
self.qualified_field_with_unqualified_name(name)// y aqui no va a saber
517+
//si es del left or right por eso sale ambigous porque no sabe de qe l;ado elegir porque Expr no tiene el qualifier left o right :/
504518
.map(|(_, field)| field)
505519
}
506520

@@ -806,10 +820,6 @@ impl DFSchema {
806820
}
807821
}
808822

809-
pub fn show_field_qualifiers(&self) -> Vec<Option<&TableReference>> {
810-
self.field_qualifiers.iter().map(|q| q.as_ref()).collect()
811-
}
812-
813823
/// Replace all field qualifier with new value in schema
814824
pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
815825
let qualifier = qualifier.into();
@@ -1020,6 +1030,7 @@ impl ExprSchema for DFSchema {
10201030
}
10211031

10221032
fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1033+
//println!("COL!");
10231034
let field = self.field_from_column(col)?;
10241035
Ok((field.data_type(), field.is_nullable()))
10251036
}

datafusion/core/src/physical_planner.rs

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
8686
use datafusion_physical_plan::execution_plan::InvariantLevel;
8787
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
8888
use datafusion_physical_plan::unnest::ListUnnest;
89+
use datafusion_sql::TableReference;
8990

9091
use crate::schema_equivalence::schema_satisfied_by;
9192
use async_trait::async_trait;
@@ -892,8 +893,8 @@ impl DefaultPhysicalPlanner {
892893

893894
// 2 Children
894895
LogicalPlan::Join(Join {
895-
left,
896-
right,
896+
left: original_left,
897+
right: original_right,
897898
on: keys,
898899
filter,
899900
join_type,
@@ -904,6 +905,7 @@ impl DefaultPhysicalPlanner {
904905
// println!("join_schema : {:?}", join_schema); // "value" "tags", "tags_get_values(tags,Utf8(\"host\"))"
905906
// println!("left.schema() : {:?}", left.schema()); "value" "tags"
906907
// println!("right.schema() : {:?}", right.schema()); "tags_get_values(tags,Utf8(\"host\"))"
908+
907909
let null_equals_null = *null_equals_null;
908910

909911
let [physical_left, physical_right] = children.two()?;
@@ -916,30 +918,33 @@ impl DefaultPhysicalPlanner {
916918
let (new_logical, physical_left, physical_right) = if has_expr_join_key {
917919
// TODO: Can we extract this transformation to somewhere before physical plan
918920
// creation?
919-
println!("This is a join with an equijoin key");
921+
// println!("This is a join with an equijoin key");
920922
let (left_keys, right_keys): (Vec<_>, Vec<_>) =
921923
keys.iter().cloned().unzip();
922924

923925
let (left, left_col_keys, left_projected) = // this does wrap it in a projection
924926
wrap_projection_for_join_if_necessary(
925927
&left_keys,
926-
left.as_ref().clone(),
928+
original_left.as_ref().clone(),
927929
)?;
928930
let (right, right_col_keys, right_projected) = // this one doesn't
929931
wrap_projection_for_join_if_necessary(
930932
&right_keys,
931-
right.as_ref().clone(),
933+
original_right.as_ref().clone(),
932934
)?;
933935
let column_on = (left_col_keys, right_col_keys);
934936
let left = Arc::new(left); // modified left & right fields
935937
let right = Arc::new(right);
936-
let new_join = LogicalPlan::Join(Join::try_new_with_project_input( // it was failing here
938+
939+
let (new_join, requalified) = Join::try_new_with_project_input( // it was failing here
937940
node,
938941
Arc::clone(&left),
939942
Arc::clone(&right),
940943
column_on,
941-
)?);
944+
)?;
942945

946+
let new_join = LogicalPlan::Join(new_join);
947+
943948
// If inputs were projected then create ExecutionPlan for these new
944949
// LogicalPlan nodes.
945950
let physical_left = match (left_projected, left.as_ref()) {
@@ -968,12 +973,21 @@ impl DefaultPhysicalPlanner {
968973
)?,
969974
_ => physical_right,
970975
};
971-
//println!("NEW JOIN SCHEMA {:?}", new_join.schema());
972-
976+
973977
// Remove temporary projected columns
974978
if left_projected || right_projected {
979+
// This qualification is only valid for inner joins since by definition the join_schema will always be the result
980+
// of the left and right side combined. Also we should qualify schemas in case sides have been previosuly
981+
// requalified on try_new_with_project_input, this with the aim to determine later the
982+
// nullability and datatypes by looking into both sides of the schema, avoiding ambiguity errors.
983+
let qualified_join_schema = if *join_type == JoinType::Inner && requalified {
984+
Arc::new(qualify_join_schema(join_schema, original_left, original_right)?)
985+
} else {
986+
Arc::clone(join_schema)
987+
};
988+
975989
let final_join_result =
976-
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
990+
qualified_join_schema.iter().map(Expr::from).collect::<Vec<_>>();
977991
let projection = LogicalPlan::Projection(Projection::try_new( // now it fails here
978992
final_join_result,
979993
Arc::new(new_join),
@@ -1315,6 +1329,64 @@ impl DefaultPhysicalPlanner {
13151329
}
13161330
}
13171331

1332+
1333+
/// Qualify each field of the schema on the fly:
1334+
pub fn qualify_join_schema(
1335+
join_schema: &DFSchema,
1336+
left: &LogicalPlan,
1337+
right: &LogicalPlan,
1338+
) -> Result<DFSchema> {
1339+
let left_fields = left.schema().fields();
1340+
let right_fields = right.schema().fields();
1341+
let join_fields = join_schema.fields();
1342+
1343+
// 1. Validate lengths
1344+
if join_fields.len() != left_fields.len() + right_fields.len() {
1345+
return Err(DataFusionError::Plan(format!(
1346+
"Join schema field count mismatch: {} (join) != {} (left) + {} (right)",
1347+
join_fields.len(),
1348+
left_fields.len(),
1349+
right_fields.len()
1350+
)));
1351+
}
1352+
1353+
// 2. Validate field names match
1354+
for (i, field) in join_fields.iter().enumerate() {
1355+
let expected_field = if i < left_fields.len() {
1356+
&left_fields[i]
1357+
} else {
1358+
&right_fields[i - left_fields.len()]
1359+
};
1360+
1361+
if field.name() != expected_field.name() {
1362+
return Err(DataFusionError::Plan(format!(
1363+
"Field name mismatch at index {}: '{}' (join) != '{}' (input)",
1364+
i,
1365+
field.name(),
1366+
expected_field.name()
1367+
)));
1368+
}
1369+
}
1370+
1371+
// 3. Build qualifiers: left → "left", right → "right"
1372+
// (Replace these literals with real table names if you have them.)
1373+
let qualifiers= join_fields
1374+
.iter()
1375+
.enumerate()
1376+
.map(|(i, _)| {
1377+
if i < left_fields.len() {
1378+
Some(TableReference::Bare { table: Arc::from("left") })
1379+
} else {
1380+
Some(TableReference::Bare { table: Arc::from("right") })
1381+
}
1382+
})
1383+
.collect();
1384+
1385+
// 4️⃣ Return a *new* DFSchema (no mutation of the original)
1386+
join_schema.with_field_specific_qualified_schema(qualifiers)
1387+
}
1388+
1389+
13181390
/// Expand and align a GROUPING SET expression.
13191391
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
13201392
///

0 commit comments

Comments
 (0)