
Commit a79813f

adriangb and claude committed
feat(parquet): row-group and row-range sampling on ParquetSource
Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure:

* `ParquetSource::with_row_group_sampling(fraction)`: keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index rather than guessing) and is deterministic per `(file_name, row_group_count, fraction)` via a seeded `SmallRng`.
* `ParquetSource::with_row_fraction(fraction)`: within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32,768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing.

The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_name, row_group_index, fraction, cluster_size)`: the same inputs yield the same windows, so re-runs are repeatable.

## Why this lives on `ParquetSource`

The natural entry point for "I want a sample" is at config time, before any metadata IO. The actual selection of *which* row groups and *which* rows still has to be deferred to the opener (after the footer is parsed). That is why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points.

This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly.

## Use cases

* `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives).
* Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample.
* Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer; out of scope here, see the linked POC in the PR description).
* `EXPLAIN ANALYZE`-driven debug runs against a representative slice.

## Tests

5 unit tests on `apply_row_group_sampling` (target count, determinism, file-name dependence, no-op at fraction=1.0, target floor of 1) plus 2 end-to-end tests that build a real parquet file in `InMemory` object store and confirm the row counts emitted are what the sampling implies.

`cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f043092 commit a79813f
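
The row-group selection described in the commit message (keep `ceil(count * fraction)` row groups, at least one, chosen deterministically from a seed) can be sketched with the standard library alone. This is only an illustration under assumptions: `sample_row_groups` and `SplitMix64` are hypothetical names, the SplitMix64 generator stands in for the seeded `SmallRng`, and the partial shuffle is one plausible way to pick the subset, not necessarily what `sampling.rs` does (that file is not shown here).

```rust
/// SplitMix64: a tiny deterministic PRNG, used here as a std-only
/// stand-in for the seeded `SmallRng` mentioned in the commit message.
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Hypothetical helper: pick which row-group indices to keep.
/// Keeps `ceil(row_group_count * fraction)` groups (at least 1) and
/// returns them sorted. The same `(seed, count, fraction)` always
/// yields the same indices.
fn sample_row_groups(seed: u64, row_group_count: usize, fraction: f64) -> Vec<usize> {
    // fraction >= 1.0 is a no-op: every row group is kept.
    if row_group_count == 0 || fraction >= 1.0 {
        return (0..row_group_count).collect();
    }
    let target = ((row_group_count as f64 * fraction).ceil() as usize)
        .clamp(1, row_group_count);

    let mut rng = SplitMix64(seed);
    let mut indices: Vec<usize> = (0..row_group_count).collect();
    // Partial Fisher-Yates shuffle: after `target` steps the first
    // `target` slots hold a random subset (modulo bias is ignored).
    for i in 0..target {
        let remaining = (row_group_count - i) as u64;
        let j = i + (rng.next_u64() % remaining) as usize;
        indices.swap(i, j);
    }
    indices.truncate(target);
    indices.sort_unstable();
    indices
}
```

Calling `sample_row_groups(seed, 4, 0.5)` keeps two of four row groups, which matches the `ceil(4 * 0.5) = 2` expectation in the end-to-end test. In a real scan the seed would be derived from whatever per-file identity the opener uses.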

6 files changed

Lines changed: 745 additions & 2 deletions


Cargo.lock

Lines changed: 1 addition & 0 deletions

datafusion/datasource-parquet/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ log = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
 parquet = { workspace = true }
+rand = { workspace = true, features = ["small_rng"] }
 tokio = { workspace = true }
 
 [dev-dependencies]

datafusion/datasource-parquet/src/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_group_filter;
+mod sampling;
 mod sort;
 pub mod source;
 mod supported_predicates;
@@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
 pub use row_filter::build_row_filter;
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;
+pub use sampling::ParquetSampling;
 pub use writer::plan_to_parquet;

datafusion/datasource-parquet/src/opener.rs

Lines changed: 146 additions & 2 deletions
@@ -136,6 +136,9 @@ pub(super) struct ParquetMorselizer {
     pub max_predicate_cache_size: Option<usize>,
     /// Whether to read row groups in reverse order
     pub reverse_row_groups: bool,
+    /// Sampling config carried from `ParquetSource`. Applied lazily
+    /// inside the opener once the parquet metadata is available.
+    pub sampling: crate::sampling::ParquetSampling,
 }
 
 impl fmt::Debug for ParquetMorselizer {
@@ -287,6 +290,7 @@ struct PreparedParquetOpen {
     max_predicate_cache_size: Option<usize>,
     reverse_row_groups: bool,
     preserve_order: bool,
+    sampling: crate::sampling::ParquetSampling,
     #[cfg(feature = "parquet_encryption")]
     file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
 }
@@ -656,6 +660,7 @@ impl ParquetMorselizer {
             max_predicate_cache_size: self.max_predicate_cache_size,
             reverse_row_groups: self.reverse_row_groups,
             preserve_order: self.preserve_order,
+            sampling: self.sampling.clone(),
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties: None,
         })
@@ -882,11 +887,33 @@ impl FiltersPreparedParquetOpen {
 
         // Determine which row groups to actually read. The idea is to skip
         // as many row groups as possible based on the metadata and query
-        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+        let mut initial_plan = create_initial_plan(
             &prepared.file_name,
             prepared.extensions.clone(),
             rg_metadata.len(),
-        )?);
+        )?;
+
+        // Apply optional row-group and row-range sampling now that we
+        // know the actual row-group count. Both calls are no-ops when
+        // their respective fraction is `None`. Selection is
+        // deterministic per `(partition_index, row_group_index,
+        // fraction, cluster_size)` so re-runs match. The execution
+        // `partition_index` is the stable per-file id we plumb in:
+        // it makes sampling reproducible across environments without
+        // depending on object-store paths, and decorrelates files
+        // assigned to different partitions.
+        prepared.sampling.apply_row_group_sampling(
+            &mut initial_plan,
+            rg_metadata.len(),
+            prepared.partition_index,
+        );
+        prepared.sampling.apply_row_fraction_sampling(
+            &mut initial_plan,
+            rg_metadata,
+            prepared.partition_index,
+        );
+
+        let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan);
 
         // If there is a range restricting what parts of the file to read
         if let Some(range) = prepared.file_range.as_ref() {
@@ -1676,6 +1703,7 @@ mod test {
         max_predicate_cache_size: Option<usize>,
         reverse_row_groups: bool,
         preserve_order: bool,
+        sampling: crate::sampling::ParquetSampling,
     }
 
     impl ParquetMorselizerBuilder {
@@ -1702,9 +1730,16 @@ mod test {
                 max_predicate_cache_size: None,
                 reverse_row_groups: false,
                 preserve_order: false,
+                sampling: crate::sampling::ParquetSampling::default(),
             }
         }
 
+        /// Set the sampling config.
+        fn with_sampling(mut self, sampling: crate::sampling::ParquetSampling) -> Self {
+            self.sampling = sampling;
+            self
+        }
+
         /// Set the object store (required for building).
         fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
             self.store = Some(store);
@@ -1816,6 +1851,7 @@ mod test {
                 encryption_factory: None,
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
+                sampling: self.sampling,
             }
         }
     }
@@ -2720,4 +2756,112 @@ mod test {
             "without page index all rows are returned"
         );
     }
+
+    /// End-to-end: a parquet file with 4 row groups, scanned with
+    /// `row_group_fraction = 0.5`, should return rows from exactly 2
+    /// of the 4 row groups.
+    #[tokio::test]
+    async fn row_group_sampling_end_to_end() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // 4 row groups of 3 rows each = 12 rows total.
+        let batches = (0..4)
+            .map(|g| {
+                record_batch!((
+                    "a",
+                    Int32,
+                    vec![Some(g * 10 + 1), Some(g * 10 + 2), Some(g * 10 + 3),]
+                ))
+                .unwrap()
+            })
+            .collect::<Vec<_>>();
+        let schema = batches[0].schema();
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(3))
+            .build();
+
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "rg_sampled.parquet",
+            batches,
+            Some(props),
+        )
+        .await;
+
+        let file = PartitionedFile::new(
+            "rg_sampled.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            row_group_fraction: Some(0.5),
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // ceil(4 * 0.5) = 2 row groups kept, each with 3 rows.
+        assert_eq!(
+            num_rows, 6,
+            "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows"
+        );
+    }
+
+    /// End-to-end: a single row group of 100 rows scanned with
+    /// `row_fraction = 0.1` and the default cluster size should yield
+    /// roughly 10 rows. The exact count depends on `ceil(100 * 0.1) =
+    /// 10` plus how the windows pack — we assert the count is in the
+    /// expected range and significantly less than 100.
+    #[tokio::test]
+    async fn row_fraction_end_to_end() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // One row group of 100 rows so we exercise the per-row-group
+        // RowSelection, not the row-group-level skip.
+        let values: Vec<Option<i32>> = (0..100).map(Some).collect();
+        let batch = record_batch!(("a", Int32, values)).unwrap();
+        let schema = batch.schema();
+        let data_len =
+            write_parquet(Arc::clone(&store), "rf.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "rf.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let sampling = crate::sampling::ParquetSampling {
+            row_fraction: Some(0.1),
+            row_cluster_size: 4, // small cluster -> several windows
+            ..Default::default()
+        };
+
+        let opener = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_sampling(sampling)
+            .build();
+
+        let stream = open_file(&opener, file).await.unwrap();
+        let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+        // We asked for ~10% of 100 rows. ceil(10 / cluster=4) = 3
+        // windows of ceil(10/3)=4 rows each, capped at the total ->
+        // up to 12 rows in practice. Assert the bounds.
+        assert!(
+            (1..100).contains(&num_rows),
+            "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
+        );
+        assert!(
+            num_rows <= 16,
+            "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}"
+        );
+    }
 }
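
The window arithmetic in the last test comment (10 target rows, cluster size 4, so 3 windows of 4 rows, up to 12 rows) can be reproduced with a small std-only sketch. Everything here is an assumption made for illustration: `plan_windows`, `sample_row_runs`, `Run`, and `mix` are hypothetical names, `Run` only mimics the skip/select shape of a `RowSelection`, and placing one window at a pseudo-random offset inside each equal-sized stratum is just one way to spread windows out. The placement strategy in `sampling.rs` is not shown here.

```rust
/// Hypothetical stand-in for one skip/select run of a row selection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Run {
    Skip(usize),
    Select(usize),
}

fn div_ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

/// SplitMix64 finalizer used as a stateless, deterministic hash.
fn mix(z: u64) -> u64 {
    let mut z = z.wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Returns `(window_count, window_len)` following the test comment:
/// target = ceil(rows * fraction), windows = ceil(target / cluster),
/// window_len = ceil(target / windows).
fn plan_windows(total_rows: usize, fraction: f64, cluster_size: usize) -> (usize, usize) {
    if total_rows == 0 {
        return (0, 0);
    }
    let target = ((total_rows as f64 * fraction).ceil() as usize).clamp(1, total_rows);
    let windows = div_ceil(target, cluster_size.max(1));
    (windows, div_ceil(target, windows))
}

/// Builds alternating skip/select runs covering exactly `total_rows`.
/// Assumed placement: one window per equal-sized stratum, at a
/// seed-derived offset, so windows are spread across the row group.
fn sample_row_runs(seed: u64, total_rows: usize, fraction: f64, cluster_size: usize) -> Vec<Run> {
    if total_rows == 0 {
        return Vec::new();
    }
    if fraction >= 1.0 {
        return vec![Run::Select(total_rows)];
    }
    let (windows, window_len) = plan_windows(total_rows, fraction, cluster_size);
    let stratum = total_rows / windows; // >= 1 because windows <= total_rows
    let len = window_len.min(stratum); // a window never leaves its stratum
    let slack = (stratum - len) as u64;

    let mut runs = Vec::new();
    let mut cursor = 0usize;
    for w in 0..windows {
        let offset = (mix(seed ^ w as u64) % (slack + 1)) as usize;
        let start = w * stratum + offset;
        if start > cursor {
            runs.push(Run::Skip(start - cursor));
        }
        // Two windows may touch; they are left as separate Select runs.
        runs.push(Run::Select(len));
        cursor = start + len;
    }
    if cursor < total_rows {
        runs.push(Run::Skip(total_rows - cursor));
    }
    runs
}
```

With 100 rows, a fraction of 0.1 and a cluster size of 4, `plan_windows` returns `(3, 4)`, so the runs select 12 rows in total, inside the `num_rows <= 16` bound the test asserts.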
