From b3a864b6b0310734f1abaf5084d2095047f451a0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:42:40 -0700 Subject: [PATCH 1/5] refactor: add TableScanBuilder, deprecate TableScan::try_new `TableScan::try_new` takes five positional arguments and bare `TableScan { .. }` struct literals are scattered across the codebase, making both fragile to field additions. Introduce `TableScanBuilder` (with `From<TableScan>`, so an existing scan can be decomposed, tweaked, and rebuilt) and move the schema-derivation logic into `build()`. `TableScan::try_new` is now deprecated and delegates to the builder; all in-tree callers are migrated to the builder. Pure refactor with no intended behavior change; note that the proto `from_table_source` path now also goes through `build()` (including its empty-table-name check) instead of a bare struct literal. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/builder.rs | 11 ++- datafusion/expr/src/logical_plan/mod.rs | 4 +- datafusion/expr/src/logical_plan/plan.rs | 88 +++++++++++++++++-- .../optimizer/src/optimize_projections/mod.rs | 20 ++--- datafusion/proto/src/logical_plan/mod.rs | 18 ++-- 5 files changed, 103 insertions(+), 38 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 017a123eb035b..13605fad30844 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -33,8 +33,8 @@ use crate::expr_rewriter::{ use crate::logical_plan::{ Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, - Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values, - Window, + Projection, Repartition, Sort, SubqueryAlias, TableScanBuilder, Union, Unnest, + Values, Window, }; use crate::select_expr::SelectExpr; use crate::utils::{ @@ -510,8 +510,11 @@ impl LogicalPlanBuilder { filters: Vec, fetch: Option, ) -> Result { - let table_scan = - TableScan::try_new(table_name, 
table_source, projection, filters, fetch)?; + let table_scan = TableScanBuilder::new(table_name, table_source) + .with_projection(projection) + .with_filters(filters) + .with_fetch(fetch) + .build()?; // Inline TableScan if table_scan.filters.is_empty() diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index c2b01868c97f3..5087b25178ab6 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -42,8 +42,8 @@ pub use plan::{ EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, - projection_schema, + SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, Unnest, Values, + Window, projection_schema, }; pub use statement::{ Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c572b202f03ce..02de912ee7810 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2854,6 +2854,7 @@ impl Hash for TableScan { impl TableScan { /// Initialize TableScan with appropriate schema from the given /// arguments. + #[deprecated(since = "54.0.0", note = "use `TableScanBuilder` instead")] pub fn try_new( table_name: impl Into, table_source: Arc, @@ -2861,14 +2862,79 @@ impl TableScan { filters: Vec, fetch: Option, ) -> Result { - let table_name = table_name.into(); + TableScanBuilder::new(table_name, table_source) + .with_projection(projection) + .with_filters(filters) + .with_fetch(fetch) + .build() + } +} + +/// Builder for [`TableScan`]. 
+/// +/// Prefer this over constructing a [`TableScan`] directly: it derives the +/// `projected_schema` from the source schema and projection, and is resilient +/// to new fields being added to [`TableScan`]. An existing scan can be turned +/// back into a builder with `TableScanBuilder::from(scan)`, tweaked, and +/// rebuilt with [`TableScanBuilder::build`]. +pub struct TableScanBuilder { + table_name: TableReference, + source: Arc, + projection: Option>, + filters: Vec, + fetch: Option, +} + +impl TableScanBuilder { + /// Create a new builder for a scan of `source` named `table_name`. + pub fn new( + table_name: impl Into, + source: Arc, + ) -> Self { + Self { + table_name: table_name.into(), + source, + projection: None, + filters: vec![], + fetch: None, + } + } + + /// Set the column projection (indices into the source schema). + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + /// Set the filter expressions offered to the table provider. + pub fn with_filters(mut self, filters: Vec) -> Self { + self.filters = filters; + self + } + + /// Set the maximum number of rows to read. + pub fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self + } + + /// Build the [`TableScan`], deriving its `projected_schema` from the + /// source schema and projection. 
+ pub fn build(self) -> Result { + let TableScanBuilder { + table_name, + source, + projection, + filters, + fetch, + } = self; if table_name.table().is_empty() { return plan_err!("table_name cannot be empty"); } - let schema = table_source.schema(); + let schema = source.schema(); let func_dependencies = FunctionalDependencies::new_from_constraints( - table_source.constraints(), + source.constraints(), schema.fields.len(), ); let projected_schema = projection @@ -2894,9 +2960,9 @@ impl TableScan { })?; let projected_schema = Arc::new(projected_schema); - Ok(Self { + Ok(TableScan { table_name, - source: table_source, + source, projection, projected_schema, filters, @@ -2905,6 +2971,18 @@ impl TableScan { } } +impl From for TableScanBuilder { + fn from(scan: TableScan) -> Self { + Self { + table_name: scan.table_name, + source: scan.source, + projection: scan.projection, + filters: scan.filters, + fetch: scan.fetch, + } + } +} + // Repartition the plan based on a partitioning scheme. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub struct Repartition { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index af944abc6f0b4..dbc2a4c2c03fe 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -29,8 +29,8 @@ use datafusion_common::{ }; use datafusion_expr::expr::Alias; use datafusion_expr::{ - Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window, - logical_plan::LogicalPlan, + Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScanBuilder, Unnest, + Window, logical_plan::LogicalPlan, }; use crate::optimize_projections::required_indices::RequiredIndices; @@ -269,23 +269,15 @@ fn optimize_projections( .transform_data(|plan| optimize_subqueries(plan, config)); } LogicalPlan::TableScan(table_scan) => { - let TableScan { - table_name, - source, - projection, - filters, - fetch, - 
projected_schema: _, - } = table_scan; - // Get indices referred to in the original (schema with all fields) // given projected indices. - let projection = match &projection { + let projection = match &table_scan.projection { Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - let new_scan = - TableScan::try_new(table_name, source, Some(projection), filters, fetch)?; + let new_scan = TableScanBuilder::from(table_scan) + .with_projection(Some(projection)) + .build()?; return Transformed::yes(LogicalPlan::TableScan(new_scan)) .transform_data(|plan| optimize_subqueries(plan, config)); diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 8228e8e6f2ff0..5127024b655ca 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -39,8 +39,8 @@ use datafusion_catalog::empty::EmptyTable; use datafusion_common::file_options::file_type::FileType; use datafusion_common::format::ExplainFormat; use datafusion_common::{ - Result, TableReference, ToDFSchema, assert_or_internal_err, context, - internal_datafusion_err, internal_err, not_impl_err, plan_err, + Result, TableReference, assert_or_internal_err, context, internal_datafusion_err, + internal_err, not_impl_err, plan_err, }; use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_format::{ @@ -65,8 +65,8 @@ use datafusion_expr::{ logical_plan::{ Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare, - Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, - builder::project, + Projection, Repartition, Sort, SubqueryAlias, TableScan, TableScanBuilder, + Values, Window, builder::project, }, }; @@ -371,15 +371,7 @@ fn from_table_source( target: Arc, extension_codec: &dyn LogicalExtensionCodec, ) -> Result { - let projected_schema = 
target.schema().to_dfschema_ref()?; - let r = LogicalPlan::TableScan(TableScan { - table_name, - source: target, - projection: None, - projected_schema, - filters: vec![], - fetch: None, - }); + let r = LogicalPlan::TableScan(TableScanBuilder::new(table_name, target).build()?); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) } From f88ac9e254e56ca090592f288b12f2233c5d26d1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:44:47 -0700 Subject: [PATCH 2/5] feat: add StatisticsRequest type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `StatisticsRequest` to `datafusion-expr-common::statistics` — a small vocabulary for query-aware statistics: a caller can ask a provider for a specific statistic (Min/Max/NullCount/DistinctCount/Sum/ByteSize per column, plus RowCount and TotalByteSize) instead of for a dense `Statistics` covering every column. It is intentionally just a vocabulary; nothing in DataFusion populates or consumes it yet. Re-exported via `datafusion_expr::statistics`. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr-common/src/statistics.rs | 41 ++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs index c94c181615aed..47dece88a6073 100644 --- a/datafusion/expr-common/src/statistics.rs +++ b/datafusion/expr-common/src/statistics.rs @@ -1694,3 +1694,44 @@ mod tests { all_ops.into_iter().collect() } } + +// --------------------------------------------------------------------------- +// Query-aware statistics requests. +// +// A small extension to the existing `Statistics` model: instead of "give me +// everything you have for every column", a caller can ask for a specific list +// of stats by name. `StatisticsRequest` is just that vocabulary — DataFusion +// itself does not populate or consume it. 
It exists so a request can be +// threaded from a `TableScan` (see `TableScan::statistics_requests`) through +// `ScanArgs::statistics_requests` to a `TableProvider`, which is enough for a +// query-aware statistics feature to be implemented outside of DataFusion. +// --------------------------------------------------------------------------- + +use datafusion_common::Column; + +/// A statistic a caller would like a provider to supply, if it can do so +/// cheaply. +/// +/// Each variant maps onto a field of [`datafusion_common::Statistics`] / +/// [`datafusion_common::ColumnStatistics`], so a provider that already +/// populates one can answer the request trivially. +#[derive(Debug, Clone, PartialEq)] +pub enum StatisticsRequest { + /// Smallest non-null value of `column`. + Min(Column), + /// Largest non-null value of `column`. + Max(Column), + /// Number of NULLs in `column`. + NullCount(Column), + /// Number of distinct values in `column` (exact or estimated). + DistinctCount(Column), + /// Sum of values in `column` (numerics, widened per + /// `ColumnStatistics::sum_value`). + Sum(Column), + /// Encoded/output byte size of `column`. + ByteSize(Column), + /// Number of rows in the container (table / file). + RowCount, + /// Total byte size of the container's output. + TotalByteSize, +} From 7e839c09302b2ae7796e2945fa5839a43db3928e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:50:34 -0700 Subject: [PATCH 3/5] feat: add TableScan::statistics_requests field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an advisory `statistics_requests: Vec<StatisticsRequest>` field to `TableScan`. A custom optimizer rule can attach the statistics the surrounding plan shape would benefit from (e.g. 
Min/Max for sort keys) via `TableScan::with_statistics_requests` or the new `TableScanBuilder::with_statistics_requests`; the physical planner will thread them into the table provider (next commit). The field is empty by default and DataFusion's own rules never populate it. `Debug`/`PartialEq`/`Eq`/`Hash`/`PartialOrd` for `TableScan` are left unchanged — it is advisory metadata, not part of plan identity. `map_expressions` in `tree_node.rs` is rewritten to rebuild `TableScan` via `..scan` instead of an exhaustive destructure, so it carries this (and any future) field through untouched. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/expr/src/logical_plan/plan.rs | 39 +++++++++++++++++++ datafusion/expr/src/logical_plan/tree_node.rs | 24 ++++-------- datafusion/optimizer/src/push_down_filter.rs | 1 + 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 02de912ee7810..0d7a6650e6384 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -50,6 +50,7 @@ use crate::{ WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed, }; +use crate::statistics::StatisticsRequest; use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion_common::cse::{NormalizeEq, Normalizeable}; use datafusion_common::format::ExplainFormat; @@ -2780,6 +2781,13 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Statistics the planner would like the provider to answer for this + /// scan, typically attached by a custom optimizer rule from the + /// surrounding plan shape (e.g. Min/Max for sort keys). Threaded into + /// the table provider via `ScanArgs::statistics_requests` at + /// physical-planning time. Advisory and empty by default; DataFusion's + /// own rules never populate it. 
+ pub statistics_requests: Vec, } impl Debug for TableScan { @@ -2868,6 +2876,20 @@ impl TableScan { .with_fetch(fetch) .build() } + + /// Attach the statistics the planner would like the provider to answer + /// for this scan. See [`Self::statistics_requests`]. + /// + /// Intended for custom optimizer rules annotating an existing + /// `TableScan`; the physical planner threads the result into + /// `ScanArgs::statistics_requests`. + pub fn with_statistics_requests( + mut self, + statistics_requests: Vec, + ) -> Self { + self.statistics_requests = statistics_requests; + self + } } /// Builder for [`TableScan`]. @@ -2883,6 +2905,7 @@ pub struct TableScanBuilder { projection: Option>, filters: Vec, fetch: Option, + statistics_requests: Vec, } impl TableScanBuilder { @@ -2897,6 +2920,7 @@ impl TableScanBuilder { projection: None, filters: vec![], fetch: None, + statistics_requests: vec![], } } @@ -2918,6 +2942,16 @@ impl TableScanBuilder { self } + /// Set the statistics requests for the scan. See + /// [`TableScan::statistics_requests`]. + pub fn with_statistics_requests( + mut self, + statistics_requests: Vec, + ) -> Self { + self.statistics_requests = statistics_requests; + self + } + /// Build the [`TableScan`], deriving its `projected_schema` from the /// source schema and projection. 
pub fn build(self) -> Result { @@ -2927,6 +2961,7 @@ impl TableScanBuilder { projection, filters, fetch, + statistics_requests, } = self; if table_name.table().is_empty() { @@ -2967,6 +3002,7 @@ impl TableScanBuilder { projected_schema, filters, fetch, + statistics_requests, }) } } @@ -2979,6 +3015,7 @@ impl From for TableScanBuilder { projection: scan.projection, filters: scan.filters, fetch: scan.fetch, + statistics_requests: scan.statistics_requests, } } } @@ -5217,6 +5254,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + statistics_requests: vec![], })); let col = schema.field_names()[0].clone(); @@ -5247,6 +5285,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + statistics_requests: vec![], })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ef9382a57209a..eb461b0319b1c 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -608,23 +608,15 @@ impl LogicalPlan { Transformed::new(plan, exprs.transformed, exprs.tnr) } } - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - projected_schema, - filters, - fetch, - }) => filters.map_elements(f)?.update_data(|filters| { - LogicalPlan::TableScan(TableScan { - table_name, - source, - projection, - projected_schema, - filters, - fetch, + LogicalPlan::TableScan(mut scan) => { + // Only `filters` carry expressions; take them out, rewrite, + // and rebuild with `..scan` so every other field (including + // any added later) is carried through untouched. 
+ let filters = std::mem::take(&mut scan.filters); + filters.map_elements(f)?.update_data(|filters| { + LogicalPlan::TableScan(TableScan { filters, ..scan }) }) - }), + } LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 3564db1ebef4a..601ee316555b6 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3168,6 +3168,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + statistics_requests: vec![], }); Ok(LogicalPlanBuilder::from(table_scan)) From cc01ceff03b0e113e397ee99da594f10c3a17185 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 00:53:38 -0700 Subject: [PATCH 4/5] feat: thread statistics requests into ScanArgs Add a `statistics_requests` field to `ScanArgs` (with `with_statistics_requests` / `statistics_requests` accessors) and have the physical planner thread `TableScan::statistics_requests` into it. This completes the request-side path: a custom optimizer rule annotates `TableScan`, and the request reaches a custom `TableProvider` in `scan_with_args`. DataFusion's own providers ignore the field; the default `ScanArgs` value is an empty slice. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/catalog/src/table.rs | 23 +++++++++++++++++++++++ datafusion/core/src/physical_planner.rs | 4 +++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 5d1391bed1172..3ae0b2ed10a95 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use datafusion_common::{Constraints, Statistics, not_impl_err}; use datafusion_common::{Result, internal_err}; use datafusion_expr::Expr; +use datafusion_expr::statistics::StatisticsRequest; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ @@ -406,6 +407,7 @@ pub struct ScanArgs<'a> { filters: Option<&'a [Expr]>, projection: Option<&'a [usize]>, limit: Option, + statistics_requests: &'a [StatisticsRequest], } impl<'a> ScanArgs<'a> { @@ -467,6 +469,27 @@ impl<'a> ScanArgs<'a> { pub fn limit(&self) -> Option { self.limit } + + /// Set the statistics the caller would like the provider to answer for + /// this scan, if it can do so cheaply. + /// + /// Providers read these via [`Self::statistics_requests()`]; anything a + /// provider cannot answer cheaply it simply ignores. DataFusion's own + /// `TableProvider`s ignore this field — it exists so a request can be + /// threaded from a custom optimizer rule (which annotates + /// `TableScan::statistics_requests`) through to a custom provider. + pub fn with_statistics_requests( + mut self, + statistics_requests: &'a [StatisticsRequest], + ) -> Self { + self.statistics_requests = statistics_requests; + self + } + + /// Get the statistics requests for the scan. Empty if none were set. + pub fn statistics_requests(&self) -> &'a [StatisticsRequest] { + self.statistics_requests + } } /// Result of a table scan operation from [`TableProvider::scan_with_args`]. 
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3b2c7a78e898e..6cd1ea319e57a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -645,6 +645,7 @@ impl DefaultPhysicalPlanner { filters, fetch, projected_schema, + statistics_requests, .. } = scan; @@ -657,7 +658,8 @@ impl DefaultPhysicalPlanner { let opts = ScanArgs::default() .with_projection(projection.as_deref()) .with_filters(Some(&filters_vec)) - .with_limit(*fetch); + .with_limit(*fetch) + .with_statistics_requests(statistics_requests); let res = source.scan_with_args(session_state, opts).await?; Arc::clone(res.plan()) } else { From 070700dbc50f557e45ee3c0d83551c39eff1428b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 17 May 2026 01:00:18 -0700 Subject: [PATCH 5/5] test: e2e statistics-request flow via a custom optimizer rule Add a self-contained integration test that plays both external roles: a custom `OptimizerRule` annotates each `TableScan` with `StatisticsRequest`s, and a custom `TableProvider` records the `ScanArgs::statistics_requests` it receives in `scan_with_args`. This demonstrates the request-side hooks are sufficient to build the feature entirely outside of DataFusion. A second test confirms that without such a rule the provider sees an empty request list. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/core/tests/user_defined/mod.rs | 4 + .../tests/user_defined/statistics_requests.rs | 214 ++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 datafusion/core/tests/user_defined/statistics_requests.rs diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs index bc9949f5d681c..4dad3ec4577d9 100644 --- a/datafusion/core/tests/user_defined/mod.rs +++ b/datafusion/core/tests/user_defined/mod.rs @@ -41,3 +41,7 @@ mod relation_planner; /// Tests for insert operations mod insert_operation; + +/// Tests for `StatisticsRequest`s flowing from a custom optimizer rule +/// through the physical planner into a custom `TableProvider`. +mod statistics_requests; diff --git a/datafusion/core/tests/user_defined/statistics_requests.rs b/datafusion/core/tests/user_defined/statistics_requests.rs new file mode 100644 index 0000000000000..83a1e0cb503bc --- /dev/null +++ b/datafusion/core/tests/user_defined/statistics_requests.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end test that a *custom* optimizer rule can annotate a +//! 
`TableScan` with `StatisticsRequest`s and have them reach a *custom* +//! `TableProvider`'s `scan_with_args`. +//! +//! DataFusion ships no rule that populates `TableScan::statistics_requests` +//! and no provider that consumes `ScanArgs::statistics_requests`. This test +//! plays both roles, demonstrating that the request-side hooks are +//! sufficient to build the whole feature outside of DataFusion. + +use std::sync::{Arc, Mutex}; + +use arrow::array::{Int64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider}; +use datafusion::common::tree_node::Transformed; +use datafusion::common::{Column, Result}; +use datafusion::datasource::TableType; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::execution::context::SessionContext; +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::logical_expr::statistics::StatisticsRequest; +use datafusion::logical_expr::{Expr, LogicalPlan}; +use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule}; +use datafusion::physical_plan::ExecutionPlan; + +/// A custom optimizer rule that annotates every `TableScan` with a +/// `RowCount` request plus a `Min` request for each of its columns. +/// +/// This stands in for whatever request-derivation logic an external +/// implementer would write (e.g. Min/Max for sort keys, DistinctCount for +/// join keys). Here it is intentionally trivial and deterministic. 
+#[derive(Debug)] +struct RequestColumnStatistics; + +impl OptimizerRule for RequestColumnStatistics { + fn name(&self) -> &str { + "test_request_column_statistics" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let LogicalPlan::TableScan(scan) = plan else { + return Ok(Transformed::no(plan)); + }; + // Idempotent: the optimizer runs rules to a fixpoint, so only + // annotate a scan we have not already touched. + if !scan.statistics_requests.is_empty() { + return Ok(Transformed::no(LogicalPlan::TableScan(scan))); + } + let mut requests = vec![StatisticsRequest::RowCount]; + for field in scan.projected_schema.fields() { + requests.push(StatisticsRequest::Min(Column::new_unqualified( + field.name(), + ))); + } + Ok(Transformed::yes(LogicalPlan::TableScan( + scan.with_statistics_requests(requests), + ))) + } +} + +/// A `TableProvider` that records the `statistics_requests` it was asked +/// for, so the test can assert what reached it. +#[derive(Debug)] +struct RecordingTable { + schema: SchemaRef, + batch: RecordBatch, + last_requests: Arc>>, +} + +#[async_trait] +impl TableProvider for RecordingTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(MemorySourceConfig::try_new_exec( + &[vec![self.batch.clone()]], + Arc::clone(&self.schema), + projection.cloned(), + )?) + } + + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + // Record what reached us, then delegate to `scan`. 
+ *self.last_requests.lock().unwrap() = args.statistics_requests().to_vec(); + let plan = self + .scan( + state, + args.projection().map(|p| p.to_vec()).as_ref(), + args.filters().unwrap_or(&[]), + args.limit(), + ) + .await?; + Ok(ScanResult::new(plan)) + } +} + +fn make_table() -> (Arc, Arc>>) { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![10, 20, 30])), + ], + ) + .unwrap(); + let last_requests = Arc::new(Mutex::new(Vec::new())); + let provider = Arc::new(RecordingTable { + schema, + batch, + last_requests: Arc::clone(&last_requests), + }); + (provider, last_requests) +} + +#[tokio::test] +async fn custom_rule_requests_reach_custom_provider() -> Result<()> { + let (provider, last_requests) = make_table(); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_optimizer_rule(Arc::new(RequestColumnStatistics)) + .build(); + let ctx = SessionContext::new_with_state(state); + ctx.register_table("t", provider)?; + + ctx.sql("SELECT a, b FROM t").await?.collect().await?; + + let got = last_requests.lock().unwrap().clone(); + assert_eq!( + got.len(), + 3, + "expected RowCount + Min(a) + Min(b), got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::RowCount), + "expected RowCount, got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))), + "expected Min(a), got {got:?}" + ); + assert!( + got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))), + "expected Min(b), got {got:?}" + ); + Ok(()) +} + +#[tokio::test] +async fn no_requests_without_a_rule() -> Result<()> { + // Without a rule populating `TableScan::statistics_requests`, the + // provider sees an empty request list — stock DataFusion behavior. 
+ let (provider, last_requests) = make_table(); + let ctx = SessionContext::new(); + ctx.register_table("t", provider)?; + + ctx.sql("SELECT a, b FROM t").await?.collect().await?; + + assert!( + last_requests.lock().unwrap().is_empty(), + "expected no requests without a custom rule" + ); + Ok(()) +}