1515// specific language governing permissions and limitations
1616// under the License.
1717
18- //! # TABLESAMPLE Example
18+ //! # TABLESAMPLE Example — adding row-level sampling on top of the built-in
1919//!
20- //! This example demonstrates implementing SQL `TABLESAMPLE` support using
21- //! DataFusion's extensibility APIs.
20+ //! `TABLESAMPLE SYSTEM(p%)` is supported out of the box: it's lifted to the
21+ //! core `Sample` extension node by the auto-registered
22+ //! `TableSampleSystemPlanner` and pushed into `ParquetSource` by the
23+ //! `SamplePushdown` rule. Adding *other* methods — `BERNOULLI`, `ROW`
24+ //! counts, Hive `BUCKET` — is the job of a `RelationPlanner` extension,
25+ //! which is what this example demonstrates.
2226//!
23- //! This is a working `TABLESAMPLE` implementation that can serve as a starting
24- //! point for your own projects. It also works as a template for adding other
25- //! custom SQL operators, covering the full pipeline from parsing to execution.
27+ //! The key composition pattern: when our planner sees `TABLESAMPLE`, it
28+ //! handles only the methods it implements (`BERNOULLI`, `ROW`, `BUCKET`)
29+ //! and returns [`RelationPlanning::Original`] for `SYSTEM`. Because
30+ //! `register_relation_planner` prepends to the chain, our planner runs
31+ //! first and the built-in handles whatever we don't. **No SYSTEM
32+ //! reimplementation required.**
2633//!
2734//! It shows how to:
2835//!
2936//! 1. **Parse** TABLESAMPLE syntax via a custom [`RelationPlanner`]
3037//! 2. **Plan** sampling as a custom logical node ([`TableSamplePlanNode`])
3138//! 3. **Execute** sampling via a custom physical operator ([`SampleExec`])
39+ //! 4. **Compose** with the built-in SYSTEM planner by returning
40+ //! `RelationPlanning::Original` for methods we don't implement
3241//!
3342//! ## Supported Syntax
3443//!
3544//! ```sql
36- //! -- Bernoulli sampling (each row has N% chance of selection)
45+ //! -- Bernoulli sampling (each row has N% chance of selection) — this example
3746//! SELECT * FROM table TABLESAMPLE BERNOULLI(10 PERCENT)
3847//!
39- //! -- Fractional sampling (0.0 to 1.0)
48+ //! -- Fractional sampling (0.0 to 1.0) — this example
4049//! SELECT * FROM table TABLESAMPLE (0.1)
4150//!
42- //! -- Row count limit
51+ //! -- Row count limit — this example
4352//! SELECT * FROM table TABLESAMPLE (100 ROWS)
4453//!
4554//! -- Reproducible sampling with a seed
4655//! SELECT * FROM table TABLESAMPLE (10 PERCENT) REPEATABLE(42)
56+ //!
57+ //! -- SYSTEM (block-level) sampling — handled by the built-in planner
58+ //! -- WITHOUT any code in this example
59+ //! SELECT * FROM table TABLESAMPLE SYSTEM (10) REPEATABLE(42)
4760//! ```
4861//!
4962//! ## Architecture
8194
8295use std:: {
8396 fmt:: { self , Debug , Formatter } ,
97+ fs:: File ,
8498 hash:: { Hash , Hasher } ,
8599 pin:: Pin ,
86100 sync:: Arc ,
@@ -98,9 +112,12 @@ use futures::{
98112 stream:: { Stream , StreamExt } ,
99113} ;
100114use rand:: { Rng , SeedableRng , rngs:: StdRng } ;
115+ use tempfile:: TempDir ;
101116use tonic:: async_trait;
102117
103118use datafusion:: optimizer:: simplify_expressions:: simplify_literal:: parse_literal;
119+ use datafusion:: parquet:: arrow:: ArrowWriter ;
120+ use datafusion:: parquet:: file:: properties:: WriterProperties ;
104121use datafusion:: {
105122 execution:: {
106123 RecordBatchStream , SendableRecordBatchStream , SessionState , SessionStateBuilder ,
@@ -111,7 +128,9 @@ use datafusion::{
111128 DisplayAs , DisplayFormatType , ExecutionPlan , PlanProperties ,
112129 metrics:: { BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet , RecordOutput } ,
113130 } ,
114- physical_planner:: { DefaultPhysicalPlanner , ExtensionPlanner , PhysicalPlanner } ,
131+ physical_planner:: {
132+ DefaultPhysicalPlanner , ExtensionPlanner , PhysicalPlanner , SamplePhysicalPlanner ,
133+ } ,
115134 prelude:: * ,
116135} ;
117136use datafusion_common:: {
@@ -144,10 +163,18 @@ pub async fn table_sample() -> Result<()> {
144163
145164 let ctx = SessionContext :: new_with_state ( state) ;
146165
147- // Register custom relation planner for logical planning
166+ // Register custom relation planner for logical planning. It's prepended
167+ // to the chain (so it runs *before* the auto-registered built-in
168+ // `TableSampleSystemPlanner`); when our planner returns
169+ // `RelationPlanning::Original`, the built-in handles it.
148170 ctx. register_relation_planner ( Arc :: new ( TableSamplePlanner ) ) ?;
149171 register_sample_data ( & ctx) ?;
150172
173+ // Register a parquet-backed copy of the same data so we can demonstrate
174+ // SYSTEM end-to-end. SYSTEM only ships pushdown for parquet sources;
175+ // the in-memory `sample_data` table can't absorb a `Sample` node.
176+ let _parquet_dir = register_sample_data_parquet ( & ctx) . await ?;
177+
151178 println ! ( "TABLESAMPLE Example" ) ;
152179 println ! ( "===================\n " ) ;
153180
@@ -273,6 +300,34 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
273300 +---------+---------+---------+---------+
274301 " ) ;
275302
303+ // Example 7: SYSTEM sampling — handled by the built-in
304+ // `TableSampleSystemPlanner`, **not** by this example's planner.
305+ // Our planner returns `Original` for SYSTEM, so the chain falls
306+ // through. Routed against the parquet-backed copy of the table so
307+ // the `SamplePushdown` rule can absorb the sample into the scan.
308+ // `REPEATABLE(42)` makes the rows deterministic across runs and
309+ // across machines: the parquet sampler keys on the seed plus the
310+ // execution `partition_index` (a stable per-file id), never on the
311+ // on-disk path, so the same query against the same data picks the
312+ // same rows everywhere.
313+ let results = run_example (
314+ ctx,
315+ "Example 7: SYSTEM (handled by the built-in, not this example)" ,
316+ "SELECT * FROM sample_data_parquet TABLESAMPLE SYSTEM (50) REPEATABLE (42)" ,
317+ )
318+ . await ?;
319+ assert_snapshot ! ( results, @r"
320+ +---------+---------+
321+ | column1 | column2 |
322+ +---------+---------+
323+ | 6 | row_6 |
324+ | 7 | row_7 |
325+ | 8 | row_8 |
326+ | 9 | row_9 |
327+ | 10 | row_10 |
328+ +---------+---------+
329+ " ) ;
330+
276331 Ok ( ( ) )
277332}
278333
@@ -301,6 +356,43 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
301356 Ok ( ( ) )
302357}
303358
359+ /// Register the same data as `sample_data_parquet`, backed by a tempfile
360+ /// parquet so `TABLESAMPLE SYSTEM` (handled by the built-in planner) has
361+ /// a `ParquetSource` to push into. Returns the `TempDir` so the caller
362+ /// can keep it alive for the duration of the queries.
363+ async fn register_sample_data_parquet ( ctx : & SessionContext ) -> Result < TempDir > {
364+ let dir = TempDir :: new ( ) . map_err ( |e| plan_datafusion_err ! ( "tempdir: {e}" ) ) ?;
365+ let path = dir. path ( ) . join ( "sample_data.parquet" ) ;
366+
367+ let column1: ArrayRef = Arc :: new ( Int32Array :: from ( ( 1 ..=10 ) . collect :: < Vec < i32 > > ( ) ) ) ;
368+ let column2: ArrayRef = Arc :: new ( StringArray :: from (
369+ ( 1 ..=10 ) . map ( |i| format ! ( "row_{i}" ) ) . collect :: < Vec < _ > > ( ) ,
370+ ) ) ;
371+ let batch =
372+ RecordBatch :: try_from_iter ( vec ! [ ( "column1" , column1) , ( "column2" , column2) ] ) ?;
373+ let file = File :: create ( & path) . map_err ( |e| plan_datafusion_err ! ( "create: {e}" ) ) ?;
374+ let mut writer = ArrowWriter :: try_new (
375+ file,
376+ batch. schema ( ) ,
377+ Some ( WriterProperties :: builder ( ) . build ( ) ) ,
378+ )
379+ . map_err ( |e| plan_datafusion_err ! ( "ArrowWriter: {e}" ) ) ?;
380+ writer
381+ . write ( & batch)
382+ . map_err ( |e| plan_datafusion_err ! ( "write: {e}" ) ) ?;
383+ writer
384+ . close ( )
385+ . map_err ( |e| plan_datafusion_err ! ( "close: {e}" ) ) ?;
386+
387+ ctx. register_parquet (
388+ "sample_data_parquet" ,
389+ path. to_str ( ) . unwrap ( ) ,
390+ Default :: default ( ) ,
391+ )
392+ . await ?;
393+ Ok ( dir)
394+ }
395+
304396// ============================================================================
305397// Logical Planning: TableSamplePlanner + TableSamplePlanNode
306398// ============================================================================
@@ -318,7 +410,7 @@ impl RelationPlanner for TableSamplePlanner {
318410 ) -> Result < RelationPlanning > {
319411 // Only handle Table relations with TABLESAMPLE clause
320412 let TableFactor :: Table {
321- sample : Some ( sample ) ,
413+ sample : Some ( kind ) ,
322414 alias,
323415 name,
324416 args,
@@ -333,23 +425,48 @@ impl RelationPlanner for TableSamplePlanner {
333425 return Ok ( RelationPlanning :: Original ( Box :: new ( relation) ) ) ;
334426 } ;
335427
336- // Extract sample spec (handles both before/after alias positions)
337- let sample = match sample {
428+ // Inspect the sample without consuming `kind` yet — we may need to
429+ // hand the relation back unchanged for methods this example doesn't
430+ // implement.
431+ let inspect = match & kind {
338432 ast:: TableSampleKind :: BeforeTableAlias ( s)
339- | ast:: TableSampleKind :: AfterTableAlias ( s) => s,
433+ | ast:: TableSampleKind :: AfterTableAlias ( s) => s. as_ref ( ) ,
340434 } ;
341435
342- // Validate sampling method
343- if let Some ( method) = & sample. name
344- && * method != TableSampleMethod :: Bernoulli
345- && * method != TableSampleMethod :: Row
346- {
347- return not_impl_err ! (
348- "Sampling method {} is not supported (only BERNOULLI and ROW)" ,
349- method
436+ // This example handles BERNOULLI / ROW (row-level coin flip) and
437+ // BUCKET (Hive-style modulo). Anything else — most importantly
438+ // SYSTEM / BLOCK — falls through to the next planner so the
439+ // built-in `TableSampleSystemPlanner` (auto-registered via
440+ // `SessionStateBuilder::with_default_features`) can handle it
441+ // with parquet pushdown. **No SYSTEM reimplementation here.**
442+ let we_handle = inspect. bucket . is_some ( )
443+ || matches ! (
444+ inspect. name,
445+ Some ( TableSampleMethod :: Bernoulli ) | Some ( TableSampleMethod :: Row ) | None
350446 ) ;
447+ if !we_handle {
448+ // Reconstruct the original relation and pass it on.
449+ let original = TableFactor :: Table {
450+ sample : Some ( kind) ,
451+ alias,
452+ name,
453+ args,
454+ with_hints,
455+ version,
456+ with_ordinality,
457+ partitions,
458+ json_path,
459+ index_hints,
460+ } ;
461+ return Ok ( RelationPlanning :: Original ( Box :: new ( original) ) ) ;
351462 }
352463
464+ // Extract sample spec (handles both before/after alias positions)
465+ let sample = match kind {
466+ ast:: TableSampleKind :: BeforeTableAlias ( s)
467+ | ast:: TableSampleKind :: AfterTableAlias ( s) => s,
468+ } ;
469+
353470 // Offset sampling (ClickHouse-style) not supported
354471 if sample. offset . is_some ( ) {
355472 return not_impl_err ! (
@@ -553,8 +670,15 @@ impl Hash for HashableF64 {
553670// Physical Planning: TableSampleQueryPlanner + TableSampleExtensionPlanner
554671// ============================================================================
555672
556- /// Custom query planner that registers [`TableSampleExtensionPlanner`] to
557- /// convert [`TableSamplePlanNode`] into [`SampleExec`].
673+ /// Custom query planner that registers [`TableSampleExtensionPlanner`]
674+ /// (lowering this example's [`TableSamplePlanNode`] to its own [`SampleExec`])
675+ /// alongside the built-in [`SamplePhysicalPlanner`] (lowering the core
676+ /// `Sample` extension node to its `SampleExec`). Both extension planners
677+ /// coexist: each only handles its own logical node type and returns
678+ /// `Ok(None)` otherwise. Without `SamplePhysicalPlanner` here,
679+ /// `TABLESAMPLE SYSTEM` queries that fall through to the built-in
680+ /// `TableSampleSystemPlanner` would fail to plan because
681+ /// `with_extension_planners(...)` *replaces* the defaults.
558682#[ derive( Debug ) ]
559683struct TableSampleQueryPlanner ;
560684
@@ -565,9 +689,10 @@ impl QueryPlanner for TableSampleQueryPlanner {
565689 logical_plan : & LogicalPlan ,
566690 session_state : & SessionState ,
567691 ) -> Result < Arc < dyn ExecutionPlan > > {
568- let planner = DefaultPhysicalPlanner :: with_extension_planners ( vec ! [ Arc :: new(
569- TableSampleExtensionPlanner ,
570- ) ] ) ;
692+ let planner = DefaultPhysicalPlanner :: with_extension_planners ( vec ! [
693+ Arc :: new( TableSampleExtensionPlanner ) ,
694+ Arc :: new( SamplePhysicalPlanner ) ,
695+ ] ) ;
571696 planner
572697 . create_physical_plan ( logical_plan, session_state)
573698 . await
0 commit comments