Skip to content

Commit e12666d

Browse files
committed
Support row group limit pruning
1 parent f1ecacc commit e12666d

9 files changed

Lines changed: 896 additions & 24 deletions

File tree

datafusion/core/tests/parquet/mod.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ impl TestOutput {
182182
.map(|(_pruned, matched)| matched)
183183
}
184184

185+
/// The number of row_groups fully matched by statistics
186+
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
187+
self.metric_value("row_groups_fully_matched_statistics")
188+
}
189+
185190
/// The number of row_groups pruned by statistics
186191
fn row_groups_pruned_statistics(&self) -> Option<usize> {
187192
self.pruning_metric("row_groups_pruned_statistics")
@@ -219,6 +224,11 @@ impl TestOutput {
219224
.map(|(pruned, _matched)| pruned)
220225
}
221226

227+
/// The number of row groups pruned by limit pruning
228+
fn limit_pruned_row_groups(&self) -> Option<usize> {
229+
self.metric_value("limit_pruned_row_groups")
230+
}
231+
222232
fn description(&self) -> String {
223233
format!(
224234
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -232,20 +242,41 @@ impl TestOutput {
232242
/// and the appropriate scenario
233243
impl ContextWithParquet {
234244
async fn new(scenario: Scenario, unit: Unit) -> Self {
235-
Self::with_config(scenario, unit, SessionConfig::new()).await
245+
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
246+
}
247+
248+
/// Set custom schema and batches for the test
249+
pub async fn with_custom_data(
250+
scenario: Scenario,
251+
unit: Unit,
252+
schema: Arc<Schema>,
253+
batches: Vec<RecordBatch>,
254+
) -> Self {
255+
Self::with_config(
256+
scenario,
257+
unit,
258+
SessionConfig::new(),
259+
Some(schema),
260+
Some(batches),
261+
)
262+
.await
236263
}
237264

238265
async fn with_config(
239266
scenario: Scenario,
240267
unit: Unit,
241268
mut config: SessionConfig,
269+
custom_schema: Option<Arc<Schema>>,
270+
custom_batches: Option<Vec<RecordBatch>>,
242271
) -> Self {
243272
// Use a single partition for deterministic results no matter how many CPUs the host has
244273
config = config.with_target_partitions(1);
245274
let file = match unit {
246275
Unit::RowGroup(row_per_group) => {
247276
config = config.with_parquet_bloom_filter_pruning(true);
248-
make_test_file_rg(scenario, row_per_group).await
277+
config.options_mut().execution.parquet.pushdown_filters = true;
278+
make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
279+
.await
249280
}
250281
Unit::Page(row_per_page) => {
251282
config = config.with_parquet_page_index_pruning(true);
@@ -1071,7 +1102,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
10711102
}
10721103

10731104
/// Create a test parquet file with various data types
1074-
async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
1105+
async fn make_test_file_rg(
1106+
scenario: Scenario,
1107+
row_per_group: usize,
1108+
custom_schema: Option<Arc<Schema>>,
1109+
custom_batches: Option<Vec<RecordBatch>>,
1110+
) -> NamedTempFile {
10751111
let mut output_file = tempfile::Builder::new()
10761112
.prefix("parquet_pruning")
10771113
.suffix(".parquet")
@@ -1084,8 +1120,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem
10841120
.set_statistics_enabled(EnabledStatistics::Page)
10851121
.build();
10861122

1087-
let batches = create_data_batch(scenario);
1088-
let schema = batches[0].schema();
1123+
let (batches, schema) =
1124+
if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
1125+
(batches, schema)
1126+
} else {
1127+
let batches = create_data_batch(scenario);
1128+
let schema = batches[0].schema();
1129+
(batches, schema)
1130+
};
10891131

10901132
let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
10911133

0 commit comments

Comments
 (0)