
Commit 1c7630c

adriangb and claude committed
feat: SamplePushdown rule + Sample logical/physical nodes
Adds the cross-cutting infrastructure for pushing TABLESAMPLE-shaped sampling into file sources, with parquet as the first absorbing source. There is no SQL surface yet; this commit only ships the primitives. Wiring a RelationPlanner / ExtensionPlanner so it works out of the box from SQL is the next commit in this stack.

- `Sample` `UserDefinedLogicalNodeCore` extension node in `datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving; validates `fraction ∈ (0, 1]`. Currently encodes `SampleMethod::System` only.
- `SampleExec` placeholder in `datafusion-physical-plan`. Errors at `execute` (it's a marker — the `SamplePushdown` rule is expected to remove it). Implements filter / sort pushdown passthrough so unrelated optimizer rules see straight through it.
- New `try_push_sample` method on `ExecutionPlan` and `FileSource`, returning `Absorbed { inner }` / `Passthrough` / `Unsupported { reason }`. Default is `Unsupported`; per-node `Passthrough` overrides on filter, projection, coalesce_batches, coalesce_partitions, repartition, and non-fetch sort.
- `ParquetSource::try_push_sample` runs the (intentionally private) hierarchical block-level reduction across files / row groups / rows, with adaptive collapse when an axis can't reduce. Coordinates with the opener via `pub(crate)` `system_target_remaining` and `seed` fields on `ParquetSampling`. Single-file, single-row-group inputs hit ~p × N rows instead of undershooting at p^(1/3) × N.
- `REPEATABLE(seed)` is plumbed all the way through: when set, `ParquetSampling::apply_row_group_sampling` and `apply_row_fraction_sampling` key only on `(seed, ...)` and ignore the file path, so the same query is reproducible across environments.
- `SamplePushdown` optimizer rule (between `PushdownSort` and `EnsureCooperative`) walks top-down. On `Absorbed` it replaces `SampleExec` with the rebuilt source; on `Passthrough` it pushes through the single-child node and recurses; on `Unsupported` it errors at planning time with `"TABLESAMPLE is not supported for this source"`. There is intentionally no generic post-scan `SampleExec` yet.
- EXPLAIN visibility: `ParquetSource::fmt_extra` surfaces `sample_system_target_remaining` when set.
- `optimizer_rule_reference.md` updated to list `SamplePushdown` in the documented rule order.
- `explain.slt` updated with `physical_plan after SamplePushdown SAME TEXT AS ABOVE` lines under each verbose-explain test.

Tests: 7 unit tests on `ParquetSource::try_push_sample` covering the pushdown contract (full / single-file / multi-file / target clamping / REPEATABLE determinism / multi-file rounding compensation), and 3 opener end-to-end tests covering the adaptive split for single vs multi row group inputs and REPEATABLE-seed reproducibility across file paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
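The `Absorbed` / `Passthrough` / `Unsupported` contract described above can be sketched as a toy standalone model. This is illustrative only: the `PushSampleResult` enum and string-keyed `try_push_sample` / `push_down` helpers below are simplified stand-ins, not the actual DataFusion `ExecutionPlan` API, and the node names are hypothetical.

```rust
// Toy model of the pushdown contract: each node in a plan path
// either absorbs the sample, lets it pass through, or rejects it.
#[derive(Debug, PartialEq)]
enum PushSampleResult {
    Absorbed,            // the source rebuilt itself with sampling baked in
    Passthrough,         // safe to push the sample below this node
    Unsupported(String), // planning-time error with a reason
}

fn try_push_sample(node: &str) -> PushSampleResult {
    match node {
        // Per-node Passthrough overrides, mirroring the list in the
        // commit message (filter, projection, coalesce, repartition,
        // non-fetch sort).
        "filter" | "projection" | "coalesce_batches" | "coalesce_partitions"
        | "repartition" | "sort" => PushSampleResult::Passthrough,
        // Parquet is the first absorbing source in this stack.
        "parquet" => PushSampleResult::Absorbed,
        // Default: reject at planning time.
        other => PushSampleResult::Unsupported(format!(
            "TABLESAMPLE is not supported for this source: {other}"
        )),
    }
}

// Top-down walk: keep pushing through Passthrough nodes until a
// source absorbs the sample or something reports Unsupported.
fn push_down(path: &[&str]) -> Result<(), String> {
    for node in path {
        match try_push_sample(node) {
            PushSampleResult::Absorbed => return Ok(()),
            PushSampleResult::Passthrough => continue,
            PushSampleResult::Unsupported(reason) => return Err(reason),
        }
    }
    Err("sample was never absorbed".to_string())
}

fn main() {
    // A sample above filter → repartition → parquet is absorbed.
    assert_eq!(push_down(&["filter", "repartition", "parquet"]), Ok(()));
    // A path ending in a non-absorbing source errors at planning time.
    assert!(push_down(&["projection", "csv_source"]).is_err());
}
```

The real rule walks a plan tree rather than a flat path, but the decision at each node is the same three-way branch.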
1 parent 2026412 commit 1c7630c

22 files changed

Lines changed: 1570 additions & 26 deletions

File tree

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 4 additions & 3 deletions
```diff
@@ -88,6 +88,7 @@ in multiple phases.
 | 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
 | 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
 | 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| 20 | `SamplePushdown` | - | Pushes `TABLESAMPLE` into the source; errors at planning time if the sample can't be absorbed. |
+| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
```

datafusion/datasource-parquet/src/opener.rs

Lines changed: 287 additions & 15 deletions
```diff
@@ -893,21 +893,58 @@ impl FiltersPreparedParquetOpen {
             rg_metadata.len(),
         )?;
 
-        // Apply optional row-group and row-range sampling now that we
-        // know the actual row-group count. Both calls are no-ops when
-        // their respective fraction is `None`. Selection is
-        // deterministic per `(file_name, row_group_index, fraction,
-        // cluster_size)` so re-runs match.
-        prepared.sampling.apply_row_group_sampling(
-            &mut initial_plan,
-            rg_metadata.len(),
-            &prepared.file_name,
-        );
-        prepared.sampling.apply_row_fraction_sampling(
-            &mut initial_plan,
-            rg_metadata,
-            &prepared.file_name,
-        );
+        // SYSTEM-mode adaptive split: when the SamplePushdown rule
+        // hands us a residual fraction `remaining`, choose the
+        // row-group / row split based on the row-group count we just
+        // observed. With ≥ 2 row groups we split as `sqrt(remaining)`
+        // at both axes; with 1 row group we skip row-group sampling
+        // and apply the full residual at the row level (the row-group
+        // axis can't reduce). This keeps the expected output close to
+        // `p × N_total` even for tiny scans where the cube-root math
+        // otherwise undershoots (single-file / single-row-group
+        // inputs would read `cbrt(p)` of the rows, ~46% for
+        // SYSTEM(10)).
+        if let Some(remaining) = prepared.sampling.system_target_remaining {
+            let n_rg = rg_metadata.len();
+            let mut adapted = prepared.sampling.clone();
+            if n_rg >= 2 {
+                let q = remaining.sqrt();
+                adapted.row_group_fraction = Some(q);
+                adapted.row_fraction = Some(q);
+            } else {
+                adapted.row_group_fraction = None;
+                adapted.row_fraction = Some(remaining);
+            }
+            adapted.apply_row_group_sampling(
+                &mut initial_plan,
+                n_rg,
+                &prepared.file_name,
+            );
+            adapted.apply_row_fraction_sampling(
+                &mut initial_plan,
+                rg_metadata,
+                &prepared.file_name,
+            );
+        } else {
+            // Legacy direct-builder path: each method is a no-op when
+            // its corresponding fraction is `None`.
+            prepared.sampling.apply_row_group_sampling(
+                &mut initial_plan,
+                rg_metadata.len(),
+                &prepared.file_name,
+            );
+            prepared.sampling.apply_row_fraction_sampling(
+                &mut initial_plan,
+                rg_metadata,
+                &prepared.file_name,
+            );
+        }
 
         let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);
```
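The arithmetic behind the adaptive split can be checked in isolation. The sketch below is a hypothetical, self-contained re-derivation, not the crate's actual `ParquetSampling` code: with two usable axes each keeping `sqrt(remaining)`, the surviving fraction is `remaining` in expectation, while naively splitting a fraction `p` across three axes when only one axis can actually reduce keeps `cbrt(p)` of the rows.

```rust
// Hypothetical standalone version of the split decision: returns
// (row_group_fraction, row_fraction) for a residual fraction
// `remaining`, given the observed row-group count.
fn adaptive_split(remaining: f64, n_row_groups: usize) -> (Option<f64>, f64) {
    if n_row_groups >= 2 {
        // Two usable axes: sqrt(remaining) each, so the expected
        // surviving fraction is sqrt * sqrt = remaining.
        let q = remaining.sqrt();
        (Some(q), q)
    } else {
        // The row-group axis can't reduce: apply the full residual
        // at the row level instead of wasting it.
        (None, remaining)
    }
}

fn main() {
    // Multi row group: sqrt(0.25) = 0.5 at each axis.
    assert_eq!(adaptive_split(0.25, 4), (Some(0.5), 0.5));

    // Single row group: full residual at the row axis.
    assert_eq!(adaptive_split(0.1, 1), (None, 0.1));

    // Why a fixed cube-root split undershoots: if only one of the
    // three axes (file / row group / row) can actually reduce, a
    // per-axis fraction of cbrt(0.1) keeps ~46% of rows, not 10%.
    let naive = 0.1_f64.cbrt();
    assert!((naive - 0.464).abs() < 0.005);
}
```

This mirrors the if/else in the opener hunk above; the real code additionally threads the result through `apply_row_group_sampling` / `apply_row_fraction_sampling`.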

```diff
@@ -1586,6 +1623,10 @@ fn create_initial_plan(
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
 
+// `apply_row_group_sampling` and `apply_row_fraction_sampling` live
+// in `crate::sampling` so this file stays focused on the opener
+// pipeline.
+
 /// Build a page pruning predicate from an optional predicate expression.
 /// If the predicate is None or the predicate cannot be converted to a page pruning
 /// predicate, return None.
```
```diff
@@ -2753,6 +2794,9 @@ mod test {
     );
 }
 
+// -- Sampling end-to-end (unit tests of the helpers themselves
+// live in `crate::sampling`) -----------------------------------
+
 /// End-to-end: a parquet file with 4 row groups, scanned with
 /// `row_group_fraction = 0.5`, should return rows from exactly 2
 /// of the 4 row groups.
```
```diff
@@ -2860,4 +2904,232 @@ mod test {
             "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
         );
     }
+
+    /// End-to-end for the SYSTEM-mode adaptive split on a *single*
+    /// row group: the file axis is fixed (handled at try_push_sample),
+    /// the row-group axis can't reduce, so the opener should apply the
+    /// full residual fraction at the row level. A naïve implementation
+    /// that always splits as `sqrt(remaining)` between row-group and
+    /// row would only achieve `sqrt(0.1) ≈ 32%` here.
+    #[tokio::test]
+    async fn system_target_remaining_single_row_group() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let values: Vec<Option<i32>> = (0..1000).map(Some).collect();
+        let batch = record_batch!(("a", Int32, values)).unwrap();
+        let schema = batch.schema();
+        let data_len =
+            write_parquet(Arc::clone(&store), "tr_1rg.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "tr_1rg.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            system_target_remaining: Some(0.1),
+            row_cluster_size: 4,
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // 10% of 1000 with small clusters = ~100 rows (window padding
+        // can push it slightly higher). The key invariant: NOT 32%
+        // (sqrt) and NOT 46% (cbrt).
+        assert!(
+            (50..=150).contains(&num_rows),
+            "single-RG SYSTEM(0.1) should hit ~100 rows; got {num_rows} \
+             (would be ~316 if split as sqrt, ~464 if split as cbrt)"
+        );
+    }
+
+    /// End-to-end for the SYSTEM-mode adaptive split on multiple row
+    /// groups: the residual is split as `sqrt` between the row-group
+    /// and row axes, so the result is `sqrt(p) × sqrt(p) = p` of the
+    /// rows in expectation.
+    #[tokio::test]
+    async fn system_target_remaining_multi_row_group() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 4 row groups × 250 rows = 1000 rows.
+        let batches = (0..4)
+            .map(|g| {
+                let vals: Vec<Option<i32>> =
+                    ((g * 250)..(g * 250 + 250)).map(Some).collect();
+                record_batch!(("a", Int32, vals)).unwrap()
+            })
+            .collect::<Vec<_>>();
+        let schema = batches[0].schema();
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(250))
+            .build();
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "tr_4rg.parquet",
+            batches,
+            Some(props),
+        )
+        .await;
+        let file = PartitionedFile::new(
+            "tr_4rg.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            system_target_remaining: Some(0.25),
+            row_cluster_size: 16,
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // sqrt(0.25) = 0.5 at each axis: keep 2 of 4 row groups
+        // (= 500 rows under scrutiny), then 50% of those rows
+        // = ~250 rows. Window padding can push it higher; assert the
+        // upper bound is well below the unsplit 1000.
+        assert!(
+            (100..=400).contains(&num_rows),
+            "multi-RG SYSTEM remaining=0.25 should hit ~250 rows; got {num_rows}"
+        );
+    }
+
+    /// REPEATABLE(seed) must produce the same selection regardless of
+    /// where the parquet file lives. This is the SQL semantics users
+    /// expect from `TABLESAMPLE ... REPEATABLE(n)`.
+    #[tokio::test]
+    async fn system_target_remaining_repeatable_seed_ignores_file_name() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let values: Vec<Option<i32>> = (0..100).map(Some).collect();
+        let batch = record_batch!(("a", Int32, values)).unwrap();
+        let schema = batch.schema();
+
+        // Two files with different names but identical content.
+        let len_a =
+            write_parquet(Arc::clone(&store), "first.parquet", batch.clone()).await;
+        let len_b =
+            write_parquet(Arc::clone(&store), "second_path.parquet", batch.clone()).await;
+
+        let sampling = crate::sampling::ParquetSampling {
+            system_target_remaining: Some(0.5),
+            row_cluster_size: 4,
+            seed: Some(42),
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream_a = open_file(
+            &opener,
+            PartitionedFile::new(
+                "first.parquet".to_string(),
+                u64::try_from(len_a).unwrap(),
+            ),
+        )
+        .await
+        .unwrap();
+        let stream_b = open_file(
+            &opener,
+            PartitionedFile::new(
+                "second_path.parquet".to_string(),
+                u64::try_from(len_b).unwrap(),
+            ),
+        )
+        .await
+        .unwrap();
+
+        let rows_a = collect_values(stream_a).await;
+        let rows_b = collect_values(stream_b).await;
+
+        assert_eq!(
+            rows_a, rows_b,
+            "REPEATABLE(seed) must select the same rows regardless of file path"
+        );
+        assert!(
+            !rows_a.is_empty() && rows_a.len() < 100,
+            "expected a strict subset; got {} rows",
+            rows_a.len()
+        );
+
+        // Without a seed the selection must depend on the file name —
+        // otherwise unrelated parquet files in the same scan would all
+        // produce correlated samples, defeating the purpose of file-
+        // axis randomisation.
+        let unseeded_sampling = crate::sampling::ParquetSampling {
+            system_target_remaining: Some(0.5),
+            row_cluster_size: 4,
+            ..Default::default()
+        };
+        let unseeded_opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(unseeded_sampling)
+            .build();
+        let stream_a2 = open_file(
+            &unseeded_opener,
+            PartitionedFile::new(
+                "first.parquet".to_string(),
+                u64::try_from(len_a).unwrap(),
+            ),
+        )
+        .await
+        .unwrap();
+        let stream_b2 = open_file(
+            &unseeded_opener,
+            PartitionedFile::new(
+                "second_path.parquet".to_string(),
+                u64::try_from(len_b).unwrap(),
+            ),
+        )
+        .await
+        .unwrap();
+        let rows_a2 = collect_values(stream_a2).await;
+        let rows_b2 = collect_values(stream_b2).await;
+        assert_ne!(
+            rows_a2, rows_b2,
+            "without a seed, different file names should produce different samples"
+        );
+    }
+
+    /// Helper: pull an `i32` column out of a sampled stream.
+    async fn collect_values(
+        mut stream: BoxStream<'static, Result<RecordBatch>>,
+    ) -> Vec<i32> {
+        use futures::StreamExt;
+        let mut out = Vec::new();
+        while let Some(batch) = stream.next().await {
+            let batch = batch.unwrap();
+            let col = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<arrow::array::Int32Array>()
+                .unwrap();
+            for i in 0..col.len() {
+                out.push(col.value(i));
+            }
+        }
+        out
+    }
 }
```
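The seed semantics exercised by the last test can be sketched with a toy keep/drop decision. This is a hypothetical helper, not the crate's actual hashing scheme: the point is only that with a seed the hash key excludes the file path (so identical data samples identically wherever it lives), while without one the path participates (so co-scanned files decorrelate).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy selection decision: hash a key to a value in [0, 1) and keep
// the unit (row group, cluster, ...) if it falls below `fraction`.
fn keep(seed: Option<u64>, file_path: &str, index: u64, fraction: f64) -> bool {
    let mut h = DefaultHasher::new();
    match seed {
        // REPEATABLE(seed): key on (seed, index) only, ignoring the
        // file path, so the selection is stable across environments.
        Some(s) => (s, index).hash(&mut h),
        // Unseeded: key on (path, index) so different files in the
        // same scan produce independent selections.
        None => (file_path, index).hash(&mut h),
    }
    (h.finish() as f64) / (u64::MAX as f64) < fraction
}

fn main() {
    // Seeded: the file path must not matter.
    for i in 0..100 {
        assert_eq!(
            keep(Some(42), "first.parquet", i, 0.5),
            keep(Some(42), "second_path.parquet", i, 0.5),
        );
    }

    // Unseeded: the two paths should disagree somewhere over 100 units.
    let differs = (0..100).any(|i| {
        keep(None, "first.parquet", i, 0.5) != keep(None, "second_path.parquet", i, 0.5)
    });
    assert!(differs);
}
```

`DefaultHasher::new()` is deterministic across runs, which is what makes the seeded branch reproducible; a production scheme would pick a hash with stronger stability guarantees across library versions.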
