Skip to content

Commit e6c2699

Browse files
adriangbclaude
andcommitted
feat: TableSampleSystemPlanner — TABLESAMPLE SYSTEM works out of the box
Wires SQL `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the infrastructure from the previous commit so it works on a default `SessionContext` (and therefore in `datafusion-cli` and the sqllogictest harness) without any extra registration. - New `datafusion_sql::sample::TableSampleSystemPlanner` (`RelationPlanner`). Lifts `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` to the core `Sample` extension node. Other forms (`BERNOULLI`, `ROW` count, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected at planning time with errors that point at registering a custom `RelationPlanner` ahead of this one. - New public `SamplePhysicalPlanner` (`ExtensionPlanner`) in `datafusion::physical_planner`. Lowers `Sample` to `SampleExec`. Pre-registered in `DefaultPhysicalPlanner::default()` so the default query planner handles it. - `SessionStateDefaults::default_relation_planners()` returns the built-in planner; `SessionStateBuilder::with_default_features()` installs it. Both gated behind the `sql` feature so `--no-default-features` builds keep working. - `register_relation_planner` already prepends to the chain, so any user-supplied planner runs first and can return `Original` to fall through to the built-in for SYSTEM. That composition is the intended override mechanism. End-to-end coverage: - New `datafusion/sqllogictest/test_files/tablesample.slt` exercises the path a user gets out of the box: `SYSTEM(100)`, `SYSTEM(50) REPEATABLE(42)` deterministic count, EXPLAIN absorbed into ParquetSource, every rejected form, and the planning-time error for sources that don't implement `try_push_sample` (CSV). Docs: - `docs/source/user-guide/sql/select.md` gains a `TABLESAMPLE clause` section explaining what it is, the SYSTEM vs BERNOULLI tradeoff, the parquet implementation strategy, deterministic seeds, the EXPLAIN format, and the list of rejected forms. - `docs/source/library-user-guide/extending-sql.md` reframes the existing TABLESAMPLE example as the way to add additional flavours on top of the built-in SYSTEM planner. - `datafusion-examples/examples/relation_planner/main.rs` carries a matching note in its module docs. - `datafusion-examples/README.md` regenerated. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0b26d70 commit e6c2699

10 files changed

Lines changed: 535 additions & 10 deletions

File tree

datafusion-examples/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ cargo run --example dataframe -- dataframe
193193

194194
#### Category: Single Process
195195

196-
| Subcommand | File Path | Description |
197-
| --------------- | ------------------------------------------------------------------------------------- | ------------------------------------------ |
198-
| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching |
199-
| pivot_unpivot | [`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT |
200-
| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE |
196+
| Subcommand | File Path | Description |
197+
| --------------- | ------------------------------------------------------------------------------------- | --------------------------------------------------------------------- |
198+
| match_recognize | [`relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs) | Implement MATCH_RECOGNIZE pattern matching |
199+
| pivot_unpivot | [`relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs) | Implement PIVOT / UNPIVOT |
200+
| table_sample | [`relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs) | Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling |
201201

202202
## SQL Ops Examples
203203

datafusion-examples/examples/relation_planner/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,14 @@
3535
//! (file: pivot_unpivot.rs, desc: Implement PIVOT / UNPIVOT)
3636
//!
3737
//! - `table_sample`
38-
//! (file: table_sample.rs, desc: Implement TABLESAMPLE)
38+
//! (file: table_sample.rs, desc: Implement TABLESAMPLE BERNOULLI / ROW / BUCKET via per-batch sampling)
39+
//!
40+
//! Note: `TABLESAMPLE SYSTEM(p%)` is supported out of the box by the
41+
//! built-in `datafusion_sql::sample::TableSampleSystemPlanner`, which is
42+
//! auto-registered on a default `SessionContext`. The `table_sample`
43+
//! example below shows how to register a *different* planner for the
44+
//! row-level forms (`BERNOULLI`, `ROW`, `BUCKET`) that the built-in
45+
//! intentionally does not handle.
3946
//!
4047
//! ## Snapshot Testing
4148
//!

datafusion/core/src/execution/session_state.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,6 +1175,11 @@ impl SessionStateBuilder {
11751175
.get_or_insert_with(Vec::new)
11761176
.extend(SessionStateDefaults::default_expr_planners());
11771177

1178+
#[cfg(feature = "sql")]
1179+
self.relation_planners
1180+
.get_or_insert_with(Vec::new)
1181+
.extend(SessionStateDefaults::default_relation_planners());
1182+
11781183
self.scalar_functions
11791184
.get_or_insert_with(Vec::new)
11801185
.extend(SessionStateDefaults::default_scalar_functions());

datafusion/core/src/execution/session_state_defaults.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use datafusion_execution::config::SessionConfig;
3636
use datafusion_execution::object_store::ObjectStoreUrl;
3737
use datafusion_execution::runtime_env::RuntimeEnv;
3838
use datafusion_expr::planner::ExprPlanner;
39+
#[cfg(feature = "sql")]
40+
use datafusion_expr::planner::RelationPlanner;
3941
use datafusion_expr::registry::ExtensionTypeRegistrationRef;
4042
use datafusion_expr::{AggregateUDF, HigherOrderUDF, ScalarUDF, WindowUDF};
4143
use std::collections::HashMap;
@@ -82,6 +84,19 @@ impl SessionStateDefaults {
8284
default_catalog
8385
}
8486

87+
/// Returns the list of default [`RelationPlanner`]s installed by
88+
/// [`Self::default_relation_planners`]. Currently this is just the
89+
/// built-in `TableSampleSystemPlanner`, which lifts
90+
/// `TABLESAMPLE SYSTEM(p%) [REPEATABLE(n)]` into the core `Sample`
91+
/// extension node so the `SamplePushdown` rule can absorb it into
92+
/// the scan. Other `TABLESAMPLE` flavors are rejected at planning
93+
/// time — register a `RelationPlanner` ahead of this one to add
94+
/// custom semantics.
95+
#[cfg(feature = "sql")]
96+
pub fn default_relation_planners() -> Vec<Arc<dyn RelationPlanner>> {
97+
vec![Arc::new(datafusion_sql::sample::TableSampleSystemPlanner)]
98+
}
99+
85100
/// returns the list of default [`ExprPlanner`]s
86101
pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
87102
let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![

datafusion/core/src/physical_planner.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,79 @@ pub trait ExtensionPlanner {
237237
}
238238
}
239239

240+
/// [`ExtensionPlanner`] that lowers the [`Sample`] logical extension
241+
/// node into a [`SampleExec`] physical node.
242+
///
243+
/// The pushdown machinery (cube-root absorption into `ParquetSource`,
244+
/// the `SamplePushdown` optimizer rule, per-node `Passthrough`
245+
/// overrides) is wired into the default optimizer pipeline, so once
246+
/// a `Sample` reaches the physical planner it will be pushed into
247+
/// the source — but it has to *get* there first. Register this
248+
/// planner on a [`SessionStateBuilder`] / [`DefaultPhysicalPlanner`]
249+
/// alongside whichever [`RelationPlanner`] (or other front-end) you
250+
/// use to emit the `Sample` logical node:
251+
///
252+
/// ```rust,ignore
253+
/// use std::sync::Arc;
254+
/// use datafusion::physical_planner::{DefaultPhysicalPlanner, SamplePhysicalPlanner};
255+
///
256+
/// let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
257+
/// Arc::new(SamplePhysicalPlanner),
258+
/// ]);
259+
/// ```
260+
///
261+
/// `SamplePhysicalPlanner` is registered automatically in
262+
/// [`DefaultPhysicalPlanner::default`] so that `TABLESAMPLE SYSTEM`
263+
/// works out of the box on a default `SessionContext`. Callers who
264+
/// supply their own list via [`DefaultPhysicalPlanner::with_extension_planners`]
265+
/// **replace** the defaults — re-add `SamplePhysicalPlanner` to the
266+
/// front of their list if they want sampling support.
267+
///
268+
/// [`Sample`]: datafusion_expr::logical_plan::sample::Sample
269+
/// [`SampleExec`]: datafusion_physical_plan::sample::SampleExec
270+
/// [`RelationPlanner`]: datafusion_expr::planner::RelationPlanner
271+
/// [`SessionStateBuilder`]: crate::execution::session_state::SessionStateBuilder
272+
#[derive(Debug, Default)]
273+
pub struct SamplePhysicalPlanner;
274+
275+
#[async_trait]
276+
impl ExtensionPlanner for SamplePhysicalPlanner {
277+
async fn plan_extension(
278+
&self,
279+
_planner: &dyn PhysicalPlanner,
280+
node: &dyn UserDefinedLogicalNode,
281+
_logical_inputs: &[&LogicalPlan],
282+
physical_inputs: &[Arc<dyn ExecutionPlan>],
283+
_session_state: &SessionState,
284+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
285+
let Some(sample) = node
286+
.as_any()
287+
.downcast_ref::<datafusion_expr::logical_plan::sample::Sample>()
288+
else {
289+
return Ok(None);
290+
};
291+
if physical_inputs.len() != 1 {
292+
return plan_err!(
293+
"Sample expects exactly one input; got {}",
294+
physical_inputs.len()
295+
);
296+
}
297+
let method = match sample.method {
298+
datafusion_expr::logical_plan::sample::SampleMethod::System => {
299+
datafusion_physical_plan::sample_pushdown::SampleMethod::System
300+
}
301+
};
302+
Ok(Some(Arc::new(
303+
datafusion_physical_plan::sample::SampleExec::new(
304+
Arc::clone(&physical_inputs[0]),
305+
method,
306+
sample.fraction,
307+
sample.seed,
308+
),
309+
)))
310+
}
311+
}
312+
240313
/// Default single node physical query planner that converts a
241314
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
242315
///
@@ -255,11 +328,21 @@ pub trait ExtensionPlanner {
255328
/// execute concurrently.
256329
///
257330
/// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency
258-
#[derive(Default)]
259331
pub struct DefaultPhysicalPlanner {
260332
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
261333
}
262334

335+
impl Default for DefaultPhysicalPlanner {
336+
/// Constructs a planner with [`SamplePhysicalPlanner`] pre-registered
337+
/// so the core `Sample` extension node lowers without any extra
338+
/// wiring on a default `SessionContext`.
339+
fn default() -> Self {
340+
Self {
341+
extension_planners: vec![Arc::new(SamplePhysicalPlanner)],
342+
}
343+
}
344+
}
345+
263346
#[async_trait]
264347
impl PhysicalPlanner for DefaultPhysicalPlanner {
265348
/// Create a physical plan from a logical plan

datafusion/sql/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub mod planner;
4949
mod query;
5050
mod relation;
5151
pub mod resolve;
52+
pub mod sample;
5253
mod select;
5354
mod set_expr;
5455
mod stack;

datafusion/sql/src/sample.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Built-in [`RelationPlanner`] for `TABLESAMPLE SYSTEM(p%)`.
19+
//!
20+
//! Auto-registered via [`SessionStateDefaults::default_relation_planners`]
21+
//! so SQL `TABLESAMPLE SYSTEM (10) [REPEATABLE (n)]` works out of the
22+
//! box on any default `SessionContext`. Other `TABLESAMPLE` flavours
23+
//! (`BERNOULLI`, `ROW`, `BUCKET ... OUT OF ...`, `OFFSET`) are rejected
24+
//! at planning time — implementing those is left to a downstream
25+
//! `RelationPlanner` (see `datafusion-examples/examples/relation_planner/`).
26+
//!
27+
//! `SessionStateBuilder::register_relation_planner` inserts new planners
28+
//! at the front of the chain, so a downstream planner that returns
29+
//! `Planned` for the same `TABLESAMPLE` syntax wins. Returning
30+
//! `Original` falls through to this default.
31+
//!
32+
//! [`SessionStateDefaults::default_relation_planners`]: ../../datafusion/execution/session_state/struct.SessionStateDefaults.html
33+
34+
use std::sync::Arc;
35+
36+
use datafusion_common::{Result, not_impl_err, plan_datafusion_err, plan_err};
37+
use datafusion_expr::logical_plan::sample::{SampleMethod, sample_plan};
38+
use datafusion_expr::planner::{
39+
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
40+
};
41+
use sqlparser::ast::{
42+
self, TableFactor, TableSampleKind, TableSampleMethod, TableSampleUnit,
43+
};
44+
45+
/// Built-in `RelationPlanner` that lifts `TABLESAMPLE SYSTEM(p%)`
46+
/// (with optional `REPEATABLE(seed)`) into the core
47+
/// [`Sample`](datafusion_expr::logical_plan::sample::Sample) extension
48+
/// node so the `SamplePushdown` optimizer rule can absorb the sample
49+
/// into the scan.
50+
///
51+
/// Rejects every other form of `TABLESAMPLE` with a `not_impl_err`. To
52+
/// support `BERNOULLI`, row counts, or `BUCKET`, register your own
53+
/// `RelationPlanner` ahead of this one — `register_relation_planner`
54+
/// pushes to the front and the first `Planned` wins.
55+
#[derive(Debug, Default)]
56+
pub struct TableSampleSystemPlanner;
57+
58+
impl RelationPlanner for TableSampleSystemPlanner {
59+
fn plan_relation(
60+
&self,
61+
relation: TableFactor,
62+
context: &mut dyn RelationPlannerContext,
63+
) -> Result<RelationPlanning> {
64+
// Only act on Table relations carrying a `TABLESAMPLE` clause.
65+
// Everything else (derived, function, unnest, join) falls
66+
// through to the next planner / DataFusion's default logic.
67+
let TableFactor::Table {
68+
sample: Some(sample),
69+
alias,
70+
name,
71+
args,
72+
with_hints,
73+
version,
74+
with_ordinality,
75+
partitions,
76+
json_path,
77+
index_hints,
78+
} = relation
79+
else {
80+
return Ok(RelationPlanning::Original(Box::new(relation)));
81+
};
82+
83+
let ts = match sample {
84+
TableSampleKind::BeforeTableAlias(s)
85+
| TableSampleKind::AfterTableAlias(s) => *s,
86+
};
87+
88+
if ts.bucket.is_some() {
89+
return not_impl_err!(
90+
"TABLESAMPLE BUCKET is not supported (only SYSTEM PERCENT). \
91+
Register a custom RelationPlanner before the built-in \
92+
TableSampleSystemPlanner to handle other forms."
93+
);
94+
}
95+
if ts.offset.is_some() {
96+
return not_impl_err!(
97+
"TABLESAMPLE OFFSET is not supported (only SYSTEM PERCENT)"
98+
);
99+
}
100+
match ts.name {
101+
// The built-in planner only handles SYSTEM (and BLOCK as an
102+
// alias for SYSTEM, matching Hive). Anything else is a
103+
// semantics commitment we don't want to make in core.
104+
Some(TableSampleMethod::System) | Some(TableSampleMethod::Block) | None => {}
105+
Some(other) => {
106+
return not_impl_err!(
107+
"TABLESAMPLE method {other} is not supported (only SYSTEM). \
108+
Register a custom RelationPlanner before the built-in \
109+
TableSampleSystemPlanner to handle other methods."
110+
);
111+
}
112+
}
113+
114+
let quantity = ts.quantity.ok_or_else(|| {
115+
plan_datafusion_err!("TABLESAMPLE without a quantity is not supported")
116+
})?;
117+
let raw = match &quantity.value {
118+
ast::Expr::Value(vs) => match &vs.value {
119+
ast::Value::Number(n, _) => n.parse::<f64>().map_err(|_| {
120+
plan_datafusion_err!("invalid TABLESAMPLE quantity: {n}")
121+
})?,
122+
v => return plan_err!("TABLESAMPLE quantity must be numeric; got {v:?}"),
123+
},
124+
other => {
125+
return plan_err!("TABLESAMPLE quantity must be a literal; got {other}");
126+
}
127+
};
128+
let fraction = match quantity.unit {
129+
Some(TableSampleUnit::Percent) | None => raw / 100.0,
130+
Some(TableSampleUnit::Rows) => {
131+
return not_impl_err!(
132+
"TABLESAMPLE with ROWS count is not supported (only SYSTEM PERCENT)"
133+
);
134+
}
135+
};
136+
137+
let seed = ts
138+
.seed
139+
.map(|s| match s.value {
140+
ast::Value::Number(n, _) => n
141+
.parse::<u64>()
142+
.map_err(|_| plan_datafusion_err!("invalid REPEATABLE seed: {n}")),
143+
v => Err(plan_datafusion_err!(
144+
"REPEATABLE seed must be an integer; got {v:?}"
145+
)),
146+
})
147+
.transpose()?;
148+
149+
// Replan the bare table without the sample clause, then wrap
150+
// the resulting plan in a `Sample` extension node.
151+
let bare = TableFactor::Table {
152+
sample: None,
153+
alias: alias.clone(),
154+
name,
155+
args,
156+
with_hints,
157+
version,
158+
with_ordinality,
159+
partitions,
160+
json_path,
161+
index_hints,
162+
};
163+
let input = context.plan(bare)?;
164+
let plan = sample_plan(Arc::new(input), SampleMethod::System, fraction, seed)?;
165+
Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
166+
plan, alias,
167+
))))
168+
}
169+
}

0 commit comments

Comments
 (0)