diff --git a/Cargo.toml b/Cargo.toml index a83ad7d..b47a9b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,3 +60,8 @@ url = "2.5.4" name = "materialized_views_benchmark" harness = false path = "benches/materialized_views_benchmark.rs" + +[[bench]] +name = "oneof_logical_walk_benchmark" +harness = false +path = "benches/oneof_logical_walk_benchmark.rs" diff --git a/benches/oneof_logical_walk_benchmark.rs b/benches/oneof_logical_walk_benchmark.rs new file mode 100644 index 0000000..ec1f4ff --- /dev/null +++ b/benches/oneof_logical_walk_benchmark.rs @@ -0,0 +1,93 @@ +//! Benchmarks the cost of running a no-op DataFusion logical-plan tree +//! rewriter over a [`OneOf`] node with varying numbers of candidate branches. +//! +//! Background: each post-ViewMatcher logical optimizer pass calls into +//! `rewrite_extension_inputs`, which iterates `node.inputs()`, clones each +//! child, and recursively rewrites it. With `inputs()` returning every +//! branch, this is O(num_branches × subtree_size) work per optimizer pass. +//! +//! After the change in this commit, `OneOf::inputs()` returns only the +//! primary branch, so the same rewrite is O(subtree_size) regardless of +//! the candidate count. This benchmark verifies that. +//! +//! Run with: +//! ```ignore +//! cargo bench --bench oneof_logical_walk_benchmark +//! ``` +use std::sync::Arc; + +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_common::Result; +use datafusion_expr::{col, lit, Extension, LogicalPlan, LogicalPlanBuilder}; +use datafusion_materialized_views::rewrite::exploitation::OneOf; + +/// Build a non-trivial logical plan to use as a `OneOf` branch: +/// `Projection(Filter(Filter(Values))).` +fn build_branch(seed: i32) -> LogicalPlan { + LogicalPlanBuilder::values(vec![vec![ + lit(seed), + lit(seed.wrapping_mul(2)), + lit("foo".to_string()), + ]]) + .unwrap() + .filter(col("column2").gt(lit(0))) + .unwrap() + .filter(col("column1").gt(lit(seed))) + .unwrap() + .project(vec![col("column1"), col("column3")]) + .unwrap() + .build() + .unwrap() +} + +/// Construct `Filter(OneOf(branches))`. The outer `Filter` ensures the +/// rewriter walks into the extension node via `rewrite_extension_inputs`. +fn build_one_of_plan(num_branches: usize) -> LogicalPlan { + let branches = (0..num_branches) + .map(|i| build_branch(i as i32)) + .collect::>(); + let one_of = OneOf::new(branches); + let extension_plan = LogicalPlan::Extension(Extension { + node: Arc::new(one_of), + }); + LogicalPlanBuilder::from(extension_plan) + .filter(col("column1").gt(lit(0))) + .unwrap() + .build() + .unwrap() +} + +/// A rewriter that performs no transformation; it merely walks the tree so +/// `rewrite_extension_inputs` is invoked for every Extension node. +struct NoOpRewriter; + +impl TreeNodeRewriter for NoOpRewriter { + type Node = LogicalPlan; + + fn f_down(&mut self, node: Self::Node) -> Result> { + Ok(Transformed::no(node)) + } +} + +fn bench_walk(c: &mut Criterion) { + let mut group = c.benchmark_group("oneof_logical_walk_noop_rewrite"); + for num_branches in [1usize, 3, 5, 10] { + let plan = build_one_of_plan(num_branches); + group.bench_with_input( + BenchmarkId::from_parameter(num_branches), + &plan, + |b, plan| { + b.iter(|| { + let mut rewriter = NoOpRewriter; + let result = plan.clone().rewrite(&mut rewriter).unwrap(); + black_box(result); + }); + }, + ); + } + group.finish(); +} + +criterion_group!(benches, bench_walk); +criterion_main!(benches); diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 7310d93..6c2b06b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -279,45 +279,81 @@ impl ViewExploitationPlanner { #[async_trait] impl ExtensionPlanner for ViewExploitationPlanner { /// Choose the best candidate and use it for the physical plan. + /// + /// `OneOf::inputs` returns only the primary branch (see the comment there + /// for why), so DataFusion converts only that branch via the standard + /// recursion and supplies it as `physical_inputs[0]`. The non-primary + /// branches are converted here using `planner.create_physical_plan`. async fn plan_extension( &self, - _planner: &dyn PhysicalPlanner, + planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, - logical_inputs: &[&LogicalPlan], + _logical_inputs: &[&LogicalPlan], physical_inputs: &[Arc], - _session_state: &SessionState, + session_state: &SessionState, ) -> Result>> { let Some(one_of) = node.as_any().downcast_ref::() else { return Ok(None); }; - // Compare schemas ignoring nullability differences. - // Different table types (FileScanTable, LiveTable, MV) may expose - // different nullability for the same column. For example, a partition - // column in one table is non-nullable, but the same column is a file - // column in a rollup MV (forced nullable for DF 52 RecordBatch - // validation compatibility). Field names and data types must match. - if logical_inputs - .iter() - .map(|plan| plan.schema()) - .any(|schema| { - !schemas_equal_ignoring_nullability( - schema.as_arrow(), - logical_inputs[0].schema().as_arrow(), - ) - }) - { + let branches = one_of.branches(); + if branches.is_empty() { return Err(DataFusionError::Plan( - "candidate logical plans should have the same schema".to_string(), + "OneOf must have at least one candidate branch".to_string(), )); } + if physical_inputs.len() != 1 { + return Err(DataFusionError::Internal(format!( + "OneOf expected exactly one physical input from DataFusion (the primary \ + branch), got {}", + physical_inputs.len() + ))); + } + + // Compare logical schemas first, before doing the expensive physical + // conversion of non-primary branches. Schema divergence here can + // happen when post-ViewMatcher logical optimizers (e.g., projection + // pushdown) transform the primary branch in a way that the hidden + // non-primary branches did not receive — see the comment on + // `OneOf::inputs` for why non-primary branches are hidden from DF + // tree rewrites. + // + // When this happens, fall back to the primary physical plan: it's the + // optimized non-MV form of the original query and produces correct + // results. We give up MV acceleration for this one query, but + // availability and correctness are preserved. + // + // Schema comparison ignores nullability: different table types + // (FileScanTable, LiveTable, MV) may expose different nullability for + // the same column (e.g., a partition column in one table is + // non-nullable, but the same column is a file column in a rollup MV + // forced nullable for DF 52 RecordBatch validation compatibility). + // Field names and data types must match. + let primary_logical_schema = branches[0].schema().as_arrow().clone(); + if branches.iter().any(|plan| { + !schemas_equal_ignoring_nullability(plan.schema().as_arrow(), &primary_logical_schema) + }) { + log::warn!( + "OneOf branches diverged in logical schema after optimization; \ + falling back to primary plan (no view-match acceleration for this query)" + ); + return Ok(Some(Arc::clone(&physical_inputs[0]))); + } - if physical_inputs + // Convert non-primary branches to physical plans ourselves. DataFusion + // only walked into the primary branch because `inputs()` hides the + // others. + let mut all_physical = Vec::with_capacity(branches.len()); + all_physical.push(Arc::clone(&physical_inputs[0])); + for branch in &branches[1..] { + let physical = planner.create_physical_plan(branch, session_state).await?; + all_physical.push(physical); + } + + let primary_physical_schema = all_physical[0].schema(); + if all_physical .iter() - .map(|plan| plan.schema()) - .any(|schema| { - !schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema()) - }) + .any(|plan| !schemas_equal_ignoring_nullability(&plan.schema(), &primary_physical_schema)) { return Err(DataFusionError::Plan( "candidate physical plans should have the same schema".to_string(), @@ -325,7 +361,7 @@ impl ExtensionPlanner for ViewExploitationPlanner { } Ok(Some(Arc::new(OneOfExec::try_new( - physical_inputs.to_vec(), + all_physical, None, Arc::clone(&self.cost), one_of.rewrite_context().clone(), @@ -362,6 +398,17 @@ impl OneOf { pub fn rewrite_context(&self) -> &RewriteContext { &self.rewrite_context } + + /// Returns all candidate branches stored in this `OneOf` node. + /// + /// Use this for inspecting all candidates from atlas-side optimizers or + /// the physical extension planner. The standard + /// [`UserDefinedLogicalNodeCore::inputs`] only exposes the primary branch + /// to keep DataFusion's tree rewrites from walking each candidate + /// independently — see the comment on [`Self::inputs`] for details. + pub fn branches(&self) -> &[LogicalPlan] { + &self.branches + } } impl UserDefinedLogicalNodeCore for OneOf { @@ -369,11 +416,20 @@ impl UserDefinedLogicalNodeCore for OneOf { "OneOf" } + /// Only expose the primary branch (`branches[0]`) to DataFusion's tree + /// rewriters. Non-primary branches are stored internally and intentionally + /// hidden so that subsequent logical optimizer passes do not walk each + /// candidate independently — view matching produces N semantically + /// equivalent branches, and re-running every logical rule on each branch + /// is N× redundant work that dominates planning time when N is large. + /// + /// Atlas-side optimizers and the physical extension planner that need to + /// see every candidate access them via [`OneOf::branches`]. fn inputs(&self) -> Vec<&LogicalPlan> { - self.branches - .iter() - .sorted_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)) - .collect_vec() + // SAFETY: `OneOf` is constructed with a non-empty `branches` vector; + // `OneOfExec::try_new` enforces this and our constructors take owned + // input that callers already validated. + vec![&self.branches[0]] } fn schema(&self) -> &datafusion_common::DFSchemaRef { @@ -392,13 +448,23 @@ impl UserDefinedLogicalNodeCore for OneOf { write!(f, "OneOf") } + /// Receives only the rewritten primary branch back from DataFusion's tree + /// rewriter (since [`Self::inputs`] returns a single element). Non-primary + /// branches are preserved as ViewMatcher created them — they will not have + /// received the post-ViewMatcher logical optimizer passes, but since they + /// were already logically equivalent to the primary at construction, their + /// physical plans remain valid candidates for cost-based selection. fn with_exprs_and_inputs( &self, _exprs: Vec, inputs: Vec, ) -> Result { + let mut new_branches = self.branches.clone(); + if let Some(updated_primary) = inputs.into_iter().next() { + new_branches[0] = updated_primary; + } Ok(Self { - branches: inputs, + branches: new_branches, rewrite_context: self.rewrite_context.clone(), }) } @@ -762,3 +828,151 @@ mod tests_rewrite_context { assert_eq!(rebuilt.rewrite_context(), &context); } } + +#[cfg(test)] +mod tests_logical_inputs { + use super::*; + use datafusion_expr::LogicalPlanBuilder; + + fn three_branch_one_of() -> OneOf { + let primary = LogicalPlanBuilder::empty(true).build().expect("primary"); + let alt1 = LogicalPlanBuilder::empty(true).build().expect("alt1"); + let alt2 = LogicalPlanBuilder::empty(true).build().expect("alt2"); + OneOf::new(vec![primary, alt1, alt2]) + } + + #[test] + fn inputs_only_exposes_primary_branch() { + let one_of = three_branch_one_of(); + let inputs = UserDefinedLogicalNodeCore::inputs(&one_of); + assert_eq!( + inputs.len(), + 1, + "OneOf::inputs must hide non-primary branches so DF tree rewrites \ + do not walk each candidate independently" + ); + } + + #[test] + fn branches_exposes_all_candidates() { + let one_of = three_branch_one_of(); + assert_eq!( + one_of.branches().len(), + 3, + "branches() must expose every candidate for atlas optimizers and \ + the physical planner" + ); + } + + #[test] + fn with_exprs_and_inputs_preserves_non_primary_branches() { + let one_of = three_branch_one_of(); + let original_alt1 = one_of.branches()[1].clone(); + let original_alt2 = one_of.branches()[2].clone(); + + // Simulate a DataFusion rewriter that hands back a transformed primary. + let new_primary = LogicalPlanBuilder::empty(false) + .build() + .expect("new primary"); + let rebuilt = UserDefinedLogicalNodeCore::with_exprs_and_inputs( + &one_of, + vec![], + vec![new_primary.clone()], + ) + .expect("rebuild one_of"); + + assert_eq!(rebuilt.branches().len(), 3); + assert_eq!(rebuilt.branches()[0], new_primary); + assert_eq!(rebuilt.branches()[1], original_alt1); + assert_eq!(rebuilt.branches()[2], original_alt2); + } + + #[test] + fn with_exprs_and_inputs_keeps_primary_when_no_inputs_supplied() { + // Tree rewriters can return Transformed::no(...) which means + // with_exprs_and_inputs may be called with an empty inputs vec; the + // OneOf should be reconstructed unchanged in that case. + let one_of = three_branch_one_of(); + let original_primary = one_of.branches()[0].clone(); + + let rebuilt = + UserDefinedLogicalNodeCore::with_exprs_and_inputs(&one_of, vec![], vec![]) + .expect("rebuild one_of"); + + assert_eq!(rebuilt.branches().len(), 3); + assert_eq!(rebuilt.branches()[0], original_primary); + } + + /// Reproduces the schema-divergence scenario: a post-ViewMatcher logical + /// optimizer transforms the primary branch (visible via `inputs()`) in a + /// way that changes its schema, while the hidden non-primary branches + /// stay in their original form. The resulting `OneOf` has branches with + /// inconsistent schemas — `ViewExploitationPlanner::plan_extension` + /// detects this and falls back to the primary plan rather than building + /// an invalid `OneOfExec`. + #[test] + fn divergent_primary_schema_is_observable_in_branches() { + use datafusion_expr::{col, lit}; + + // Build branches with the same starting schema (column1, column2). + let make_branch = |seed: i32| { + LogicalPlanBuilder::values(vec![vec![lit(seed), lit(seed.wrapping_mul(2))]]) + .unwrap() + .build() + .unwrap() + }; + let primary = make_branch(0); + let alt1 = make_branch(1); + let alt2 = make_branch(2); + + let one_of = OneOf::new(vec![primary.clone(), alt1.clone(), alt2.clone()]); + let primary_schema_before = primary.schema().as_arrow().clone(); + for branch in one_of.branches() { + assert!( + schemas_equal_ignoring_nullability(branch.schema().as_arrow(), &primary_schema_before), + "all branches must start with the same schema" + ); + } + + // Simulate a post-ViewMatcher rewriter handing back a primary with a + // narrower schema (e.g., projection pushdown reducing output columns). + let narrowed_primary = LogicalPlanBuilder::from(primary) + .project(vec![col("column1")]) + .unwrap() + .build() + .unwrap(); + let narrowed_schema = narrowed_primary.schema().as_arrow().clone(); + assert!( + !schemas_equal_ignoring_nullability(&narrowed_schema, &primary_schema_before), + "narrowed primary should have a different schema" + ); + + let rebuilt = UserDefinedLogicalNodeCore::with_exprs_and_inputs( + &one_of, + vec![], + vec![narrowed_primary], + ) + .expect("rebuild one_of"); + + // The rebuilt OneOf has divergent branch schemas — `plan_extension` + // is responsible for detecting this and falling back. We verify the + // divergence is observable so `plan_extension` has the signal it needs. + assert_eq!(rebuilt.branches().len(), 3); + assert!( + schemas_equal_ignoring_nullability( + rebuilt.branches()[0].schema().as_arrow(), + &narrowed_schema + ), + "primary should reflect the narrowed schema" + ); + for non_primary in &rebuilt.branches()[1..] { + assert!( + !schemas_equal_ignoring_nullability( + non_primary.schema().as_arrow(), + &narrowed_schema + ), + "non-primary branches should retain the original schema" + ); + } + } +}