Skip to content

Commit 38269f9

Browse files
authored
feat: support file-level parquet row selections (#22940)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #22939 ## Rationale for this change - see issue #22939 ## What changes are included in this PR? - Add public `ParquetRowSelection`. - Add `ParquetAccessPlan::try_new_from_overall_row_selection`. - Allow Parquet opener setup to read either `ParquetAccessPlan` or `ParquetRowSelection`. - Reject using both extension types on the same file. - Validate that the selection row count matches the file row count. - Document the new extension path in `ParquetSource`. ## Are these changes tested? Yes. This PR adds tests for: - converting a file-level selection into row-group access - rejecting invalid selection row counts - creating an initial plan from `ParquetRowSelection` - rejecting both `ParquetAccessPlan` and `ParquetRowSelection` on the same file ## Are there any user-facing changes? Yes. This adds a new public `ParquetRowSelection` type for callers that want to attach a file-level Parquet `RowSelection` to a `PartitionedFile`.
1 parent cab75e0 commit 38269f9

5 files changed

Lines changed: 507 additions & 24 deletions

File tree

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ use datafusion::common::Result;
2929
use datafusion::datasource::listing::PartitionedFile;
3030
use datafusion::datasource::physical_plan::ParquetSource;
3131
use datafusion::prelude::SessionContext;
32-
use datafusion_common::{DFSchema, assert_contains};
33-
use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess};
32+
use datafusion_common::{DFSchema, assert_batches_eq, assert_contains};
33+
use datafusion_datasource_parquet::{
34+
ParquetAccessPlan, ParquetRowSelection, RowGroupAccess,
35+
};
3436
use datafusion_execution::object_store::ObjectStoreUrl;
3537
use datafusion_expr::{Expr, col, lit};
3638
use datafusion_physical_plan::ExecutionPlan;
@@ -152,6 +154,94 @@ async fn skip_scan() {
152154
}
153155
}
154156

157+
#[tokio::test]
158+
async fn row_selection_extension() {
159+
// The file has 2 row groups of 5 rows each (10 rows total). Attach a
160+
// file-level `ParquetRowSelection` to the `PartitionedFile` and verify it
161+
// survives the path from `PartitionedFile` into the parquet opener/reader.
162+
163+
// select a single row in the first row group
164+
let parquet_metrics = TestFull {
165+
access_plan: None,
166+
row_selection: Some(ParquetRowSelection::new(RowSelection::from(vec![
167+
RowSelector::skip(2),
168+
RowSelector::select(1),
169+
RowSelector::skip(7),
170+
]))),
171+
expected_rows: 1,
172+
expected_output: Some(&[
173+
"+------+------------+",
174+
"| utf8 | large_utf8 |",
175+
"+------+------------+",
176+
"| c | c |",
177+
"+------+------------+",
178+
]),
179+
predicate: None,
180+
}
181+
.run()
182+
.await
183+
.unwrap();
184+
185+
// only the first row group is read, so some bytes are scanned
186+
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
187+
assert_ne!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
188+
}
189+
190+
#[tokio::test]
191+
async fn row_selection_extension_spanning_row_groups() {
192+
// A selection whose selectors straddle the row group boundary (row 4 is the
193+
// last row of group 0, rows 5-6 are the first rows of group 1).
194+
let parquet_metrics = TestFull {
195+
access_plan: None,
196+
row_selection: Some(ParquetRowSelection::new(RowSelection::from(vec![
197+
RowSelector::skip(4),
198+
RowSelector::select(3),
199+
RowSelector::skip(3),
200+
]))),
201+
expected_rows: 3,
202+
expected_output: Some(&[
203+
"+------+------------+",
204+
"| utf8 | large_utf8 |",
205+
"+------+------------+",
206+
"| | |",
207+
"| e | e |",
208+
"| f | f |",
209+
"+------+------------+",
210+
]),
211+
predicate: None,
212+
}
213+
.run()
214+
.await
215+
.unwrap();
216+
217+
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
218+
assert_ne!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
219+
}
220+
221+
#[tokio::test]
222+
async fn bad_row_selection_extension() {
223+
// selection specifies fewer rows than the file actually contains
224+
let err = TestFull {
225+
access_plan: None,
226+
row_selection: Some(ParquetRowSelection::new(RowSelection::from(vec![
227+
RowSelector::skip(2),
228+
RowSelector::select(1),
229+
]))),
230+
expected_rows: 10000,
231+
expected_output: None,
232+
predicate: None,
233+
}
234+
.run()
235+
.await
236+
.unwrap_err();
237+
let err_string = err.to_string();
238+
assert_contains!(&err_string, "Invalid Parquet RowSelection");
239+
assert_contains!(
240+
&err_string,
241+
"File has 10 rows, but selection specifies 3 rows."
242+
);
243+
}
244+
155245
#[tokio::test]
156246
async fn plan_and_filter() {
157247
// show that row group pruning is applied even when an initial plan is supplied
@@ -170,7 +260,9 @@ async fn plan_and_filter() {
170260
// initial
171261
let parquet_metrics = TestFull {
172262
access_plan,
263+
row_selection: None,
173264
expected_rows: 0,
265+
expected_output: None,
174266
predicate: Some(predicate),
175267
}
176268
.run()
@@ -227,7 +319,9 @@ async fn bad_row_groups() {
227319
RowGroupAccess::Skip,
228320
RowGroupAccess::Scan,
229321
])),
322+
row_selection: None,
230323
expected_rows: 0,
324+
expected_output: None,
231325
predicate: None,
232326
}
233327
.run()
@@ -249,8 +343,10 @@ async fn bad_selection() {
249343
])),
250344
RowGroupAccess::Skip,
251345
])),
346+
row_selection: None,
252347
// expects that we hit an error, this should not be run
253348
expected_rows: 10000,
349+
expected_output: None,
254350
predicate: None,
255351
}
256352
.run()
@@ -300,7 +396,9 @@ impl Test {
300396
} = self;
301397
TestFull {
302398
access_plan,
399+
row_selection: None,
303400
expected_rows,
401+
expected_output: None,
304402
predicate: None,
305403
}
306404
.run()
@@ -317,7 +415,9 @@ impl Test {
317415
/// 4. Returns the statistics from running the plan
318416
struct TestFull {
319417
access_plan: Option<ParquetAccessPlan>,
418+
row_selection: Option<ParquetRowSelection>,
320419
expected_rows: usize,
420+
expected_output: Option<&'static [&'static str]>,
321421
predicate: Option<Expr>,
322422
}
323423

@@ -327,7 +427,9 @@ impl TestFull {
327427

328428
let Self {
329429
access_plan,
430+
row_selection,
330431
expected_rows,
432+
expected_output,
331433
predicate,
332434
} = self;
333435

@@ -352,6 +454,11 @@ impl TestFull {
352454
partitioned_file = partitioned_file.with_extension(access_plan);
353455
}
354456

457+
// add the file-level row selection, if any, as an extension
458+
if let Some(row_selection) = row_selection {
459+
partitioned_file = partitioned_file.with_extension(row_selection);
460+
}
461+
355462
// Create a DataSourceExec to read the file
356463
let object_store_url = ObjectStoreUrl::local_filesystem();
357464
// add the predicate, if requested
@@ -380,6 +487,9 @@ impl TestFull {
380487
"results: \n{}",
381488
pretty_format_batches(&results).unwrap()
382489
);
490+
if let Some(expected_output) = expected_output {
491+
assert_batches_eq!(expected_output, &results);
492+
}
383493

384494
std::fs::remove_file(file_name).unwrap();
385495

0 commit comments

Comments
 (0)