Commit 2026412

adriangb authored and claude committed
feat(parquet): row-group and row-range sampling on ParquetSource
Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index, not a guess) and is deterministic per `(file_name, row_group_count, fraction)` via a seeded `SmallRng`.
* `ParquetSource::with_row_fraction(fraction)` — within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32 768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_name, row_group_index, fraction, cluster_size)` — same inputs yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry point for "I want a sample" is at config time, before any metadata IO. The actual *which* row groups / *which* rows selection still has to be deferred to the opener (after the footer is parsed) — that's why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly.
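As a rough illustration of the deterministic selection described above, the sketch below keeps `ceil(count * fraction)` row groups (floor of 1) from a seed derived from `(file_name, row_group_count, fraction)`. This is a hypothetical, std-only stand-in: `sample_row_groups` is not the crate's API, and a splitmix64 step replaces the real seeded `SmallRng`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Std-only stand-in for a seeded PRNG (the real code uses `SmallRng`).
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Hypothetical helper: deterministically pick which row-group indices
/// to keep, seeded by (file_name, row_group_count, fraction).
fn sample_row_groups(file_name: &str, row_group_count: usize, fraction: f64) -> Vec<usize> {
    if row_group_count == 0 {
        return vec![];
    }
    // Seed from the inputs so the same file + fraction repeats exactly.
    let mut hasher = DefaultHasher::new();
    file_name.hash(&mut hasher);
    row_group_count.hash(&mut hasher);
    fraction.to_bits().hash(&mut hasher);
    let mut state = hasher.finish();

    // Keep ceil(count * fraction) groups, with a floor of 1.
    let target =
        ((row_group_count as f64 * fraction).ceil() as usize).clamp(1, row_group_count);

    // Partial Fisher-Yates shuffle, then sort the kept prefix so the
    // scan still visits row groups in file order.
    let mut indices: Vec<usize> = (0..row_group_count).collect();
    for i in 0..target {
        let j = i + (splitmix64(&mut state) as usize) % (row_group_count - i);
        indices.swap(i, j);
    }
    let mut kept = indices[..target].to_vec();
    kept.sort_unstable();
    kept
}

fn main() {
    // Same inputs yield the same kept set, so re-runs are repeatable.
    let kept = sample_row_groups("part-0001.parquet", 10, 0.3);
    println!("kept row groups: {kept:?}");
}
```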
## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer — out of scope here, see the linked POC in the PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism, file-name dependence, no-op at fraction=1.0, target floor of 1) plus 2 end-to-end tests that build a real parquet file in an `InMemory` object store and confirm the row counts emitted are what the sampling implies. `cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9a29e33 commit 2026412

6 files changed

Lines changed: 502 additions & 2 deletions


Cargo.lock

Lines changed: 1 addition & 0 deletions

datafusion/datasource-parquet/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -53,6 +53,7 @@ log = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
 parquet = { workspace = true }
+rand = { workspace = true, features = ["small_rng"] }
 tokio = { workspace = true }

 [dev-dependencies]
```

datafusion/datasource-parquet/src/mod.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_group_filter;
+mod sampling;
 mod sort;
 pub mod source;
 mod supported_predicates;
@@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
 pub use row_filter::build_row_filter;
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;
+pub use sampling::ParquetSampling;
 pub use writer::plan_to_parquet;
```

datafusion/datasource-parquet/src/opener.rs

Lines changed: 142 additions & 2 deletions
```diff
@@ -136,6 +136,9 @@ pub(super) struct ParquetMorselizer {
     pub max_predicate_cache_size: Option<usize>,
     /// Whether to read row groups in reverse order
     pub reverse_row_groups: bool,
+    /// Sampling config carried from `ParquetSource`. Applied lazily
+    /// inside the opener once the parquet metadata is available.
+    pub sampling: crate::sampling::ParquetSampling,
 }

 impl fmt::Debug for ParquetMorselizer {
@@ -287,6 +290,7 @@ struct PreparedParquetOpen {
     max_predicate_cache_size: Option<usize>,
     reverse_row_groups: bool,
     preserve_order: bool,
+    sampling: crate::sampling::ParquetSampling,
     #[cfg(feature = "parquet_encryption")]
     file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
 }
@@ -656,6 +660,7 @@ impl ParquetMorselizer {
             max_predicate_cache_size: self.max_predicate_cache_size,
             reverse_row_groups: self.reverse_row_groups,
             preserve_order: self.preserve_order,
+            sampling: self.sampling.clone(),
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties: None,
         })
@@ -882,11 +887,29 @@ impl FiltersPreparedParquetOpen {

         // Determine which row groups to actually read. The idea is to skip
         // as many row groups as possible based on the metadata and query
-        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+        let mut initial_plan = create_initial_plan(
             &prepared.file_name,
             prepared.extensions.clone(),
             rg_metadata.len(),
-        )?);
+        )?;
+
+        // Apply optional row-group and row-range sampling now that we
+        // know the actual row-group count. Both calls are no-ops when
+        // their respective fraction is `None`. Selection is
+        // deterministic per `(file_name, row_group_index, fraction,
+        // cluster_size)` so re-runs match.
+        prepared.sampling.apply_row_group_sampling(
+            &mut initial_plan,
+            rg_metadata.len(),
+            &prepared.file_name,
+        );
+        prepared.sampling.apply_row_fraction_sampling(
+            &mut initial_plan,
+            rg_metadata,
+            &prepared.file_name,
+        );
+
+        let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);

         // If there is a range restricting what parts of the file to read
         if let Some(range) = prepared.file_range.as_ref() {
@@ -1676,6 +1699,7 @@ mod test {
         max_predicate_cache_size: Option<usize>,
         reverse_row_groups: bool,
         preserve_order: bool,
+        sampling: crate::sampling::ParquetSampling,
     }

     impl ParquetMorselizerBuilder {
@@ -1702,9 +1726,16 @@ mod test {
                 max_predicate_cache_size: None,
                 reverse_row_groups: false,
                 preserve_order: false,
+                sampling: crate::sampling::ParquetSampling::default(),
             }
         }

+        /// Set the sampling config.
+        fn with_sampling(mut self, sampling: crate::sampling::ParquetSampling) -> Self {
+            self.sampling = sampling;
+            self
+        }
+
         /// Set the object store (required for building).
         fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
             self.store = Some(store);
@@ -1816,6 +1847,7 @@ mod test {
                 encryption_factory: None,
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
+                sampling: self.sampling,
             }
         }
     }
@@ -2720,4 +2752,112 @@ mod test {
             "without page index all rows are returned"
         );
     }
+
+    /// End-to-end: a parquet file with 4 row groups, scanned with
+    /// `row_group_fraction = 0.5`, should return rows from exactly 2
+    /// of the 4 row groups.
+    #[tokio::test]
+    async fn row_group_sampling_end_to_end() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 4 row groups of 3 rows each = 12 rows total.
+        let batches = (0..4)
+            .map(|g| {
+                record_batch!((
+                    "a",
+                    Int32,
+                    vec![Some(g * 10 + 1), Some(g * 10 + 2), Some(g * 10 + 3)]
+                ))
+                .unwrap()
+            })
+            .collect::<Vec<_>>();
+        let schema = batches[0].schema();
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(3))
+            .build();
+
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "rg_sampled.parquet",
+            batches,
+            Some(props),
+        )
+        .await;
+
+        let file = PartitionedFile::new(
+            "rg_sampled.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            row_group_fraction: Some(0.5),
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // ceil(4 * 0.5) = 2 row groups kept, each with 3 rows.
+        assert_eq!(
+            num_rows, 6,
+            "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows"
+        );
+    }
+
+    /// End-to-end: a single row group of 100 rows scanned with
+    /// `row_fraction = 0.1` and a small cluster size should yield
+    /// roughly 10 rows. The exact count depends on `ceil(100 * 0.1) =
+    /// 10` plus how the windows pack — we assert the count is in the
+    /// expected range and significantly less than 100.
+    #[tokio::test]
+    async fn row_fraction_end_to_end() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // One row group of 100 rows so we exercise the per-row-group
+        // RowSelection, not the row-group-level skip.
+        let values: Vec<Option<i32>> = (0..100).map(Some).collect();
+        let batch = record_batch!(("a", Int32, values)).unwrap();
+        let schema = batch.schema();
+        let data_len =
+            write_parquet(Arc::clone(&store), "rf.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "rf.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            row_fraction: Some(0.1),
+            row_cluster_size: 4, // small cluster -> several windows
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // We asked for ~10% of 100 rows. ceil(10 / cluster=4) = 3
+        // windows of ceil(10/3)=4 rows each, capped at the total ->
+        // up to 12 rows in practice. Assert the bounds.
+        assert!(
+            (1..100).contains(&num_rows),
+            "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
+        );
+        assert!(
+            num_rows <= 16,
+            "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
+        );
+    }
 }
```
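The window arithmetic the second test's comments walk through (ceil(10/4) = 3 windows of ceil(10/3) = 4 rows) can be sketched as a standalone function. `sample_windows` is a hypothetical illustration of the shape of that computation only: it spreads the window starts evenly, whereas the real code builds a parquet `RowSelection` and chooses offsets deterministically from the seed.

```rust
/// Hypothetical sketch: keep roughly `fraction` of `total_rows` as a few
/// contiguous windows of about `cluster_size` rows each, returned as
/// (start, len) pairs in row-group-relative row coordinates.
fn sample_windows(total_rows: usize, fraction: f64, cluster_size: usize) -> Vec<(usize, usize)> {
    assert!(total_rows > 0 && cluster_size > 0);
    // Target row count: ceil(total * fraction), floored at 1, capped at total.
    let target = ((total_rows as f64 * fraction).ceil() as usize).clamp(1, total_rows);
    // ceil(target / cluster_size) windows of ceil(target / num_windows) rows.
    let num_windows = (target + cluster_size - 1) / cluster_size;
    let win_len = (target + num_windows - 1) / num_windows;
    // Spread window starts evenly so the sample isn't clustered at one end.
    let stride = total_rows / num_windows;
    (0..num_windows)
        .map(|k| ((k * stride).min(total_rows - win_len), win_len))
        .collect()
}

fn main() {
    // 100 rows at 10% with cluster size 4: 3 windows of 4 rows each,
    // i.e. up to 12 rows kept, matching the bound the test asserts.
    println!("{:?}", sample_windows(100, 0.1, 4));
}
```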
