Skip to content

Commit 070700d

Browse files
adriangb and claude committed
test: e2e statistics-request flow via a custom optimizer rule
Add a self-contained integration test that plays both external roles: a custom `OptimizerRule` annotates each `TableScan` with `StatisticsRequest`s, and a custom `TableProvider` records the `ScanArgs::statistics_requests` it receives in `scan_with_args`. This demonstrates the request-side hooks are sufficient to build the feature entirely outside of DataFusion. A second test confirms that without such a rule the provider sees an empty request list. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cc01cef commit 070700d

2 files changed

Lines changed: 218 additions & 0 deletions

File tree

datafusion/core/tests/user_defined/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ mod relation_planner;
4141

4242
/// Tests for insert operations
4343
mod insert_operation;
44+
45+
/// Tests for `StatisticsRequest`s flowing from a custom optimizer rule
46+
/// through the physical planner into a custom `TableProvider`.
47+
mod statistics_requests;
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! End-to-end test that a *custom* optimizer rule can annotate a
19+
//! `TableScan` with `StatisticsRequest`s and have them reach a *custom*
20+
//! `TableProvider`'s `scan_with_args`.
21+
//!
22+
//! DataFusion ships no rule that populates `TableScan::statistics_requests`
23+
//! and no provider that consumes `ScanArgs::statistics_requests`. This test
24+
//! plays both roles, demonstrating that the request-side hooks are
25+
//! sufficient to build the whole feature outside of DataFusion.
26+
27+
use std::sync::{Arc, Mutex};
28+
29+
use arrow::array::{Int64Array, RecordBatch};
30+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
31+
use async_trait::async_trait;
32+
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
33+
use datafusion::common::tree_node::Transformed;
34+
use datafusion::common::{Column, Result};
35+
use datafusion::datasource::TableType;
36+
use datafusion::datasource::memory::MemorySourceConfig;
37+
use datafusion::execution::context::SessionContext;
38+
use datafusion::execution::session_state::SessionStateBuilder;
39+
use datafusion::logical_expr::statistics::StatisticsRequest;
40+
use datafusion::logical_expr::{Expr, LogicalPlan};
41+
use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
42+
use datafusion::physical_plan::ExecutionPlan;
43+
44+
/// A custom optimizer rule that annotates every `TableScan` with a
45+
/// `RowCount` request plus a `Min` request for each of its columns.
46+
///
47+
/// This stands in for whatever request-derivation logic an external
48+
/// implementer would write (e.g. Min/Max for sort keys, DistinctCount for
49+
/// join keys). Here it is intentionally trivial and deterministic.
50+
#[derive(Debug)]
51+
struct RequestColumnStatistics;
52+
53+
impl OptimizerRule for RequestColumnStatistics {
54+
fn name(&self) -> &str {
55+
"test_request_column_statistics"
56+
}
57+
58+
fn apply_order(&self) -> Option<ApplyOrder> {
59+
Some(ApplyOrder::TopDown)
60+
}
61+
62+
fn supports_rewrite(&self) -> bool {
63+
true
64+
}
65+
66+
fn rewrite(
67+
&self,
68+
plan: LogicalPlan,
69+
_config: &dyn OptimizerConfig,
70+
) -> Result<Transformed<LogicalPlan>> {
71+
let LogicalPlan::TableScan(scan) = plan else {
72+
return Ok(Transformed::no(plan));
73+
};
74+
// Idempotent: the optimizer runs rules to a fixpoint, so only
75+
// annotate a scan we have not already touched.
76+
if !scan.statistics_requests.is_empty() {
77+
return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
78+
}
79+
let mut requests = vec![StatisticsRequest::RowCount];
80+
for field in scan.projected_schema.fields() {
81+
requests.push(StatisticsRequest::Min(Column::new_unqualified(
82+
field.name(),
83+
)));
84+
}
85+
Ok(Transformed::yes(LogicalPlan::TableScan(
86+
scan.with_statistics_requests(requests),
87+
)))
88+
}
89+
}
90+
91+
/// A `TableProvider` that records the `statistics_requests` it was asked
92+
/// for, so the test can assert what reached it.
93+
#[derive(Debug)]
94+
struct RecordingTable {
95+
schema: SchemaRef,
96+
batch: RecordBatch,
97+
last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
98+
}
99+
100+
#[async_trait]
101+
impl TableProvider for RecordingTable {
102+
fn schema(&self) -> SchemaRef {
103+
Arc::clone(&self.schema)
104+
}
105+
106+
fn table_type(&self) -> TableType {
107+
TableType::Base
108+
}
109+
110+
async fn scan(
111+
&self,
112+
_state: &dyn Session,
113+
projection: Option<&Vec<usize>>,
114+
_filters: &[Expr],
115+
_limit: Option<usize>,
116+
) -> Result<Arc<dyn ExecutionPlan>> {
117+
Ok(MemorySourceConfig::try_new_exec(
118+
&[vec![self.batch.clone()]],
119+
Arc::clone(&self.schema),
120+
projection.cloned(),
121+
)?)
122+
}
123+
124+
async fn scan_with_args<'a>(
125+
&self,
126+
state: &dyn Session,
127+
args: ScanArgs<'a>,
128+
) -> Result<ScanResult> {
129+
// Record what reached us, then delegate to `scan`.
130+
*self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
131+
let plan = self
132+
.scan(
133+
state,
134+
args.projection().map(|p| p.to_vec()).as_ref(),
135+
args.filters().unwrap_or(&[]),
136+
args.limit(),
137+
)
138+
.await?;
139+
Ok(ScanResult::new(plan))
140+
}
141+
}
142+
143+
fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
144+
let schema = Arc::new(Schema::new(vec![
145+
Field::new("a", DataType::Int64, false),
146+
Field::new("b", DataType::Int64, false),
147+
]));
148+
let batch = RecordBatch::try_new(
149+
Arc::clone(&schema),
150+
vec![
151+
Arc::new(Int64Array::from(vec![1, 2, 3])),
152+
Arc::new(Int64Array::from(vec![10, 20, 30])),
153+
],
154+
)
155+
.unwrap();
156+
let last_requests = Arc::new(Mutex::new(Vec::new()));
157+
let provider = Arc::new(RecordingTable {
158+
schema,
159+
batch,
160+
last_requests: Arc::clone(&last_requests),
161+
});
162+
(provider, last_requests)
163+
}
164+
165+
#[tokio::test]
166+
async fn custom_rule_requests_reach_custom_provider() -> Result<()> {
167+
let (provider, last_requests) = make_table();
168+
169+
let state = SessionStateBuilder::new()
170+
.with_default_features()
171+
.with_optimizer_rule(Arc::new(RequestColumnStatistics))
172+
.build();
173+
let ctx = SessionContext::new_with_state(state);
174+
ctx.register_table("t", provider)?;
175+
176+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
177+
178+
let got = last_requests.lock().unwrap().clone();
179+
assert_eq!(
180+
got.len(),
181+
3,
182+
"expected RowCount + Min(a) + Min(b), got {got:?}"
183+
);
184+
assert!(
185+
got.contains(&StatisticsRequest::RowCount),
186+
"expected RowCount, got {got:?}"
187+
);
188+
assert!(
189+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))),
190+
"expected Min(a), got {got:?}"
191+
);
192+
assert!(
193+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))),
194+
"expected Min(b), got {got:?}"
195+
);
196+
Ok(())
197+
}
198+
199+
#[tokio::test]
200+
async fn no_requests_without_a_rule() -> Result<()> {
201+
// Without a rule populating `TableScan::statistics_requests`, the
202+
// provider sees an empty request list — stock DataFusion behavior.
203+
let (provider, last_requests) = make_table();
204+
let ctx = SessionContext::new();
205+
ctx.register_table("t", provider)?;
206+
207+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
208+
209+
assert!(
210+
last_requests.lock().unwrap().is_empty(),
211+
"expected no requests without a custom rule"
212+
);
213+
Ok(())
214+
}

0 commit comments

Comments
 (0)