Skip to content

Commit 75edac6

Browse files
adriangbclaude
andcommitted
adaptive cube-root: don't undershoot small parquet scans
The fixed cube-root split read `cbrt(p) ≈ 46%` of the rows for a single-file `SYSTEM(10)` because the file and row-group axes had no capacity to reduce. The user-visible behaviour was: ask for 10%, read 46%. Adapt the split based on the actual axis cardinality: - `try_push_sample` (knows num_files): when num_files == 1, hand the entire fraction to the opener via a new `ParquetSampling.system_target_remaining`. When num_files >= 2, drop ~cbrt(p) of files and pass `p × num_files / target_files` as the residual (compensates for the file-level rounding too). - Opener (knows row_group count per file): when remaining is set and n_row_groups >= 2, split as sqrt(remaining) at row-group and row axes; when n_row_groups == 1, apply the full residual at the row level. Falls back to legacy explicit fractions when `system_target_remaining` is None — direct callers of `with_row_group_sampling` / `with_row_fraction` are unaffected. - EXPLAIN now surfaces `sample_system_target_remaining` for the pushdown path. Also: - 7 unit tests on `try_push_sample` updated for the new contract + new tests for the adaptive single-file case and rounding compensation. - 2 new opener end-to-end tests covering the single-RG and multi-RG adaptive splits. - SLT updated: `SYSTEM(50) REPEATABLE(42)` on a 1024-row single-file / single-RG input now returns 512 (was 813), the EXPLAIN line shows `sample_system_target_remaining=0.5000`. - SLT error message: "could not be pushed" → "is not supported". - User-guide docs updated to describe the adaptive split. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9a88f04 commit 75edac6

5 files changed

Lines changed: 344 additions & 111 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 167 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -893,32 +893,70 @@ impl FiltersPreparedParquetOpen {
893893
rg_metadata.len(),
894894
)?;
895895

896-
// Apply optional row-group level sampling now that we know the
897-
// actual row-group count. Selection is deterministic per
898-
// `(file_name, row_group_count, fraction)` so re-runs match.
899-
if let Some(fraction) = prepared.sampling.row_group_fraction {
900-
apply_row_group_sampling(
901-
&mut initial_plan,
902-
rg_metadata.len(),
903-
fraction,
904-
&prepared.file_name,
905-
);
906-
}
896+
// SYSTEM-mode adaptive split: when the SamplePushdown rule
897+
// hands us a residual fraction `remaining_p`, choose the
898+
// row-group / row split based on the row-group count we just
899+
// observed. With ≥ 2 row groups we split as `sqrt(remaining)`
900+
// at both axes; with 1 row group we apply the full residual at
901+
// the row level (the row-group axis can't reduce). This keeps
902+
// the expected output close to `p × N_total` even for tiny
903+
// scans where the cube-root math otherwise undershoots
904+
// (single-file / single-row-group inputs would read
905+
// `cbrt(p)` of the rows, ~46% for SYSTEM(10)).
906+
if let Some(remaining) = prepared.sampling.system_target_remaining {
907+
let n_rg = rg_metadata.len();
908+
if n_rg >= 2 {
909+
let q = remaining.sqrt();
910+
apply_row_group_sampling(&mut initial_plan, n_rg, q, &prepared.file_name);
911+
apply_row_fraction_sampling(
912+
&mut initial_plan,
913+
rg_metadata,
914+
q,
915+
prepared.sampling.row_cluster_size,
916+
&prepared.file_name,
917+
);
918+
} else {
919+
apply_row_fraction_sampling(
920+
&mut initial_plan,
921+
rg_metadata,
922+
remaining,
923+
prepared.sampling.row_cluster_size,
924+
&prepared.file_name,
925+
);
926+
}
927+
} else {
928+
// Legacy direct-builder path (no SYSTEM target set):
929+
// honour the explicit per-axis fractions independently.
930+
931+
// Apply optional row-group level sampling now that we know
932+
// the actual row-group count. Selection is deterministic
933+
// per `(file_name, row_group_count, fraction)` so re-runs
934+
// match.
935+
if let Some(fraction) = prepared.sampling.row_group_fraction {
936+
apply_row_group_sampling(
937+
&mut initial_plan,
938+
rg_metadata.len(),
939+
fraction,
940+
&prepared.file_name,
941+
);
942+
}
907943

908-
// Apply optional row-range sampling within each kept row group.
909-
// Each row group still marked `Scan` is downgraded to a
910-
// `Selection` covering ~`fraction` of the rows in K spread-out
911-
// windows. The parquet reader uses the page index to read only
912-
// the data pages that overlap the selection, giving page-level
913-
// IO savings without requiring per-column page alignment.
914-
if let Some(fraction) = prepared.sampling.row_fraction {
915-
apply_row_fraction_sampling(
916-
&mut initial_plan,
917-
rg_metadata,
918-
fraction,
919-
prepared.sampling.row_cluster_size,
920-
&prepared.file_name,
921-
);
944+
// Apply optional row-range sampling within each kept row
945+
// group. Each row group still marked `Scan` is downgraded
946+
// to a `Selection` covering ~`fraction` of the rows in K
947+
// spread-out windows. The parquet reader uses the page
948+
// index to read only the data pages that overlap the
949+
// selection, giving page-level IO savings without
950+
// requiring per-column page alignment.
951+
if let Some(fraction) = prepared.sampling.row_fraction {
952+
apply_row_fraction_sampling(
953+
&mut initial_plan,
954+
rg_metadata,
955+
fraction,
956+
prepared.sampling.row_cluster_size,
957+
&prepared.file_name,
958+
);
959+
}
922960
}
923961

924962
let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);
@@ -3056,4 +3094,108 @@ mod test {
30563094
"row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
30573095
);
30583096
}
3097+
3098+
/// End-to-end for the SYSTEM-mode adaptive split on a *single*
3099+
/// row group: the file axis is fixed (handled at try_push_sample),
3100+
/// the row-group axis can't reduce, so the opener should apply the
3101+
/// full residual fraction at the row level. A naïve implementation
3102+
/// that always splits as `sqrt(remaining)` between row-group and
3103+
/// row would only achieve `sqrt(0.1) ≈ 32%` here.
3104+
#[tokio::test]
3105+
async fn system_target_remaining_single_row_group() {
3106+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
3107+
3108+
let values: Vec<Option<i32>> = (0..1000).map(Some).collect();
3109+
let batch = record_batch!(("a", Int32, values)).unwrap();
3110+
let schema = batch.schema();
3111+
let data_len =
3112+
write_parquet(Arc::clone(&store), "tr_1rg.parquet", batch.clone()).await;
3113+
let file = PartitionedFile::new(
3114+
"tr_1rg.parquet".to_string(),
3115+
u64::try_from(data_len).unwrap(),
3116+
);
3117+
3118+
let sampling = crate::source::ParquetSampling {
3119+
system_target_remaining: Some(0.1),
3120+
row_cluster_size: 4,
3121+
..Default::default()
3122+
};
3123+
3124+
let opener = ParquetMorselizerBuilder::new()
3125+
.with_store(Arc::clone(&store))
3126+
.with_schema(Arc::clone(&schema))
3127+
.with_projection_indices(&[0])
3128+
.with_sampling(sampling)
3129+
.build();
3130+
3131+
let stream = open_file(&opener, file).await.unwrap();
3132+
let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
3133+
3134+
// 10% of 1000 with small clusters = ~100 rows (window padding
3135+
// can push it slightly higher). The key invariant: NOT 32%
3136+
// (sqrt) and NOT 46% (cbrt).
3137+
assert!(
3138+
(50..=150).contains(&num_rows),
3139+
"single-RG SYSTEM(0.1) should hit ~100 rows; got {num_rows} \
3140+
(would be ~316 if split as sqrt, ~464 if split as cbrt)"
3141+
);
3142+
}
3143+
3144+
/// End-to-end for the SYSTEM-mode adaptive split on multiple row
3145+
/// groups: the residual is split as `sqrt` between the row-group
3146+
/// and row axes, so the result is `sqrt(p) × sqrt(p) = p` of the
3147+
/// rows in expectation.
3148+
#[tokio::test]
3149+
async fn system_target_remaining_multi_row_group() {
3150+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
3151+
3152+
// 4 row groups × 250 rows = 1000 rows.
3153+
let batches = (0..4)
3154+
.map(|g| {
3155+
let vals: Vec<Option<i32>> =
3156+
((g * 250)..(g * 250 + 250)).map(Some).collect();
3157+
record_batch!(("a", Int32, vals)).unwrap()
3158+
})
3159+
.collect::<Vec<_>>();
3160+
let schema = batches[0].schema();
3161+
let props = WriterProperties::builder()
3162+
.set_max_row_group_row_count(Some(250))
3163+
.build();
3164+
let data_len = write_parquet_batches(
3165+
Arc::clone(&store),
3166+
"tr_4rg.parquet",
3167+
batches,
3168+
Some(props),
3169+
)
3170+
.await;
3171+
let file = PartitionedFile::new(
3172+
"tr_4rg.parquet".to_string(),
3173+
u64::try_from(data_len).unwrap(),
3174+
);
3175+
3176+
let sampling = crate::source::ParquetSampling {
3177+
system_target_remaining: Some(0.25),
3178+
row_cluster_size: 16,
3179+
..Default::default()
3180+
};
3181+
3182+
let opener = ParquetMorselizerBuilder::new()
3183+
.with_store(Arc::clone(&store))
3184+
.with_schema(Arc::clone(&schema))
3185+
.with_projection_indices(&[0])
3186+
.with_sampling(sampling)
3187+
.build();
3188+
3189+
let stream = open_file(&opener, file).await.unwrap();
3190+
let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
3191+
3192+
// sqrt(0.25) = 0.5 at each axis: keep 2 of 4 row groups
3193+
// (= 500 rows under scrutiny), then 50% of those rows
3194+
// = ~250 rows. Window padding can push it higher; assert the
3195+
// upper bound is well below the unsplit 1000.
3196+
assert!(
3197+
(100..=400).contains(&num_rows),
3198+
"multi-RG SYSTEM remaining=0.25 should hit ~250 rows; got {num_rows}"
3199+
);
3200+
}
30593201
}

0 commit comments

Comments
 (0)