Skip to content

Commit d20c5aa

Browse files
adriangb and claude committed
test: e2e statistics-request flow via a custom optimizer rule
Add a self-contained integration test that plays both external roles: a custom `OptimizerRule` annotates each `TableScan` with `StatisticsRequest`s, and a custom `TableProvider` records the `ScanArgs::statistics_requests` it receives in `scan_with_args`. This demonstrates the request-side hooks are sufficient to build the feature entirely outside of DataFusion. A second test confirms that without such a rule the provider sees an empty request list. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c8a615a commit d20c5aa

2 files changed

Lines changed: 220 additions & 0 deletions

File tree

datafusion/core/tests/user_defined/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ mod relation_planner;
4141

4242
/// Tests for insert operations
4343
mod insert_operation;
44+
45+
/// Tests for `StatisticsRequest`s flowing from a custom optimizer rule
46+
/// through the physical planner into a custom `TableProvider`.
47+
mod statistics_requests;
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! End-to-end test that a *custom* optimizer rule can annotate a
19+
//! `TableScan` with `StatisticsRequest`s and have them reach a *custom*
20+
//! `TableProvider`'s `scan_with_args`.
21+
//!
22+
//! DataFusion ships no rule that populates `TableScan::statistics_requests`
23+
//! and no provider that consumes `ScanArgs::statistics_requests`. This test
24+
//! plays both roles, demonstrating that the request-side hooks are
25+
//! sufficient to build the whole feature outside of DataFusion.
26+
27+
use std::sync::{Arc, Mutex};
28+
29+
use arrow::array::{Int64Array, RecordBatch};
30+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
31+
use async_trait::async_trait;
32+
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
33+
use datafusion::common::tree_node::Transformed;
34+
use datafusion::common::{Column, Result};
35+
use datafusion::datasource::TableType;
36+
use datafusion::datasource::memory::MemorySourceConfig;
37+
use datafusion::execution::context::SessionContext;
38+
use datafusion::execution::session_state::SessionStateBuilder;
39+
use datafusion::logical_expr::statistics::StatisticsRequest;
40+
use datafusion::logical_expr::{Expr, LogicalPlan};
41+
use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
42+
use datafusion::physical_plan::ExecutionPlan;
43+
44+
/// Test-only optimizer rule that attaches statistics requests to every
/// `TableScan` it visits: a single `RowCount` request plus one `Min`
/// request for each column of the scan.
///
/// The derivation is deliberately trivial and deterministic. It is a
/// placeholder for the logic an external implementer would supply instead
/// (for example Min/Max for sort keys, or DistinctCount for join keys).
#[derive(Debug)]
struct RequestColumnStatistics;
52+
53+
impl OptimizerRule for RequestColumnStatistics {
54+
fn name(&self) -> &str {
55+
"test_request_column_statistics"
56+
}
57+
58+
fn apply_order(&self) -> Option<ApplyOrder> {
59+
Some(ApplyOrder::TopDown)
60+
}
61+
62+
fn supports_rewrite(&self) -> bool {
63+
true
64+
}
65+
66+
fn rewrite(
67+
&self,
68+
plan: LogicalPlan,
69+
_config: &dyn OptimizerConfig,
70+
) -> Result<Transformed<LogicalPlan>> {
71+
let LogicalPlan::TableScan(mut scan) = plan else {
72+
return Ok(Transformed::no(plan));
73+
};
74+
// Insert into the scan's existing request set. `BTreeSet::insert`
75+
// reports whether the value was new, so the rule is idempotent — and
76+
// composes with other rules' requests for free: re-inserting an
77+
// existing request is a no-op, and we report `Transformed::yes` only
78+
// when something was actually added, so the optimizer reaches a
79+
// fixpoint without a manual "already visited" guard.
80+
let mut changed = scan.statistics_requests.insert(StatisticsRequest::RowCount);
81+
for field in scan.projected_schema.fields() {
82+
let req = StatisticsRequest::Min(Column::new_unqualified(field.name()));
83+
changed |= scan.statistics_requests.insert(req);
84+
}
85+
Ok(if changed {
86+
Transformed::yes(LogicalPlan::TableScan(scan))
87+
} else {
88+
Transformed::no(LogicalPlan::TableScan(scan))
89+
})
90+
}
91+
}
92+
93+
/// A `TableProvider` that records the `statistics_requests` it was asked
94+
/// for, so the test can assert what reached it.
95+
#[derive(Debug)]
96+
struct RecordingTable {
97+
schema: SchemaRef,
98+
batch: RecordBatch,
99+
last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
100+
}
101+
102+
#[async_trait]
103+
impl TableProvider for RecordingTable {
104+
fn schema(&self) -> SchemaRef {
105+
Arc::clone(&self.schema)
106+
}
107+
108+
fn table_type(&self) -> TableType {
109+
TableType::Base
110+
}
111+
112+
async fn scan(
113+
&self,
114+
_state: &dyn Session,
115+
projection: Option<&Vec<usize>>,
116+
_filters: &[Expr],
117+
_limit: Option<usize>,
118+
) -> Result<Arc<dyn ExecutionPlan>> {
119+
Ok(MemorySourceConfig::try_new_exec(
120+
&[vec![self.batch.clone()]],
121+
Arc::clone(&self.schema),
122+
projection.cloned(),
123+
)?)
124+
}
125+
126+
async fn scan_with_args<'a>(
127+
&self,
128+
state: &dyn Session,
129+
args: ScanArgs<'a>,
130+
) -> Result<ScanResult> {
131+
// Record what reached us, then delegate to `scan`.
132+
*self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
133+
let plan = self
134+
.scan(
135+
state,
136+
args.projection().map(|p| p.to_vec()).as_ref(),
137+
args.filters().unwrap_or(&[]),
138+
args.limit(),
139+
)
140+
.await?;
141+
Ok(ScanResult::new(plan))
142+
}
143+
}
144+
145+
fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
146+
let schema = Arc::new(Schema::new(vec![
147+
Field::new("a", DataType::Int64, false),
148+
Field::new("b", DataType::Int64, false),
149+
]));
150+
let batch = RecordBatch::try_new(
151+
Arc::clone(&schema),
152+
vec![
153+
Arc::new(Int64Array::from(vec![1, 2, 3])),
154+
Arc::new(Int64Array::from(vec![10, 20, 30])),
155+
],
156+
)
157+
.unwrap();
158+
let last_requests = Arc::new(Mutex::new(Vec::new()));
159+
let provider = Arc::new(RecordingTable {
160+
schema,
161+
batch,
162+
last_requests: Arc::clone(&last_requests),
163+
});
164+
(provider, last_requests)
165+
}
166+
167+
#[tokio::test]
168+
async fn custom_rule_requests_reach_custom_provider() -> Result<()> {
169+
let (provider, last_requests) = make_table();
170+
171+
let state = SessionStateBuilder::new()
172+
.with_default_features()
173+
.with_optimizer_rule(Arc::new(RequestColumnStatistics))
174+
.build();
175+
let ctx = SessionContext::new_with_state(state);
176+
ctx.register_table("t", provider)?;
177+
178+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
179+
180+
let got = last_requests.lock().unwrap().clone();
181+
assert_eq!(
182+
got.len(),
183+
3,
184+
"expected RowCount + Min(a) + Min(b), got {got:?}"
185+
);
186+
assert!(
187+
got.contains(&StatisticsRequest::RowCount),
188+
"expected RowCount, got {got:?}"
189+
);
190+
assert!(
191+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("a"))),
192+
"expected Min(a), got {got:?}"
193+
);
194+
assert!(
195+
got.contains(&StatisticsRequest::Min(Column::new_unqualified("b"))),
196+
"expected Min(b), got {got:?}"
197+
);
198+
Ok(())
199+
}
200+
201+
#[tokio::test]
202+
async fn no_requests_without_a_rule() -> Result<()> {
203+
// Without a rule populating `TableScan::statistics_requests`, the
204+
// provider sees an empty request list — stock DataFusion behavior.
205+
let (provider, last_requests) = make_table();
206+
let ctx = SessionContext::new();
207+
ctx.register_table("t", provider)?;
208+
209+
ctx.sql("SELECT a, b FROM t").await?.collect().await?;
210+
211+
assert!(
212+
last_requests.lock().unwrap().is_empty(),
213+
"expected no requests without a custom rule"
214+
);
215+
Ok(())
216+
}

0 commit comments

Comments
 (0)