Skip to content

Commit be369dc

Browse files
committed
DEBUG
1 parent f5e6fe3 commit be369dc

12 files changed

Lines changed: 468 additions & 57 deletions

File tree

datafusion/common/src/dfschema.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,18 +144,16 @@ impl DFSchema {
144144
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145145
metadata: HashMap<String, String>,
146146
) -> Result<Self> {
147+
147148
let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148149
qualified_fields.clone().into_iter().unzip();
149150

150151
let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151-
152152
let dfschema = Self {
153153
inner: schema,
154154
field_qualifiers: qualifiers,
155155
functional_dependencies: FunctionalDependencies::empty(),
156156
};
157-
// println!("{:?}", qualified_fields);
158-
//println!();
159157
dfschema.check_names()?;
160158
Ok(dfschema)
161159
}
@@ -212,7 +210,7 @@ impl DFSchema {
212210
pub fn check_names(&self) -> Result<()> {
213211
let mut qualified_names = BTreeSet::new();
214212
let mut unqualified_names = BTreeSet::new();
215-
// println!("Fields: {:?}", self.inner.fields().iter().map(|f| f.name()));
213+
println!("Fields: {:?}", self.inner.fields().iter().map(|f| f.name()));
216214
println!("Fields qualifiers: {:?}", self.field_qualifiers);
217215

218216
for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,7 @@ impl DataFrame {
13471347
/// ```
13481348
pub async fn collect(self) -> Result<Vec<RecordBatch>> {
13491349
let task_ctx = Arc::new(self.task_ctx());
1350-
let plan = self.create_physical_plan().await?;
1350+
let plan = self.create_physical_plan().await?; // fails here while doing the logical plan
13511351
collect(plan, task_ctx).await
13521352
}
13531353

datafusion/core/src/physical_planner.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ use datafusion_common::tree_node::{
6666
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
6767
};
6868
use datafusion_common::{
69-
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
70-
ScalarValue,
69+
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, ScalarValue
7170
};
7271
use datafusion_datasource::memory::MemorySourceConfig;
7372
use datafusion_expr::dml::{CopyTo, InsertOp};
@@ -78,9 +77,7 @@ use datafusion_expr::expr::{
7877
use datafusion_expr::expr_rewriter::unnormalize_cols;
7978
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
8079
use datafusion_expr::{
81-
Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
82-
Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame,
83-
WindowFrameBound, WriteOp,
80+
Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType, Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, SubqueryAlias, WindowFrame, WindowFrameBound, WriteOp
8481
};
8582
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8683
use datafusion_physical_expr::expressions::{Column, Literal};
@@ -185,10 +182,10 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
185182
return Ok(plan);
186183
}
187184
let plan = self
188-
.create_initial_plan(logical_plan, session_state)
185+
.create_initial_plan(logical_plan, session_state) //fails here
189186
.await?;
190-
191-
self.optimize_physical_plan(plan, session_state, |_, _| {})
187+
188+
self.optimize_physical_plan(plan, session_state, |_, _| {})
192189
}
193190

194191
/// Create a physical expression from a logical expression
@@ -762,13 +759,16 @@ impl DefaultPhysicalPlanner {
762759
Arc::clone(&physical_input_schema),
763760
)?)
764761
}
765-
LogicalPlan::Projection(Projection { input, expr, .. }) => self
766-
.create_project_physical_exec(
762+
LogicalPlan::Projection(Projection { input, expr, .. }) => {
763+
let result = self.create_project_physical_exec(
767764
session_state,
768765
children.one()?,
769766
input,
770767
expr,
771-
)?,
768+
);
769+
770+
result?
771+
},
772772
LogicalPlan::Filter(Filter {
773773
predicate, input, ..
774774
}) => {
@@ -901,35 +901,39 @@ impl DefaultPhysicalPlanner {
901901
schema: join_schema,
902902
..
903903
}) => {
904+
// println!("join_schema : {:?}", join_schema); // "value" "tags", "tags_get_values(tags,Utf8(\"host\"))"
905+
// println!("left.schema() : {:?}", left.schema()); "value" "tags"
906+
// println!("right.schema() : {:?}", right.schema()); "tags_get_values(tags,Utf8(\"host\"))"
904907
let null_equals_null = *null_equals_null;
905908

906909
let [physical_left, physical_right] = children.two()?;
907910

908-
// If join has expression equijoin keys, add physical projection.
909-
let has_expr_join_key = keys.iter().any(|(l, r)| {
911+
// If join has expression equijoin keys, add physical projection since there are common columns
912+
let has_expr_join_key = keys.iter().any(|(l, r)| { // for our case it is an expresion because the key is BinaryExpr
910913
!(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
911914
});
915+
912916
let (new_logical, physical_left, physical_right) = if has_expr_join_key {
913917
// TODO: Can we extract this transformation to somewhere before physical plan
914918
// creation?
919+
println!("This is a join with an equijoin key");
915920
let (left_keys, right_keys): (Vec<_>, Vec<_>) =
916921
keys.iter().cloned().unzip();
917922

918-
let (left, left_col_keys, left_projected) =
923+
let (left, left_col_keys, left_projected) = // this does wrap it in a projection
919924
wrap_projection_for_join_if_necessary(
920925
&left_keys,
921926
left.as_ref().clone(),
922927
)?;
923-
let (right, right_col_keys, right_projected) =
928+
let (right, right_col_keys, right_projected) = // this one doesn't
924929
wrap_projection_for_join_if_necessary(
925930
&right_keys,
926931
right.as_ref().clone(),
927932
)?;
928933
let column_on = (left_col_keys, right_col_keys);
929-
930-
let left = Arc::new(left);
934+
let left = Arc::new(left); // modified left & right fields
931935
let right = Arc::new(right);
932-
let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
936+
let new_join = LogicalPlan::Join(Join::try_new_with_project_input( // it was failing here
933937
node,
934938
Arc::clone(&left),
935939
Arc::clone(&right),
@@ -964,12 +968,13 @@ impl DefaultPhysicalPlanner {
964968
)?,
965969
_ => physical_right,
966970
};
971+
//println!("NEW JOIN SCHEMA {:?}", new_join.schema());
967972

968973
// Remove temporary projected columns
969974
if left_projected || right_projected {
970975
let final_join_result =
971976
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
972-
let projection = LogicalPlan::Projection(Projection::try_new(
977+
let projection = LogicalPlan::Projection(Projection::try_new( // now it fails here
973978
final_join_result,
974979
Arc::new(new_join),
975980
)?);

0 commit comments

Comments
 (0)