Commit 74914e0

adriangb and claude committed
feat: SamplePushdown rule + Sample logical/physical nodes for parquet
Adds the infrastructure for pushing TABLESAMPLE-shaped sampling into file sources, with parquet as the first absorbing source. There is no SQL surface yet; this commit only ships the primitives. Wiring a RelationPlanner / ExtensionPlanner so it works out of the box from SQL is a follow-up.

- `Sample` `UserDefinedLogicalNodeCore` extension node in `datafusion-expr` (`logical_plan/sample.rs`). Schema-preserving; validates `fraction ∈ (0, 1]`. Currently encodes `SampleMethod::System` only.
- `SampleExec` placeholder in `datafusion-physical-plan`. Errors at `execute` (it is a marker; the `SamplePushdown` rule is expected to remove it). Implements filter / sort pushdown passthrough so unrelated optimizer rules see straight through it.
- New `try_push_sample` method on `ExecutionPlan` and `FileSource`, returning `Absorbed { inner }` / `Passthrough` / `Unsupported { reason }`. The default is `Unsupported`; per-node `Passthrough` overrides exist on filter, projection, coalesce_batches, coalesce_partitions, repartition, and non-fetch sort.
- `ParquetSource::try_push_sample` runs the (intentionally private) hierarchical block-level reduction across files / row groups / rows, with adaptive collapse when an axis can't reduce. It coordinates with the opener via a `pub(crate)` `system_target_remaining` field on `ParquetSampling`. Single-file, single-row-group inputs hit ~`p × N` rows instead of undershooting at `p^(1/3) × N`.
- `SamplePushdown` optimizer rule (between `PushdownSort` and `EnsureCooperative`) walks top-down. On `Absorbed` it replaces `SampleExec` with the rebuilt source; on `Passthrough` it pushes through the single-child node and recurses; on `Unsupported` it errors at planning time with `"TABLESAMPLE is not supported for this source"`. There is intentionally no generic post-scan `SampleExec` yet.
- EXPLAIN visibility: `ParquetSource::fmt_extra` surfaces `sample_system_target_remaining` when set.
- `optimizer_rule_reference.md` updated to list `SamplePushdown` in the documented rule order.
- `explain.slt` updated with `physical_plan after SamplePushdown SAME TEXT AS ABOVE` lines under each verbose-explain test.

Tests: 7 unit tests on `ParquetSource::try_push_sample` covering the pushdown contract (full / single-file / multi-file / target clamping / REPEATABLE determinism / multi-file rounding compensation), and 2 opener end-to-end tests covering the adaptive split for single- vs multi-row-group inputs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
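
For orientation, here is a minimal self-contained sketch of the `try_push_sample` contract the message describes. The three variant names and the `Unsupported` default come from the commit message; the trait, the `fraction` parameter, and all type details are illustrative stand-ins, not the actual DataFusion signatures:

```rust
use std::sync::Arc;

// Stand-in for DataFusion's `ExecutionPlan` so the sketch compiles alone.
trait Plan {}

// Hypothetical result shape; variant names follow the commit message.
enum SamplePushdownResult {
    Absorbed { inner: Arc<dyn Plan> }, // node rebuilt with the sample baked in
    Passthrough,                       // sample may move past this node to its child
    Unsupported { reason: String },    // the rule turns this into a planning error
}

trait TryPushSample {
    // Default per the commit message: refuse unless a node opts in.
    fn try_push_sample(&self, _fraction: f64) -> SamplePushdownResult {
        SamplePushdownResult::Unsupported {
            reason: "TABLESAMPLE is not supported for this source".into(),
        }
    }
}

fn main() {
    struct SomeExec;
    impl TryPushSample for SomeExec {} // inherits the `Unsupported` default
    match SomeExec.try_push_sample(0.1) {
        SamplePushdownResult::Unsupported { reason } => println!("{reason}"),
        _ => unreachable!(),
    }
}
```

The `Unsupported` default means sampling support is strictly opt-in per node, which is what lets the `SamplePushdown` rule fail loudly instead of silently returning unsampled data.
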
1 parent 3d0dc4a commit 74914e0

21 files changed

Lines changed: 1414 additions & 30 deletions

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 4 additions & 3 deletions
```diff
@@ -88,6 +88,7 @@ in multiple phases.
 | 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
 | 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
 | 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| 20 | `SamplePushdown` | - | Pushes `TABLESAMPLE` into the source; errors at planning time if the sample can't be absorbed. |
+| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
```
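
The table places the rule between `PushdownSort` and `EnsureCooperative`; the commit message describes its behavior as a top-down walk where `Absorbed` replaces `SampleExec` with the rebuilt source, `Passthrough` recurses through a single-child node, and `Unsupported` fails planning. A self-contained sketch of that walk follows; `Node`, `Push`, and `push_down` are illustrative stand-ins, not DataFusion's actual types (the real rule operates on `Arc<dyn ExecutionPlan>`):

```rust
// Minimal stand-ins for the plan tree.
enum Node {
    Filter(Box<Node>), // passthrough node with a single child
    ParquetScan,       // absorbing source
    CsvScan,           // source with no sampling support
}

enum Push {
    Absorbed(Node),
    Passthrough,
    Unsupported,
}

fn try_push_sample(node: &Node, fraction: f64) -> Push {
    match node {
        Node::Filter(_) => Push::Passthrough,
        Node::ParquetScan => {
            // The real ParquetSource rebuilds itself with the residual
            // fraction recorded (`system_target_remaining`); here we just
            // acknowledge it.
            let _ = fraction;
            Push::Absorbed(Node::ParquetScan)
        }
        Node::CsvScan => Push::Unsupported,
    }
}

// Top-down walk per the commit description: Absorbed replaces the node with
// the rebuilt source, Passthrough recurses into the single child, and
// Unsupported becomes a planning-time error.
fn push_down(node: Node, fraction: f64) -> Result<Node, String> {
    match try_push_sample(&node, fraction) {
        Push::Absorbed(rebuilt) => Ok(rebuilt),
        Push::Passthrough => match node {
            Node::Filter(child) => {
                Ok(Node::Filter(Box::new(push_down(*child, fraction)?)))
            }
            _ => unreachable!("only single-child nodes return Passthrough"),
        },
        Push::Unsupported => {
            Err("TABLESAMPLE is not supported for this source".to_string())
        }
    }
}

fn main() {
    assert!(push_down(Node::Filter(Box::new(Node::ParquetScan)), 0.1).is_ok());
    assert!(push_down(Node::Filter(Box::new(Node::CsvScan)), 0.1).is_err());
}
```

Erroring on `Unsupported` rather than falling back to a generic post-scan operator matches the commit's explicit choice not to ship a generic `SampleExec` execution path yet.
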

datafusion/datasource-parquet/src/opener.rs

Lines changed: 167 additions & 25 deletions
```diff
@@ -893,32 +893,70 @@ impl FiltersPreparedParquetOpen {
             rg_metadata.len(),
         )?;

-        // Apply optional row-group level sampling now that we know the
-        // actual row-group count. Selection is deterministic per
-        // `(file_name, row_group_count, fraction)` so re-runs match.
-        if let Some(fraction) = prepared.sampling.row_group_fraction {
-            apply_row_group_sampling(
-                &mut initial_plan,
-                rg_metadata.len(),
-                fraction,
-                &prepared.file_name,
-            );
-        }
+        // SYSTEM-mode adaptive split: when the SamplePushdown rule
+        // hands us a residual fraction `remaining_p`, choose the
+        // row-group / row split based on the row-group count we just
+        // observed. With ≥ 2 row groups we split as `sqrt(remaining)`
+        // at both axes; with 1 row group we apply the full residual at
+        // the row level (the row-group axis can't reduce). This keeps
+        // the expected output close to `p × N_total` even for tiny
+        // scans where the cube-root math otherwise undershoots
+        // (single-file / single-row-group inputs would read
+        // `cbrt(p)` of the rows, ~46% for SYSTEM(10)).
+        if let Some(remaining) = prepared.sampling.system_target_remaining {
+            let n_rg = rg_metadata.len();
+            if n_rg >= 2 {
+                let q = remaining.sqrt();
+                apply_row_group_sampling(&mut initial_plan, n_rg, q, &prepared.file_name);
+                apply_row_fraction_sampling(
+                    &mut initial_plan,
+                    rg_metadata,
+                    q,
+                    prepared.sampling.row_cluster_size,
+                    &prepared.file_name,
+                );
+            } else {
+                apply_row_fraction_sampling(
+                    &mut initial_plan,
+                    rg_metadata,
+                    remaining,
+                    prepared.sampling.row_cluster_size,
+                    &prepared.file_name,
+                );
+            }
+        } else {
+            // Legacy direct-builder path (no SYSTEM target set):
+            // honour the explicit per-axis fractions independently.
+
+            // Apply optional row-group level sampling now that we know
+            // the actual row-group count. Selection is deterministic
+            // per `(file_name, row_group_count, fraction)` so re-runs
+            // match.
+            if let Some(fraction) = prepared.sampling.row_group_fraction {
+                apply_row_group_sampling(
+                    &mut initial_plan,
+                    rg_metadata.len(),
+                    fraction,
+                    &prepared.file_name,
+                );
+            }

-        // Apply optional row-range sampling within each kept row group.
-        // Each row group still marked `Scan` is downgraded to a
-        // `Selection` covering ~`fraction` of the rows in K spread-out
-        // windows. The parquet reader uses the page index to read only
-        // the data pages that overlap the selection, giving page-level
-        // IO savings without requiring per-column page alignment.
-        if let Some(fraction) = prepared.sampling.row_fraction {
-            apply_row_fraction_sampling(
-                &mut initial_plan,
-                rg_metadata,
-                fraction,
-                prepared.sampling.row_cluster_size,
-                &prepared.file_name,
-            );
+            // Apply optional row-range sampling within each kept row
+            // group. Each row group still marked `Scan` is downgraded
+            // to a `Selection` covering ~`fraction` of the rows in K
+            // spread-out windows. The parquet reader uses the page
+            // index to read only the data pages that overlap the
+            // selection, giving page-level IO savings without
+            // requiring per-column page alignment.
+            if let Some(fraction) = prepared.sampling.row_fraction {
+                apply_row_fraction_sampling(
+                    &mut initial_plan,
+                    rg_metadata,
+                    fraction,
+                    prepared.sampling.row_cluster_size,
+                    &prepared.file_name,
+                );
+            }
         }

         let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);
```
```diff
@@ -3056,4 +3094,108 @@ mod test {
             "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
         );
     }
+
+    /// End-to-end for the SYSTEM-mode adaptive split on a *single*
+    /// row group: the file axis is fixed (handled at try_push_sample),
+    /// the row-group axis can't reduce, so the opener should apply the
+    /// full residual fraction at the row level. A naïve implementation
+    /// that always splits as `sqrt(remaining)` between row-group and
+    /// row would only reduce to `sqrt(0.1) ≈ 32%` of the rows here.
+    #[tokio::test]
+    async fn system_target_remaining_single_row_group() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let values: Vec<Option<i32>> = (0..1000).map(Some).collect();
+        let batch = record_batch!(("a", Int32, values)).unwrap();
+        let schema = batch.schema();
+        let data_len =
+            write_parquet(Arc::clone(&store), "tr_1rg.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "tr_1rg.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::source::ParquetSampling {
+            system_target_remaining: Some(0.1),
+            row_cluster_size: 4,
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // 10% of 1000 with small clusters = ~100 rows (window padding
+        // can push it slightly higher). The key invariant: NOT 32%
+        // (sqrt) and NOT 46% (cbrt).
+        assert!(
+            (50..=150).contains(&num_rows),
+            "single-RG SYSTEM(0.1) should hit ~100 rows; got {num_rows} \
+             (would be ~316 if split as sqrt, ~464 if split as cbrt)"
+        );
+    }
+
+    /// End-to-end for the SYSTEM-mode adaptive split on multiple row
+    /// groups: the residual is split as `sqrt` between the row-group
+    /// and row axes, so the result is `sqrt(p) × sqrt(p) = p` of the
+    /// rows in expectation.
+    #[tokio::test]
+    async fn system_target_remaining_multi_row_group() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 4 row groups × 250 rows = 1000 rows.
+        let batches = (0..4)
+            .map(|g| {
+                let vals: Vec<Option<i32>> =
+                    ((g * 250)..(g * 250 + 250)).map(Some).collect();
+                record_batch!(("a", Int32, vals)).unwrap()
+            })
+            .collect::<Vec<_>>();
+        let schema = batches[0].schema();
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(250))
+            .build();
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "tr_4rg.parquet",
+            batches,
+            Some(props),
+        )
+        .await;
+        let file = PartitionedFile::new(
+            "tr_4rg.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::source::ParquetSampling {
+            system_target_remaining: Some(0.25),
+            row_cluster_size: 16,
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // sqrt(0.25) = 0.5 at each axis: keep 2 of 4 row groups
+        // (= 500 rows under scrutiny), then 50% of those rows
+        // = ~250 rows. Window padding can push it higher; assert the
+        // upper bound is well below the unsplit 1000.
+        assert!(
+            (100..=400).contains(&num_rows),
+            "multi-RG SYSTEM remaining=0.25 should hit ~250 rows; got {num_rows}"
+        );
+    }
 }
```
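
For a concrete sense of the numbers these two tests assert, here is a small standalone sketch of the residual-split arithmetic, mirroring the branch in the opener hunk above (`split_residual` is a hypothetical helper for illustration, not a function in the patch):

```rust
/// With >= 2 row groups both axes take sqrt(remaining); with a single row
/// group the row axis absorbs the full residual.
/// Returns (row_group_fraction, row_fraction).
fn split_residual(remaining: f64, n_row_groups: usize) -> (f64, f64) {
    if n_row_groups >= 2 {
        let q = remaining.sqrt();
        (q, q) // q * q == remaining, so the expected kept share is `remaining`
    } else {
        (1.0, remaining) // row-group axis can't reduce below one group
    }
}

fn main() {
    // Single row group, residual 0.1: the full 10% is applied at the row
    // level, not sqrt(0.1) ~ 0.316 (the naive two-axis split) and not
    // cbrt(0.1) ~ 0.464 (a three-axis split when only one axis can reduce).
    assert_eq!(split_residual(0.1, 1), (1.0, 0.1));

    // Four row groups, residual 0.25: keep sqrt(0.25) = 0.5 of the row
    // groups, then 0.5 of their rows, for 0.25 of all rows in expectation,
    // i.e. ~250 of the 1000 rows in the multi-row-group test.
    let (rg, rows) = split_residual(0.25, 4);
    assert_eq!((rg, rows), (0.5, 0.5));
    assert!((rg * rows - 0.25).abs() < f64::EPSILON);
}
```
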
