From 805aacd3116c8c2398246835aa9dc48005875b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 2 Jun 2026 17:16:09 +0100 Subject: [PATCH 1/4] backport few datafusion physical_optimizer rules --- Cargo.lock | 1 + Cargo.toml | 1 + ballista/scheduler/Cargo.toml | 1 + .../src/physical_optimizer/filter_pushdown.rs | 865 ++++++++++++++++++ .../scheduler/src/physical_optimizer/mod.rs | 2 + .../physical_optimizer/output_requirements.rs | 438 +++++++++ ballista/scheduler/src/state/aqe/planner.rs | 53 +- 7 files changed, 1358 insertions(+), 3 deletions(-) create mode 100644 ballista/scheduler/src/physical_optimizer/filter_pushdown.rs create mode 100644 ballista/scheduler/src/physical_optimizer/output_requirements.rs diff --git a/Cargo.lock b/Cargo.lock index 50bfcad291..075445410b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1177,6 +1177,7 @@ dependencies = [ "graphviz-rust", "http 1.4.1", "insta", + "itertools 0.14.0", "log", "object_store", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 99d2b85932..b03ea05fef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ datafusion-spark = "53" datafusion-substrait = "53" insta = "1.47" +itertools = "0.14" object_store = "0.13" prost = "0.14" prost-types = "0.14" diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index cb60881fb0..f26d579d10 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -60,6 +60,7 @@ futures = { workspace = true } graphviz-rust = { version = "0.9", optional = true } http = "1.4" insta = { workspace = true } +itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } once_cell = { version = "1.21.4", optional = true } diff --git a/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs b/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs new file mode 100644 index 0000000000..d0cf7ca583 --- /dev/null +++ b/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs @@ -0,0 +1,865 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Filter Pushdown Optimization Process +//! +//! The filter pushdown mechanism involves four key steps: +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] +//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] +//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child, +//! passing the appropriate filters (`Vec>`) for that child. +//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, +//! containing information about which filters were successfully pushed down vs. unsupported. +//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, +//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides +//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription + +// +// This is temporary workaround until datafusion 55 is released +// see: https://github.com/apache/datafusion/pull/22523/ +// for more details +// +use std::sync::Arc; + +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion::common::{Result, assert_eq_or_internal_err, config::ConfigOptions}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr_common::physical_expr::is_volatile; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::filter_pushdown::{ + ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase, + FilterPushdownPropagation, PushedDown, +}; +use datafusion::physical_plan::{ExecutionPlan, with_new_children_if_necessary}; + +use itertools::{Itertools, izip}; + +/// Attempts to recursively push given filters from the top of the tree into leaves. +/// +/// # Default Implementation +/// +/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`] +/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that: +/// +/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`]) +/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]). +/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]). +/// +/// # Example: Push filter into a `DataSourceExec` +/// +/// For example, consider the following plan: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node. +/// +/// If this filter is selective pushing it into the scan can avoid massive +/// amounts of data being read from the source (the projection is `*` so all +/// matching columns are read). +/// +/// The new plan looks like: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// # Example: Push filters with `ProjectionExec` +/// +/// Let's consider a more complex example involving a [`ProjectionExec`] +/// node in between the [`FilterExec`] and `DataSourceExec` nodes that +/// creates a new column that the filter depends on. +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [cost>50,id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// We want to push down the filters `[id=1]` to the `DataSourceExec` node, +/// but can't push down `cost>50` because it requires the [`ProjectionExec`] +/// node to be executed first. A simple thing to do would be to split up the +/// filter into two separate filters and push down the first one: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [cost>50] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// We can actually however do better by pushing down `price * 1.2 > 50` +/// instead of `cost > 50`: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ cost = price * 1.2 │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [id=1, │ +/// │ price * 1.2 > 50] │ +/// └──────────────────────┘ +/// ``` +/// +/// # Example: Push filters within a subtree +/// +/// There are also cases where we may be able to push down filters within a +/// subtree but not the entire tree. A good example of this is aggregation +/// nodes: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// The transformation here is to push down the `id=1` filter to the +/// `DataSourceExec` node: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// The point here is that: +/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node. +/// Any filters above the [`AggregateExec`] node are not pushed down. +/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node. +/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push +/// down the `id=1` filter. +/// +/// # Example: Push filters through Joins +/// +/// It is also possible to push down filters through joins and filters that +/// originate from joins. For example, a hash join where we build a hash +/// table of the left side and probe the right side (ignoring why we would +/// choose this order, typically it depends on the size of each table, +/// etc.). +/// +/// ```text +/// ┌─────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [d.size > 100] │ +/// └─────────────────────┘ +/// │ +/// │ +/// ┌──────────▼──────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ │ │ │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// There are two pushdowns we can do here: +/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` +/// node for the `departments` table. +/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading +/// rows from the `users` table that will be eliminated by the join. +/// This can be done via a bloom filter or similar and is not (yet) supported +/// in DataFusion. See . +/// +/// ```text +/// ┌─────────────────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ filters = │ │ filters = │ +/// │ [depg@hash(d.id)] │ │ [ d.size > 100] │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// You may notice in this case that the filter is *dynamic*: the hash table +/// is built _after_ the `departments` table is read and at runtime. We +/// don't have a concrete `InList` filter or similar to push down at +/// optimization time. These sorts of dynamic filters are handled by +/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime +/// and internally maintains a reference to the hash table or other state. +/// +/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`] +/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. +/// For a join this could mean converting it to an `InList` filter or a min/max filter for example. +/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. +/// +/// # Example: Push TopK filters into Scans +/// +/// Another form of dynamic filter is pushing down the state of a `TopK` +/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// We can avoid large amounts of data processing by transforming this into: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = │ +/// │ [id < @ TopKHeap] │ +/// └──────────────────────┘ +/// ``` +/// +/// Now as we fill our `TopK` heap we can push down the state of the heap to +/// the `DataSourceExec` node to avoid reading files / row groups / pages / +/// rows that could not possibly be in the top 10. +/// +/// This is not yet implemented in DataFusion. See +/// +/// +/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr +/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot +/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec +/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec +/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec +#[derive(Debug)] +pub struct FilterPushdown { + phase: FilterPushdownPhase, + name: String, +} + +impl FilterPushdown { + fn new_with_phase(phase: FilterPushdownPhase) -> Self { + let name = match phase { + FilterPushdownPhase::Pre => "FilterPushdown", + FilterPushdownPhase::Post => "FilterPushdown(Post)", + } + .to_string(); + Self { phase, name } + } + + /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase. + /// See [`FilterPushdownPhase`] for more details. + pub fn new() -> Self { + Self::new_with_phase(FilterPushdownPhase::Pre) + } + + /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase. + /// See [`FilterPushdownPhase`] for more details. + pub fn new_post_optimization() -> Self { + Self::new_with_phase(FilterPushdownPhase::Post) + } +} + +impl Default for FilterPushdown { + fn default() -> Self { + Self::new() + } +} + +impl PhysicalOptimizerRule for FilterPushdown { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + Ok( + push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)? + .updated_node + .unwrap_or(plan), + ) + } + + fn name(&self) -> &str { + &self.name + } + + fn schema_check(&self) -> bool { + true // Filter pushdown does not change the schema of the plan + } +} + +fn push_down_filters( + node: &Arc, + parent_predicates: Vec>, + config: &ConfigOptions, + phase: FilterPushdownPhase, +) -> Result>> { + let mut parent_filter_pushdown_supports: Vec> = + vec![vec![]; parent_predicates.len()]; + let mut self_filters_pushdown_supports = vec![]; + let mut new_children = Vec::with_capacity(node.children().len()); + + let children = node.children(); + + // Filter out expressions that are not allowed for pushdown + let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr); + + let filter_description = node.gather_filters_for_pushdown( + phase, + parent_filtered.items().to_vec(), + config, + )?; + + let filter_description_parent_filters = filter_description.parent_filters(); + let filter_description_self_filters = filter_description.self_filters(); + assert_eq_or_internal_err!( + filter_description_parent_filters.len(), + children.len(), + "Filter pushdown expected parent filters count to match number of children for node {}", + node.name() + ); + assert_eq_or_internal_err!( + filter_description_self_filters.len(), + children.len(), + "Filter pushdown expected self filters count to match number of children for node {}", + node.name() + ); + + for (child_idx, (child, parent_filters, self_filters)) in izip!( + children, + filter_description.parent_filters(), + filter_description.self_filters() + ) + .enumerate() + { + // Here, `parent_filters` are the predicates which are provided by the parent node of + // the current node, and tried to be pushed down over the child which the loop points + // currently. `self_filters` are the predicates which are provided by the current node, + // and tried to be pushed down over the child similarly. + + // Filter out self_filters that contain volatile expressions and track indices + let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr); + + let num_self_filters = self_filtered.len(); + let mut all_predicates = self_filtered.items().to_vec(); + + // Apply second filter pass: collect indices of parent filters that can be pushed down + let parent_filters_for_child = parent_filtered + .chain_filter_slice(&parent_filters, |filter| { + matches!(filter.discriminant, PushedDown::Yes) + }); + + // Add the filtered parent predicates to all_predicates + for filter in parent_filters_for_child.items() { + all_predicates.push(Arc::clone(&filter.predicate)); + } + + let num_parent_filters = all_predicates.len() - num_self_filters; + + // Any filters that could not be pushed down to a child are marked as not-supported to our parents + let result = + push_down_filters(&Arc::clone(child), all_predicates, config, phase)?; + + if let Some(new_child) = result.updated_node { + // If we have a filter pushdown result, we need to update our children + new_children.push(new_child); + } else { + // If we don't have a filter pushdown result, we need to update our children + new_children.push(Arc::clone(child)); + } + + // Our child doesn't know the difference between filters that were passed down + // from our parents and filters that the current node injected. We need to de-entangle + // this since we do need to distinguish between them. + let mut all_filters = result.filters.into_iter().collect_vec(); + assert_eq_or_internal_err!( + all_filters.len(), + num_self_filters + num_parent_filters, + "Filter pushdown did not return the expected number of filters from {}", + child.name() + ); + let parent_filters = all_filters + .split_off(num_self_filters) + .into_iter() + .collect_vec(); + // Map the results from filtered self filters back to their original positions using FilteredVec + let mapped_self_results = + self_filtered.map_results_to_original(all_filters, PushedDown::No); + + // Wrap each result with its corresponding expression + let self_filter_results: Vec<_> = mapped_self_results + .into_iter() + .zip(self_filters) + .map(|(support, filter)| support.wrap_expression(filter)) + .collect(); + + self_filters_pushdown_supports.push(self_filter_results); + + // Start by marking all parent filters as unsupported for this child + for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { + parent_filter_pushdown_support.push(PushedDown::No); + assert_eq!( + parent_filter_pushdown_support.len(), + child_idx + 1, + "Parent filter pushdown supports should have the same length as the number of children" + ); + } + // Map results from pushed-down filters back to original parent filter indices + let mapped_parent_results = parent_filters_for_child + .map_results_to_original(parent_filters, PushedDown::No); + + // Update parent_filter_pushdown_supports with the mapped results + // mapped_parent_results already has the results at their original indices + for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() { + support[child_idx] = mapped_parent_results[idx]; + } + } + + // Re-create this node with new children + let updated_node = with_new_children_if_necessary(Arc::clone(node), new_children)?; + + // TODO: by calling `handle_child_pushdown_result` we are assuming that the + // `ExecutionPlan` implementation will not change the plan itself. + // Should we have a separate method for dynamic pushdown that does not allow modifying the plan? + let mut res = updated_node.handle_child_pushdown_result( + phase, + ChildPushdownResult { + parent_filters: parent_predicates + .into_iter() + .enumerate() + .map( + |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult { + filter: parent_filter, + child_results: parent_filter_pushdown_supports[parent_filter_idx] + .clone(), + }, + ) + .collect(), + self_filters: self_filters_pushdown_supports, + }, + config, + )?; + // Compare pointers for new_node and node, if they are different we must replace + // ourselves because of changes in our children. + if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, node) { + res.updated_node = Some(updated_node) + } + Ok(res) +} + +/// A helper structure for filtering elements from a vector through multiple passes while +/// tracking their original indices, allowing results to be mapped back to the original positions. +struct FilteredVec { + items: Vec, + // Chain of index mappings: each Vec maps from current level to previous level + // index_mappings[0] maps from first filter to original indices + // index_mappings[1] maps from second filter to first filter indices, etc. + index_mappings: Vec>, + original_len: usize, +} + +impl FilteredVec { + /// Creates a new FilteredVec by filtering items based on the given predicate + fn new(items: &[T], predicate: F) -> Self + where + F: Fn(&T) -> bool, + { + let mut filtered_items = Vec::new(); + let mut original_indices = Vec::new(); + + for (idx, item) in items.iter().enumerate() { + if predicate(item) { + filtered_items.push(item.clone()); + original_indices.push(idx); + } + } + + Self { + items: filtered_items, + index_mappings: vec![original_indices], + original_len: items.len(), + } + } + + /// Returns a reference to the filtered items + fn items(&self) -> &[T] { + &self.items + } + + /// Returns the number of filtered items + fn len(&self) -> usize { + self.items.len() + } + + /// Maps results from the filtered items back to their original positions + /// Returns a vector with the same length as the original input, filled with default_value + /// and updated with results at their original positions + fn map_results_to_original( + &self, + results: Vec, + default_value: R, + ) -> Vec { + let mut mapped_results = vec![default_value; self.original_len]; + + for (result_idx, result) in results.into_iter().enumerate() { + let original_idx = self.trace_to_original_index(result_idx); + mapped_results[original_idx] = result; + } + + mapped_results + } + + /// Traces a filtered index back to its original index through all filter passes + fn trace_to_original_index(&self, mut current_idx: usize) -> usize { + // Work backwards through the chain of index mappings + for mapping in self.index_mappings.iter().rev() { + current_idx = mapping[current_idx]; + } + current_idx + } + + /// Apply a filter to a new set of items while chaining the index mapping from self (parent) + /// This is useful when you have filtered items and then get a transformed slice + /// (e.g., from gather_filters_for_pushdown) that you need to filter again + fn chain_filter_slice(&self, items: &[U], predicate: F) -> FilteredVec + where + F: Fn(&U) -> bool, + { + let mut filtered_items = Vec::new(); + let mut filtered_indices = Vec::new(); + + for (idx, item) in items.iter().enumerate() { + if predicate(item) { + filtered_items.push(item.clone()); + filtered_indices.push(idx); + } + } + + // Chain the index mappings from parent (self) + let mut index_mappings = self.index_mappings.clone(); + index_mappings.push(filtered_indices); + + FilteredVec { + items: filtered_items, + index_mappings, + original_len: self.original_len, + } + } +} + +fn allow_pushdown_for_expr(expr: &Arc) -> bool { + let mut allow_pushdown = true; + expr.apply(|e| { + allow_pushdown = allow_pushdown && !is_volatile(e); + if allow_pushdown { + Ok(TreeNodeRecursion::Continue) + } else { + Ok(TreeNodeRecursion::Stop) + } + }) + .expect("Infallible traversal of PhysicalExpr tree failed"); + allow_pushdown +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filtered_vec_single_pass() { + let items = vec![1, 2, 3, 4, 5, 6]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + // Check filtered items + assert_eq!(filtered.items(), &[2, 4, 6]); + assert_eq!(filtered.len(), 3); + + // Check index mapping + let results = vec!["a", "b", "c"]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["default", "a", "default", "b", "default", "c"]); + } + + #[test] + fn test_filtered_vec_empty_filter() { + let items = vec![1, 3, 5]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + assert_eq!(filtered.items(), &[] as &[i32]); + assert_eq!(filtered.len(), 0); + + let results: Vec<&str> = vec![]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["default", "default", "default"]); + } + + #[test] + fn test_filtered_vec_all_pass() { + let items = vec![2, 4, 6]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + assert_eq!(filtered.items(), &[2, 4, 6]); + assert_eq!(filtered.len(), 3); + + let results = vec!["a", "b", "c"]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["a", "b", "c"]); + } + + #[test] + fn test_chain_filter_slice_different_types() { + // First pass: filter numbers + let numbers = vec![1, 2, 3, 4, 5, 6]; + let first_pass = FilteredVec::new(&numbers, |&x| x > 3); + assert_eq!(first_pass.items(), &[4, 5, 6]); + + // Transform to strings (simulating gather_filters_for_pushdown transformation) + let strings = vec!["four", "five", "six"]; + + // Second pass: filter strings that contain 'i' + let second_pass = first_pass.chain_filter_slice(&strings, |s| s.contains('i')); + assert_eq!(second_pass.items(), &["five", "six"]); + + // Map results back to original indices + let results = vec![100, 200]; + let mapped = second_pass.map_results_to_original(results, 0); + // "five" was at index 4 (1-based: 5), "six" was at index 5 (1-based: 6) + assert_eq!(mapped, vec![0, 0, 0, 0, 100, 200]); + } + + #[test] + fn test_chain_filter_slice_complex_scenario() { + // Simulating the filter pushdown scenario + // Parent predicates: [A, B, C, D, E] + let parent_predicates = vec!["A", "B", "C", "D", "E"]; + + // First pass: filter out some predicates (simulating allow_pushdown_for_expr) + let first_pass = FilteredVec::new(&parent_predicates, |s| *s != "B" && *s != "D"); + assert_eq!(first_pass.items(), &["A", "C", "E"]); + + // After gather_filters_for_pushdown, we get transformed results for a specific child + // Let's say child gets [A_transformed, C_transformed, E_transformed] + // but only C and E can be pushed down + #[derive(Clone, Debug, PartialEq)] + struct TransformedPredicate { + name: String, + can_push: bool, + } + + let child_predicates = vec![ + TransformedPredicate { + name: "A_transformed".to_string(), + can_push: false, + }, + TransformedPredicate { + name: "C_transformed".to_string(), + can_push: true, + }, + TransformedPredicate { + name: "E_transformed".to_string(), + can_push: true, + }, + ]; + + // Second pass: filter based on can_push + let second_pass = + first_pass.chain_filter_slice(&child_predicates, |p| p.can_push); + assert_eq!(second_pass.len(), 2); + assert_eq!(second_pass.items()[0].name, "C_transformed"); + assert_eq!(second_pass.items()[1].name, "E_transformed"); + + // Simulate getting results back from child + let child_results = vec!["C_result", "E_result"]; + let mapped = second_pass.map_results_to_original(child_results, "no_result"); + + // Results should be at original positions: C was at index 2, E was at index 4 + assert_eq!( + mapped, + vec![ + "no_result", + "no_result", + "C_result", + "no_result", + "E_result" + ] + ); + } + + #[test] + fn test_trace_to_original_index() { + let items = vec![10, 20, 30, 40, 50]; + let filtered = FilteredVec::new(&items, |&x| x != 20 && x != 40); + + // filtered items are [10, 30, 50] at original indices [0, 2, 4] + assert_eq!(filtered.trace_to_original_index(0), 0); // 10 was at index 0 + assert_eq!(filtered.trace_to_original_index(1), 2); // 30 was at index 2 + assert_eq!(filtered.trace_to_original_index(2), 4); // 50 was at index 4 + } + + #[test] + fn test_chain_filter_preserves_original_len() { + let items = vec![1, 2, 3, 4, 5]; + let first = FilteredVec::new(&items, |&x| x > 2); + + let strings = vec!["three", "four", "five"]; + let second = first.chain_filter_slice(&strings, |s| s.len() == 4); + + // Original length should still be 5 + let results = vec!["x", "y"]; + let mapped = second.map_results_to_original(results, "-"); + assert_eq!(mapped.len(), 5); + } +} diff --git a/ballista/scheduler/src/physical_optimizer/mod.rs b/ballista/scheduler/src/physical_optimizer/mod.rs index 9bd816678e..29d6a94f64 100644 --- a/ballista/scheduler/src/physical_optimizer/mod.rs +++ b/ballista/scheduler/src/physical_optimizer/mod.rs @@ -15,4 +15,6 @@ // specific language governing permissions and limitations // under the License. +pub mod filter_pushdown; pub mod join_selection; +pub mod output_requirements; diff --git a/ballista/scheduler/src/physical_optimizer/output_requirements.rs b/ballista/scheduler/src/physical_optimizer/output_requirements.rs new file mode 100644 index 0000000000..7e8b586868 --- /dev/null +++ b/ballista/scheduler/src/physical_optimizer/output_requirements.rs @@ -0,0 +1,438 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The GlobalOrderRequire optimizer rule either: +//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global +//! ordering and distribution requirement across rules, or +//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan. +//! Since the `OutputRequirementExec` operator is only a helper operator, it +//! shouldn't occur in the final plan (i.e. the executed plan). + +// +// This is temporary workaround until datafusion 55 is released +// see: https://github.com/apache/datafusion/pull/22523/ +// for more details +// + +use std::sync::Arc; + +use datafusion::common::config::ConfigOptions; +use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion::common::{Result, Statistics}; +use datafusion::execution::TaskContext; +use datafusion::physical_expr::Distribution; +use datafusion::physical_expr_common::sort_expr::OrderingRequirements; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::execution_plan::Boundedness; +use datafusion::physical_plan::projection::{ + ProjectionExec, make_with_child, update_expr, update_ordering_requirement, +}; +use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + SendableRecordBatchStream, +}; + +/// This rule either adds or removes [`OutputRequirements`]s to/from the physical +/// plan according to its `mode` attribute, which is set by the constructors +/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of +/// the global requirements (ordering and distribution) across rules. +/// +/// The primary use case of this node and rule is to specify and preserve the desired output +/// ordering and distribution the entire plan. When sending to a single client, a single partition may +/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be +/// better. +#[derive(Debug)] +pub struct OutputRequirements { + mode: RuleMode, +} + +impl OutputRequirements { + /// Create a new rule which works in `Add` mode; i.e. it simply adds a + /// top-level [`OutputRequirementExec`] into the physical plan to keep track + /// of global ordering and distribution requirements if there are any. + /// Note that this rule should run at the beginning. It is idempotent: when + /// invoked on a plan that is already topped by an `OutputRequirementExec`, + /// it returns the plan unchanged. + pub fn new_add_mode() -> Self { + Self { + mode: RuleMode::Add, + } + } + + /// Create a new rule which works in `Remove` mode; i.e. it simply removes + /// the top-level [`OutputRequirementExec`] from the physical plan if there is + /// any. We do this because a `OutputRequirementExec` is an ancillary, + /// non-executable operator whose sole purpose is to track global + /// requirements during optimization. Therefore, a + /// `OutputRequirementExec` should not appear in the final plan. + pub fn new_remove_mode() -> Self { + Self { + mode: RuleMode::Remove, + } + } +} + +#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)] +enum RuleMode { + Add, + Remove, +} + +/// An ancillary, non-executable operator whose sole purpose is to track global +/// requirements during optimization. It imposes +/// - the ordering requirement in its `order_requirement` attribute. +/// - the distribution requirement in its `dist_requirement` attribute. +/// +/// See [`OutputRequirements`] for more details +#[derive(Debug)] +pub struct OutputRequirementExec { + input: Arc, + order_requirement: Option, + dist_requirement: Distribution, + cache: Arc, + fetch: Option, +} + +impl OutputRequirementExec { + /// creates new [OutputRequirementExec] instance + pub fn new( + input: Arc, + requirements: Option, + dist_requirement: Distribution, + fetch: Option, + ) -> Self { + let cache = Self::compute_properties(&input, &fetch); + Self { + input, + order_requirement: requirements, + dist_requirement, + cache: Arc::new(cache), + fetch, + } + } + /// returns plan input + pub fn input(&self) -> Arc { + Arc::clone(&self.input) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + input: &Arc, + fetch: &Option, + ) -> PlanProperties { + let boundedness = if fetch.is_some() { + Boundedness::Bounded + } else { + input.boundedness() + }; + + PlanProperties::new( + input.equivalence_properties().clone(), // Equivalence Properties + input.output_partitioning().clone(), // Output Partitioning + input.pipeline_behavior(), // Pipeline Behavior + boundedness, // Boundedness + ) + } + + /// Get fetch + pub fn fetch(&self) -> Option { + self.fetch + } +} + +impl DisplayAs for OutputRequirementExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let order_cols = self + .order_requirement + .as_ref() + .map(|reqs| reqs.first()) + .map(|lex| { + let pairs: Vec = lex + .iter() + .map(|req| { + let direction = req + .options + .as_ref() + .map( + |opt| if opt.descending { "desc" } else { "asc" }, + ) + .unwrap_or("unspecified"); + format!("({}, {direction})", req.expr) + }) + .collect(); + format!("[{}]", pairs.join(", ")) + }) + .unwrap_or_else(|| "[]".to_string()); + + write!( + f, + "OutputRequirementExec: order_by={}, dist_by={}", + order_cols, self.dist_requirement + ) + } + DisplayFormatType::TreeRender => { + write!(f, "") + } + } + } +} + +impl ExecutionPlan for OutputRequirementExec { + fn name(&self) -> &'static str { + "OutputRequirementExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_distribution(&self) -> Vec { + vec![self.dist_requirement.clone()] + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn required_input_ordering(&self) -> Vec> { + vec![self.order_requirement.clone()] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + children.remove(0), // has a single child + self.order_requirement.clone(), + self.dist_requirement.clone(), + self.fetch, + ))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn partition_statistics( + &self, + partition: Option, + ) -> datafusion::error::Result { + self.input.partition_statistics(partition) + } + + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down: + let proj_exprs = projection.expr(); + if proj_exprs.len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + let mut requirements = self.required_input_ordering().swap_remove(0); + if let Some(reqs) = requirements { + let mut updated_reqs = vec![]; + let (lexes, soft) = reqs.into_alternatives(); + for lex in lexes.into_iter() { + let Some(updated_lex) = update_ordering_requirement(lex, proj_exprs)? + else { + return Ok(None); + }; + updated_reqs.push(updated_lex); + } + requirements = OrderingRequirements::new_alternatives(updated_reqs, soft); + } + + let dist_req = match &self.required_input_distribution()[0] { + Distribution::HashPartitioned(exprs) => { + let mut updated_exprs = vec![]; + for expr in exprs { + let Some(new_expr) = update_expr(expr, projection.expr(), false)? + else { + return Ok(None); + }; + updated_exprs.push(new_expr); + } + Distribution::HashPartitioned(updated_exprs) + } + dist => dist.clone(), + }; + + make_with_child(projection, &self.input()).map(|input| { + let e = OutputRequirementExec::new(input, requirements, dist_req, self.fetch); + Some(Arc::new(e) as _) + }) + } + + fn fetch(&self) -> Option { + self.fetch + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +impl PhysicalOptimizerRule for OutputRequirements { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + match self.mode { + RuleMode::Add => require_top_ordering(plan), + RuleMode::Remove => plan + .transform_up(|plan| { + if let Some(sort_req) = + plan.as_any().downcast_ref::() + { + Ok(Transformed::yes(sort_req.input())) + } else { + Ok(Transformed::no(plan)) + } + }) + .data(), + } + } + + fn name(&self) -> &str { + "OutputRequirements" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that +/// global requirements are not lost during optimization. +/// +/// Idempotent: if the plan is already topped by an `OutputRequirementExec`, it +/// is returned unchanged so that re-running this rule (as adaptive execution +/// in datafusion-ballista AQE does after every completed stage, see +/// datafusion-ballista#1359) does not stack wrappers. +fn require_top_ordering(plan: Arc) -> Result> { + if plan + .as_any() + .downcast_ref::() + .is_some() + { + return Ok(plan); + } + let (new_plan, is_changed) = require_top_ordering_helper(plan)?; + if is_changed { + Ok(new_plan) + } else { + // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement. + Ok(Arc::new(OutputRequirementExec::new( + new_plan, + // there is no ordering requirement + None, + Distribution::UnspecifiedDistribution, + None, + )) as _) + } +} + +/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan. +/// First entry in the tuple is resulting plan, second entry indicates whether any +/// `OutputRequirementExec` is added to the plan. +fn require_top_ordering_helper( + plan: Arc, +) -> Result<(Arc, bool)> { + let mut children = plan.children(); + // Global ordering defines desired ordering in the final result. + if children.len() != 1 { + Ok((plan, false)) + } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + // In case of constant columns, output ordering of the `SortExec` would + // be an empty set. Therefore; we check the sort expression field to + // assign the requirements. + let req_dist = sort_exec.required_input_distribution().swap_remove(0); + let req_ordering = sort_exec.expr(); + let reqs = OrderingRequirements::from(req_ordering.clone()); + let fetch = sort_exec.fetch(); + + Ok(( + Arc::new(OutputRequirementExec::new( + plan, + Some(reqs), + req_dist, + fetch, + )) as _, + true, + )) + } else if let Some(spm) = plan.as_any().downcast_ref::() { + let reqs = OrderingRequirements::from(spm.expr().clone()); + let fetch = spm.fetch(); + Ok(( + Arc::new(OutputRequirementExec::new( + plan, + Some(reqs), + Distribution::SinglePartition, + fetch, + )) as _, + true, + )) + } else if plan.maintains_input_order()[0] + && (plan.required_input_ordering()[0] + .as_ref() + .is_none_or(|o| matches!(o, OrderingRequirements::Soft(_)))) + { + // Keep searching for a `SortExec` as long as ordering is maintained, + // and on-the-way operators do not themselves require an ordering. + // When an operator requires an ordering, any `SortExec` below can not + // be responsible for (i.e. the originator of) the global ordering. + let (new_child, is_changed) = + require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?; + + let plan = if is_changed { + plan.with_new_children(vec![new_child])? + } else { + plan + }; + + Ok((plan, is_changed)) + } else { + // Stop searching, there is no global ordering desired for the query. + Ok((plan, false)) + } +} + +// See tests in datafusion/core/tests/physical_optimizer diff --git a/ballista/scheduler/src/state/aqe/planner.rs b/ballista/scheduler/src/state/aqe/planner.rs index ada158752d..7584fd6ea2 100644 --- a/ballista/scheduler/src/state/aqe/planner.rs +++ b/ballista/scheduler/src/state/aqe/planner.rs @@ -1,3 +1,5 @@ +use crate::physical_optimizer::filter_pushdown::FilterPushdown; +use crate::physical_optimizer::output_requirements::OutputRequirements; // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -33,7 +35,20 @@ use datafusion::execution::context::SessionContext; use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_optimizer::optimizer::PhysicalOptimizer; +use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; +use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; +use datafusion::physical_optimizer::enforce_distribution::EnforceDistribution; +use datafusion::physical_optimizer::enforce_sorting::EnforceSorting; +use datafusion::physical_optimizer::ensure_coop::EnsureCooperative; +use datafusion::physical_optimizer::join_selection::JoinSelection; +use datafusion::physical_optimizer::limit_pushdown::LimitPushdown; +use datafusion::physical_optimizer::limit_pushdown_past_window::LimitPushPastWindows; +use datafusion::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; +use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown; +use datafusion::physical_optimizer::pushdown_sort::PushdownSort; +use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan; +use datafusion::physical_optimizer::topk_aggregation::TopKAggregation; +use datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable}; use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::prelude::SessionConfig; @@ -494,8 +509,11 @@ impl AdaptivePlanner { // select actual join implementation based on current runtime information physical_optimizers .push(Arc::new(SelectJoinRule::new(plan_id_generator.clone()))); - // add default set - physical_optimizers.extend(PhysicalOptimizer::new().rules); + + // add default datafusion set of optimizers + // physical_optimizers.extend(PhysicalOptimizer::new().rules); + // + physical_optimizers.extend(Self::datafusion_optimizers()); // `DistributedExchangeRule` should be the last plan mutator rule in the chain physical_optimizers @@ -504,6 +522,35 @@ impl AdaptivePlanner { physical_optimizers } + // at the moment we create default set of optimizers + // ``` + // physical_optimizers.extend(PhysicalOptimizer::new().rules); + // ``` + // as there are issues with some of them + fn datafusion_optimizers() -> Vec> { + vec![ + Arc::new(OutputRequirements::new_add_mode()), // temporary fix + Arc::new(AggregateStatistics::new()), + Arc::new(JoinSelection::new()), + Arc::new(LimitedDistinctAggregation::new()), + Arc::new(FilterPushdown::new()), // temporary fix + Arc::new(EnforceDistribution::new()), + Arc::new(CombinePartialFinalAggregate::new()), + Arc::new(EnforceSorting::new()), + Arc::new(OptimizeAggregateOrder::new()), + Arc::new(ProjectionPushdown::new()), + Arc::new(OutputRequirements::new_remove_mode()), + Arc::new(TopKAggregation::new()), + Arc::new(LimitPushPastWindows::new()), + Arc::new(LimitPushdown::new()), + Arc::new(ProjectionPushdown::new()), + Arc::new(PushdownSort::new()), + Arc::new(EnsureCooperative::new()), + Arc::new(FilterPushdown::new_post_optimization()), + Arc::new(SanityCheckPlan::new()), + ] + } + /// set of rules which will be executed ONCE before /// running standard set of physical optimizers fn plan_preparation_optimizers() -> Vec { From e7d54b10957f49f09c656a4de201bc391828503c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 2 Jun 2026 18:14:50 +0100 Subject: [PATCH 2/4] fix docs --- .../src/physical_optimizer/filter_pushdown.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs b/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs index d0cf7ca583..da8cee7b5f 100644 --- a/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs +++ b/ballista/scheduler/src/physical_optimizer/filter_pushdown.rs @@ -29,7 +29,7 @@ //! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides //! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). //! -//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription +//! [`FilterDescription`]: datafusion::physical_plan::filter_pushdown::FilterDescription // // This is temporary workaround until datafusion 55 is released @@ -379,11 +379,11 @@ use itertools::{Itertools, izip}; /// This is not yet implemented in DataFusion. See /// /// -/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr -/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot -/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec -/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec -/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec +/// [`PhysicalExpr`]: datafusion::physical_plan::PhysicalExpr +/// [`PhysicalExpr::snapshot`]: datafusion::physical_plan::PhysicalExpr::snapshot +/// [`FilterExec`]: datafusion::physical_plan::filter::FilterExec +/// [`ProjectionExec`]: datafusion::physical_plan::projection::ProjectionExec +/// [`AggregateExec`]: datafusion::physical_plan::aggregates::AggregateExec #[derive(Debug)] pub struct FilterPushdown { phase: FilterPushdownPhase, From 3f008568e2ce00a43274d4319e21f52d2a13db66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 2 Jun 2026 18:20:49 +0100 Subject: [PATCH 3/4] added few comments to explain why are optimizers carried over --- ballista/scheduler/src/physical_optimizer/mod.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ballista/scheduler/src/physical_optimizer/mod.rs b/ballista/scheduler/src/physical_optimizer/mod.rs index 29d6a94f64..a4057c0a6b 100644 --- a/ballista/scheduler/src/physical_optimizer/mod.rs +++ b/ballista/scheduler/src/physical_optimizer/mod.rs @@ -15,6 +15,18 @@ // specific language governing permissions and limitations // under the License. +// filter pushdown has been copied over +// from datafusion to patch non idempotent +// behavior. +// TODO: remove when updated to datafusion 55 pub mod filter_pushdown; +// join selection has been copied over from +// datafusion and patched to support ballista +// specific cases. it has been used in static +// execution graph only. pub mod join_selection; +// output requirements has been copied over +// from datafusion to patch non idempotent +// behavior. +// TODO: remove when updated to datafusion 55 pub mod output_requirements; From 04abf36c324539041f677fe12f6af62032463f3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 2 Jun 2026 20:51:57 +0100 Subject: [PATCH 4/4] remove output requirements as they were giving wrong ... result --- .../scheduler/src/physical_optimizer/mod.rs | 5 - .../physical_optimizer/output_requirements.rs | 438 ------------------ ballista/scheduler/src/state/aqe/planner.rs | 52 +-- 3 files changed, 13 insertions(+), 482 deletions(-) delete mode 100644 ballista/scheduler/src/physical_optimizer/output_requirements.rs diff --git a/ballista/scheduler/src/physical_optimizer/mod.rs b/ballista/scheduler/src/physical_optimizer/mod.rs index a4057c0a6b..4252d95492 100644 --- a/ballista/scheduler/src/physical_optimizer/mod.rs +++ b/ballista/scheduler/src/physical_optimizer/mod.rs @@ -25,8 +25,3 @@ pub mod filter_pushdown; // specific cases. it has been used in static // execution graph only. pub mod join_selection; -// output requirements has been copied over -// from datafusion to patch non idempotent -// behavior. -// TODO: remove when updated to datafusion 55 -pub mod output_requirements; diff --git a/ballista/scheduler/src/physical_optimizer/output_requirements.rs b/ballista/scheduler/src/physical_optimizer/output_requirements.rs deleted file mode 100644 index 7e8b586868..0000000000 --- a/ballista/scheduler/src/physical_optimizer/output_requirements.rs +++ /dev/null @@ -1,438 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! The GlobalOrderRequire optimizer rule either: -//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global -//! ordering and distribution requirement across rules, or -//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan. -//! Since the `OutputRequirementExec` operator is only a helper operator, it -//! shouldn't occur in the final plan (i.e. the executed plan). - -// -// This is temporary workaround until datafusion 55 is released -// see: https://github.com/apache/datafusion/pull/22523/ -// for more details -// - -use std::sync::Arc; - -use datafusion::common::config::ConfigOptions; -use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion::common::{Result, Statistics}; -use datafusion::execution::TaskContext; -use datafusion::physical_expr::Distribution; -use datafusion::physical_expr_common::sort_expr::OrderingRequirements; -use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_plan::execution_plan::Boundedness; -use datafusion::physical_plan::projection::{ - ProjectionExec, make_with_child, update_expr, update_ordering_requirement, -}; -use datafusion::physical_plan::sorts::sort::SortExec; -use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - SendableRecordBatchStream, -}; - -/// This rule either adds or removes [`OutputRequirements`]s to/from the physical -/// plan according to its `mode` attribute, which is set by the constructors -/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of -/// the global requirements (ordering and distribution) across rules. -/// -/// The primary use case of this node and rule is to specify and preserve the desired output -/// ordering and distribution the entire plan. When sending to a single client, a single partition may -/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be -/// better. -#[derive(Debug)] -pub struct OutputRequirements { - mode: RuleMode, -} - -impl OutputRequirements { - /// Create a new rule which works in `Add` mode; i.e. it simply adds a - /// top-level [`OutputRequirementExec`] into the physical plan to keep track - /// of global ordering and distribution requirements if there are any. - /// Note that this rule should run at the beginning. It is idempotent: when - /// invoked on a plan that is already topped by an `OutputRequirementExec`, - /// it returns the plan unchanged. - pub fn new_add_mode() -> Self { - Self { - mode: RuleMode::Add, - } - } - - /// Create a new rule which works in `Remove` mode; i.e. it simply removes - /// the top-level [`OutputRequirementExec`] from the physical plan if there is - /// any. We do this because a `OutputRequirementExec` is an ancillary, - /// non-executable operator whose sole purpose is to track global - /// requirements during optimization. Therefore, a - /// `OutputRequirementExec` should not appear in the final plan. - pub fn new_remove_mode() -> Self { - Self { - mode: RuleMode::Remove, - } - } -} - -#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)] -enum RuleMode { - Add, - Remove, -} - -/// An ancillary, non-executable operator whose sole purpose is to track global -/// requirements during optimization. It imposes -/// - the ordering requirement in its `order_requirement` attribute. -/// - the distribution requirement in its `dist_requirement` attribute. -/// -/// See [`OutputRequirements`] for more details -#[derive(Debug)] -pub struct OutputRequirementExec { - input: Arc, - order_requirement: Option, - dist_requirement: Distribution, - cache: Arc, - fetch: Option, -} - -impl OutputRequirementExec { - /// creates new [OutputRequirementExec] instance - pub fn new( - input: Arc, - requirements: Option, - dist_requirement: Distribution, - fetch: Option, - ) -> Self { - let cache = Self::compute_properties(&input, &fetch); - Self { - input, - order_requirement: requirements, - dist_requirement, - cache: Arc::new(cache), - fetch, - } - } - /// returns plan input - pub fn input(&self) -> Arc { - Arc::clone(&self.input) - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( - input: &Arc, - fetch: &Option, - ) -> PlanProperties { - let boundedness = if fetch.is_some() { - Boundedness::Bounded - } else { - input.boundedness() - }; - - PlanProperties::new( - input.equivalence_properties().clone(), // Equivalence Properties - input.output_partitioning().clone(), // Output Partitioning - input.pipeline_behavior(), // Pipeline Behavior - boundedness, // Boundedness - ) - } - - /// Get fetch - pub fn fetch(&self) -> Option { - self.fetch - } -} - -impl DisplayAs for OutputRequirementExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let order_cols = self - .order_requirement - .as_ref() - .map(|reqs| reqs.first()) - .map(|lex| { - let pairs: Vec = lex - .iter() - .map(|req| { - let direction = req - .options - .as_ref() - .map( - |opt| if opt.descending { "desc" } else { "asc" }, - ) - .unwrap_or("unspecified"); - format!("({}, {direction})", req.expr) - }) - .collect(); - format!("[{}]", pairs.join(", ")) - }) - .unwrap_or_else(|| "[]".to_string()); - - write!( - f, - "OutputRequirementExec: order_by={}, dist_by={}", - order_cols, self.dist_requirement - ) - } - DisplayFormatType::TreeRender => { - write!(f, "") - } - } - } -} - -impl ExecutionPlan for OutputRequirementExec { - fn name(&self) -> &'static str { - "OutputRequirementExec" - } - - fn properties(&self) -> &Arc { - &self.cache - } - - fn benefits_from_input_partitioning(&self) -> Vec { - vec![false] - } - - fn required_input_distribution(&self) -> Vec { - vec![self.dist_requirement.clone()] - } - - fn maintains_input_order(&self) -> Vec { - vec![true] - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn required_input_ordering(&self) -> Vec> { - vec![self.order_requirement.clone()] - } - - fn with_new_children( - self: Arc, - mut children: Vec>, - ) -> Result> { - Ok(Arc::new(Self::new( - children.remove(0), // has a single child - self.order_requirement.clone(), - self.dist_requirement.clone(), - self.fetch, - ))) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unreachable!(); - } - - fn partition_statistics( - &self, - partition: Option, - ) -> datafusion::error::Result { - self.input.partition_statistics(partition) - } - - fn try_swapping_with_projection( - &self, - projection: &ProjectionExec, - ) -> Result>> { - // If the projection does not narrow the schema, we should not try to push it down: - let proj_exprs = projection.expr(); - if proj_exprs.len() >= projection.input().schema().fields().len() { - return Ok(None); - } - - let mut requirements = self.required_input_ordering().swap_remove(0); - if let Some(reqs) = requirements { - let mut updated_reqs = vec![]; - let (lexes, soft) = reqs.into_alternatives(); - for lex in lexes.into_iter() { - let Some(updated_lex) = update_ordering_requirement(lex, proj_exprs)? - else { - return Ok(None); - }; - updated_reqs.push(updated_lex); - } - requirements = OrderingRequirements::new_alternatives(updated_reqs, soft); - } - - let dist_req = match &self.required_input_distribution()[0] { - Distribution::HashPartitioned(exprs) => { - let mut updated_exprs = vec![]; - for expr in exprs { - let Some(new_expr) = update_expr(expr, projection.expr(), false)? - else { - return Ok(None); - }; - updated_exprs.push(new_expr); - } - Distribution::HashPartitioned(updated_exprs) - } - dist => dist.clone(), - }; - - make_with_child(projection, &self.input()).map(|input| { - let e = OutputRequirementExec::new(input, requirements, dist_req, self.fetch); - Some(Arc::new(e) as _) - }) - } - - fn fetch(&self) -> Option { - self.fetch - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } -} - -impl PhysicalOptimizerRule for OutputRequirements { - fn optimize( - &self, - plan: Arc, - _config: &ConfigOptions, - ) -> Result> { - match self.mode { - RuleMode::Add => require_top_ordering(plan), - RuleMode::Remove => plan - .transform_up(|plan| { - if let Some(sort_req) = - plan.as_any().downcast_ref::() - { - Ok(Transformed::yes(sort_req.input())) - } else { - Ok(Transformed::no(plan)) - } - }) - .data(), - } - } - - fn name(&self) -> &str { - "OutputRequirements" - } - - fn schema_check(&self) -> bool { - true - } -} - -/// This functions adds ancillary `OutputRequirementExec` to the physical plan, so that -/// global requirements are not lost during optimization. -/// -/// Idempotent: if the plan is already topped by an `OutputRequirementExec`, it -/// is returned unchanged so that re-running this rule (as adaptive execution -/// in datafusion-ballista AQE does after every completed stage, see -/// datafusion-ballista#1359) does not stack wrappers. -fn require_top_ordering(plan: Arc) -> Result> { - if plan - .as_any() - .downcast_ref::() - .is_some() - { - return Ok(plan); - } - let (new_plan, is_changed) = require_top_ordering_helper(plan)?; - if is_changed { - Ok(new_plan) - } else { - // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement. - Ok(Arc::new(OutputRequirementExec::new( - new_plan, - // there is no ordering requirement - None, - Distribution::UnspecifiedDistribution, - None, - )) as _) - } -} - -/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan. -/// First entry in the tuple is resulting plan, second entry indicates whether any -/// `OutputRequirementExec` is added to the plan. -fn require_top_ordering_helper( - plan: Arc, -) -> Result<(Arc, bool)> { - let mut children = plan.children(); - // Global ordering defines desired ordering in the final result. - if children.len() != 1 { - Ok((plan, false)) - } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { - // In case of constant columns, output ordering of the `SortExec` would - // be an empty set. Therefore; we check the sort expression field to - // assign the requirements. - let req_dist = sort_exec.required_input_distribution().swap_remove(0); - let req_ordering = sort_exec.expr(); - let reqs = OrderingRequirements::from(req_ordering.clone()); - let fetch = sort_exec.fetch(); - - Ok(( - Arc::new(OutputRequirementExec::new( - plan, - Some(reqs), - req_dist, - fetch, - )) as _, - true, - )) - } else if let Some(spm) = plan.as_any().downcast_ref::() { - let reqs = OrderingRequirements::from(spm.expr().clone()); - let fetch = spm.fetch(); - Ok(( - Arc::new(OutputRequirementExec::new( - plan, - Some(reqs), - Distribution::SinglePartition, - fetch, - )) as _, - true, - )) - } else if plan.maintains_input_order()[0] - && (plan.required_input_ordering()[0] - .as_ref() - .is_none_or(|o| matches!(o, OrderingRequirements::Soft(_)))) - { - // Keep searching for a `SortExec` as long as ordering is maintained, - // and on-the-way operators do not themselves require an ordering. - // When an operator requires an ordering, any `SortExec` below can not - // be responsible for (i.e. the originator of) the global ordering. - let (new_child, is_changed) = - require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?; - - let plan = if is_changed { - plan.with_new_children(vec![new_child])? - } else { - plan - }; - - Ok((plan, is_changed)) - } else { - // Stop searching, there is no global ordering desired for the query. - Ok((plan, false)) - } -} - -// See tests in datafusion/core/tests/physical_optimizer diff --git a/ballista/scheduler/src/state/aqe/planner.rs b/ballista/scheduler/src/state/aqe/planner.rs index 7584fd6ea2..762e683dbd 100644 --- a/ballista/scheduler/src/state/aqe/planner.rs +++ b/ballista/scheduler/src/state/aqe/planner.rs @@ -1,5 +1,3 @@ -use crate::physical_optimizer::filter_pushdown::FilterPushdown; -use crate::physical_optimizer::output_requirements::OutputRequirements; // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -16,6 +14,7 @@ use crate::physical_optimizer::output_requirements::OutputRequirements; // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +use crate::physical_optimizer::filter_pushdown::FilterPushdown; use crate::state::aqe::adapter::BallistaAdapter; use crate::state::aqe::execution_plan::{AdaptiveDatafusionExec, ExchangeExec}; use crate::state::aqe::optimizer_rule::chaos_exec::ChaosCreatingRule; @@ -23,7 +22,6 @@ use crate::state::aqe::optimizer_rule::{ CoalescePartitionsRule, DelayJoinSelectionRule, DistributedExchangeRule, PropagateEmptyExecRule, SelectJoinRule, }; - use crate::state::distributed_explain::handle_explain_plan; use crate::state::execution_stage::StageOutput; use ballista_core::execution_plans::ShuffleWriter; @@ -35,20 +33,6 @@ use datafusion::execution::context::SessionContext; use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; -use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; -use datafusion::physical_optimizer::enforce_distribution::EnforceDistribution; -use datafusion::physical_optimizer::enforce_sorting::EnforceSorting; -use datafusion::physical_optimizer::ensure_coop::EnsureCooperative; -use datafusion::physical_optimizer::join_selection::JoinSelection; -use datafusion::physical_optimizer::limit_pushdown::LimitPushdown; -use datafusion::physical_optimizer::limit_pushdown_past_window::LimitPushPastWindows; -use datafusion::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; -use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown; -use datafusion::physical_optimizer::pushdown_sort::PushdownSort; -use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan; -use datafusion::physical_optimizer::topk_aggregation::TopKAggregation; -use datafusion::physical_optimizer::update_aggr_exprs::OptimizeAggregateOrder; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable}; use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::prelude::SessionConfig; @@ -511,7 +495,9 @@ impl AdaptivePlanner { .push(Arc::new(SelectJoinRule::new(plan_id_generator.clone()))); // add default datafusion set of optimizers - // physical_optimizers.extend(PhysicalOptimizer::new().rules); + // physical_optimizers.extend( + // datafusion::physical_optimizer::optimizer::PhysicalOptimizer::new().rules, + // ); // physical_optimizers.extend(Self::datafusion_optimizers()); @@ -527,28 +513,16 @@ impl AdaptivePlanner { // physical_optimizers.extend(PhysicalOptimizer::new().rules); // ``` // as there are issues with some of them + // TODO: replace this once we update to datafusion 55 fn datafusion_optimizers() -> Vec> { - vec![ - Arc::new(OutputRequirements::new_add_mode()), // temporary fix - Arc::new(AggregateStatistics::new()), - Arc::new(JoinSelection::new()), - Arc::new(LimitedDistinctAggregation::new()), - Arc::new(FilterPushdown::new()), // temporary fix - Arc::new(EnforceDistribution::new()), - Arc::new(CombinePartialFinalAggregate::new()), - Arc::new(EnforceSorting::new()), - Arc::new(OptimizeAggregateOrder::new()), - Arc::new(ProjectionPushdown::new()), - Arc::new(OutputRequirements::new_remove_mode()), - Arc::new(TopKAggregation::new()), - Arc::new(LimitPushPastWindows::new()), - Arc::new(LimitPushdown::new()), - Arc::new(ProjectionPushdown::new()), - Arc::new(PushdownSort::new()), - Arc::new(EnsureCooperative::new()), - Arc::new(FilterPushdown::new_post_optimization()), - Arc::new(SanityCheckPlan::new()), - ] + datafusion::physical_optimizer::optimizer::PhysicalOptimizer::new() + .rules + .into_iter() + .map(|r| match r.name() { + "FilterPushdown" => Arc::new(FilterPushdown::new()), + _ => r, + }) + .collect() } /// set of rules which will be executed ONCE before