Skip to content

Commit ec48974

Browse files
adriangbclaude
andcommitted
refactor(parquet sampling): address PR apache#22000 review feedback
Two changes responding to review on the parent commit: 1. Key sampling on a stable `file_index` instead of `file_name` (apache#22000 (comment)). Both `apply_row_group_sampling` and `apply_row_fraction_sampling` now take `file_index: usize` rather than `file_name: &str`. The parquet opener passes the execution `partition_index`. This makes sampling reproducible across environments (no dependency on the on-disk path), while still decorrelating files assigned to different partitions. 2. Extract the row-window selection into `build_row_window_selectors` and add fuzz coverage (apache#22000 (comment)). The previous inline arithmetic could produce overlapping windows when `target_rows` was close to `total_rows`: `window_size = ceil(target / n_windows)` could exceed `stride = total / n_windows`, so adjacent strides' windows would intersect. The extracted function caps `window_size` at `stride` (the construction that guarantees disjointness) and is covered by: * `row_window_selection_basic_layout` — hand-checked anchor case. * `row_window_selection_returns_none_on_invalid_input` — degenerate inputs return `None` cleanly. * `row_window_selection_full_target_no_overlap` — the previously buggy `target_rows == total_rows` case. * `row_window_selection_fuzz_invariants` — 5 000 randomized `(total_rows, target_rows, cluster_size, seed)` configurations, asserting full coverage, in-bounds positions, and no overlap. * `row_window_selection_fuzz_determinism` — 1 000 iterations verifying identical seeds produce identical layouts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 30cd44d commit ec48974

2 files changed

Lines changed: 346 additions & 111 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -893,24 +893,24 @@ impl FiltersPreparedParquetOpen {
893893
rg_metadata.len(),
894894
)?;
895895

896-
// SYSTEM-mode adaptive split: when the SamplePushdown rule
897-
// hands us a residual fraction `remaining_p`, choose the
898-
// row-group / row split based on the row-group count we just
899-
// observed. With ≥ 2 row groups we split as `sqrt(remaining)`
900-
// at both axes; with 1 row group we apply the full residual at
901-
// the row level (the row-group axis can't reduce). This keeps
902-
// the expected output close to `p × N_total` even for tiny
903-
// scans where the cube-root math otherwise undershoots
904-
// (single-file / single-row-group inputs would read
905-
// `cbrt(p)` of the rows, ~46% for SYSTEM(10)).
896+
// Apply optional row-group and row-range sampling now that we
897+
// know the actual row-group count. Selection is deterministic
898+
// per `(partition_index, row_group_index, fraction,
899+
// cluster_size)` so re-runs match. The execution
900+
// `partition_index` is the stable per-file id we plumb in: it
901+
// makes sampling reproducible across environments without
902+
// depending on object-store paths, and decorrelates files
903+
// assigned to different partitions.
906904
if let Some(remaining) = prepared.sampling.system_target_remaining {
907-
// SYSTEM-mode adaptive split: choose the row-group / row
908-
// axes based on the row-group count we just observed. With
909-
// ≥ 2 row groups split as `sqrt(remaining)` at both axes;
910-
// with 1 row group skip row-group sampling and apply the
911-
// full residual at the row level. Without this adaptation
912-
// a single-file / single-row-group scan would only reach
913-
// `cbrt(remaining)` of the rows (~46% for SYSTEM(10)).
905+
// SYSTEM-mode adaptive split: when the SamplePushdown rule
906+
// hands us a residual fraction `remaining`, choose the
907+
// row-group / row split based on the row-group count we
908+
// just observed. With ≥ 2 row groups split as
909+
// `sqrt(remaining)` at both axes; with 1 row group skip
910+
// row-group sampling and apply the full residual at the
911+
// row level. Without this adaptation a single-file /
912+
// single-row-group scan would only reach `cbrt(remaining)`
913+
// of the rows (~46% for SYSTEM(10)).
914914
let n_rg = rg_metadata.len();
915915
let mut adapted = prepared.sampling.clone();
916916
if n_rg >= 2 {
@@ -924,25 +924,25 @@ impl FiltersPreparedParquetOpen {
924924
adapted.apply_row_group_sampling(
925925
&mut initial_plan,
926926
n_rg,
927-
&prepared.file_name,
927+
prepared.partition_index,
928928
);
929929
adapted.apply_row_fraction_sampling(
930930
&mut initial_plan,
931931
rg_metadata,
932-
&prepared.file_name,
932+
prepared.partition_index,
933933
);
934934
} else {
935935
// Legacy direct-builder path: each method is a no-op when
936936
// its corresponding fraction is `None`.
937937
prepared.sampling.apply_row_group_sampling(
938938
&mut initial_plan,
939939
rg_metadata.len(),
940-
&prepared.file_name,
940+
prepared.partition_index,
941941
);
942942
prepared.sampling.apply_row_fraction_sampling(
943943
&mut initial_plan,
944944
rg_metadata,
945-
&prepared.file_name,
945+
prepared.partition_index,
946946
);
947947
}
948948

0 commit comments

Comments
 (0)