Skip to content

Commit ae549d8

Browse files
adriangbclaude
andcommitted
test: e2e statistics-request flow via a custom optimizer rule
Add a self-contained integration test that plays both external roles: a custom `OptimizerRule` annotates each `TableScan` with `StatisticsRequest`s, and a custom `TableProvider` records the `ScanArgs::statistics_requests` it receives in `scan_with_args`. This demonstrates the request-side hooks are sufficient to build the feature entirely outside of DataFusion. A second test confirms that without such a rule the provider sees an empty request list. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 53a20d9 commit ae549d8

2 files changed

Lines changed: 222 additions & 0 deletions

File tree

datafusion/core/tests/user_defined/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ mod relation_planner;
4141

4242
/// Tests for insert operations
4343
mod insert_operation;
44+
45+
/// Tests for `StatisticsRequest`s flowing from a custom optimizer rule
46+
/// through the physical planner into a custom `TableProvider`.
47+
mod statistics_requests;
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! End-to-end test that a *custom* optimizer rule can annotate a
19+
//! `TableScan` with `StatisticsRequest`s and have them reach a *custom*
20+
//! `TableProvider`'s `scan_with_args`.
21+
//!
22+
//! DataFusion ships no rule that populates `TableScan::statistics_requests`
23+
//! and no provider that consumes `ScanArgs::statistics_requests`. This test
24+
//! plays both roles, demonstrating that the request-side hooks are
25+
//! sufficient to build the whole feature outside of DataFusion.
26+
27+
use std::sync::{Arc, Mutex};
28+
29+
use arrow::array::{Int64Array, RecordBatch};
30+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
31+
use async_trait::async_trait;
32+
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
33+
use datafusion::common::tree_node::Transformed;
34+
use datafusion::common::{Column, Result};
35+
use datafusion::datasource::TableType;
36+
use datafusion::datasource::memory::MemorySourceConfig;
37+
use datafusion::execution::context::SessionContext;
38+
use datafusion::execution::session_state::SessionStateBuilder;
39+
use datafusion::logical_expr::statistics::StatisticsRequest;
40+
use datafusion::logical_expr::{Expr, LogicalPlan};
41+
use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
42+
use datafusion::physical_plan::ExecutionPlan;
43+
44+
/// A custom optimizer rule that annotates every `TableScan` with a
45+
/// `RowCount` request plus a `Min` request for each of its columns.
46+
///
47+
/// This stands in for whatever request-derivation logic an external
48+
/// implementer would write (e.g. Min/Max for sort keys, DistinctCount for
49+
/// join keys). Here it is intentionally trivial and deterministic.
50+
#[derive(Debug)]
51+
struct RequestColumnStatistics;
52+
53+
impl OptimizerRule for RequestColumnStatistics {
54+
fn name(&self) -> &str {
55+
"test_request_column_statistics"
56+
}
57+
58+
fn apply_order(&self) -> Option<ApplyOrder> {
59+
Some(ApplyOrder::TopDown)
60+
}
61+
62+
fn supports_rewrite(&self) -> bool {
63+
true
64+
}
65+
66+
fn rewrite(
67+
&self,
68+
plan: LogicalPlan,
69+
_config: &dyn OptimizerConfig,
70+
) -> Result<Transformed<LogicalPlan>> {
71+
let LogicalPlan::TableScan(mut scan) = plan else {
72+
return Ok(Transformed::no(plan));
73+
};
74+
// Insert into the scan's existing request set. `BTreeSet::insert`
75+
// reports whether the value was new, so the rule is idempotent — and
76+
// composes with other rules' requests for free: re-inserting an
77+
// existing request is a no-op, and we report `Transformed::yes` only
78+
// when something was actually added, so the optimizer reaches a
79+
// fixpoint without a manual "already visited" guard.
80+
let mut changed =
81+
scan.statistics_requests.insert(StatisticsRequest::RowCount);
82+
for field in scan.projected_schema.fields() {
83+
let req =
84+
StatisticsRequest::Min(Column::new_unqualified(field.name()));
85+
changed |= scan.statistics_requests.insert(req);
86+
}
87+
Ok(if changed {
88+
Transformed::yes(LogicalPlan::TableScan(scan))
89+
} else {
90+
Transformed::no(LogicalPlan::TableScan(scan))
91+
})
92+
}
93+
}
94+
95+
/// A `TableProvider` that records the `statistics_requests` it was asked
96+
/// for, so the test can assert what reached it.
97+
#[derive(Debug)]
98+
struct RecordingTable {
99+
schema: SchemaRef,
100+
batch: RecordBatch,
101+
last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
102+
}
103+
104+
#[async_trait]
105+
impl TableProvider for RecordingTable {
106+
fn schema(&self) -> SchemaRef {
107+
Arc::clone(&self.schema)
108+
}
109+
110+
fn table_type(&self) -> TableType {
111+
TableType::Base
112+
}
113+
114+
async fn scan(
115+
&self,
116+
_state: &dyn Session,
117+
projection: Option<&Vec<usize>>,
118+
_filters: &[Expr],
119+
_limit: Option<usize>,
120+
) -> Result<Arc<dyn ExecutionPlan>> {
121+
Ok(MemorySourceConfig::try_new_exec(
122+
&[vec![self.batch.clone()]],
123+
Arc::clone(&self.schema),
124+
projection.cloned(),
125+
)?)
126+
}
127+
128+
async fn scan_with_args<'a>(
129+
&self,
130+
state: &dyn Session,
131+
args: ScanArgs<'a>,
132+
) -> Result<ScanResult> {
133+
// Record what reached us, then delegate to `scan`.
134+
*self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
135+
let plan = self
136+
.scan(
137+
state,
138+
args.projection().map(|p| p.to_vec()).as_ref(),
139+
args.filters().unwrap_or(&[]),
140+
args.limit(),
141+
)
142+
.await?;
143+
Ok(ScanResult::new(plan))
144+
}
145+
}
146+
147+
fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
148+
let schema = Arc::new(Schema::new(vec![
149+
Field::new("a", DataType::Int64, false),
150+
Field::new("b", DataType::Int64, false),
151+
]));
152+
let batch = RecordBatch::try_new(
153+
Arc::clone(&schema),
154+
vec![
155+
Arc::new(Int64Array::from(vec![1, 2, 3])),
156+
Arc::new(Int64Array::from(vec![10, 20, 30])),
157+
],
158+
)
159+
.unwrap();
160+
let last_requests = Arc::new(Mutex::new(Vec::new()));
161+
let provider = Arc::new(RecordingTable {
162+
schema,
163+
batch,
164+
last_requests: Arc::clone(&last_requests),
165+
});
166+
(provider, last_requests)
167+
}
168+
169+
#[tokio::test]
170+
async fn custom_rule_requests_reach_custom_provider() -> Result<()> {
171+
let (provider, last_requests) = make_table();
172+
173+
let state = SessionStateBuilder::new()
174+
.with_default_features()
175+
.with_optimizer_rule(Arc::new(RequestColumnStatistics))
176+
.build();
177+
let ctx = SessionContext::new_with_state(state);
178+
ctx.register_table("t", provider)?;
179+
180+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
181+
182+
let got = last_requests.lock().unwrap().clone();
183+
assert_eq!(
184+
got.len(),
185+
3,
186+
"expected RowCount + Min(a) + Min(b), got {got:?}"
187+
);
188+
assert!(
189+
got.contains(&StatisticsRequest::RowCount),
190+
"expected RowCount, got {got:?}"
191+
);
192+
assert!(
193+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))),
194+
"expected Min(a), got {got:?}"
195+
);
196+
assert!(
197+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))),
198+
"expected Min(b), got {got:?}"
199+
);
200+
Ok(())
201+
}
202+
203+
#[tokio::test]
204+
async fn no_requests_without_a_rule() -> Result<()> {
205+
// Without a rule populating `TableScan::statistics_requests`, the
206+
// provider sees an empty request list — stock DataFusion behavior.
207+
let (provider, last_requests) = make_table();
208+
let ctx = SessionContext::new();
209+
ctx.register_table("t", provider)?;
210+
211+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
212+
213+
assert!(
214+
last_requests.lock().unwrap().is_empty(),
215+
"expected no requests without a custom rule"
216+
);
217+
Ok(())
218+
}

0 commit comments

Comments
 (0)