Skip to content

Commit 184c8bf

Browse files
committed
Add restriction for enabling limit pruning
1 parent 19bbdff commit 184c8bf

4 files changed

Lines changed: 29 additions & 2 deletions

File tree

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,7 @@ impl TableProvider for ListingTable {
12531253
.with_output_ordering(output_ordering)
12541254
.with_table_partition_cols(table_partition_cols)
12551255
.with_expr_adapter(self.expr_adapter_factory.clone())
1256+
.with_limit_pruning(limit.is_some())
12561257
.build(),
12571258
)
12581259
.await

datafusion/datasource-parquet/src/opener.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ pub(super) struct ParquetOpener {
9797
pub enable_row_group_stats_pruning: bool,
9898
/// Coerce INT96 timestamps to specific TimeUnit
9999
pub coerce_int96: Option<TimeUnit>,
100+
/// Whether limit pruning should be applied
101+
pub enable_limit_pruning: bool,
100102
/// Optional parquet FileDecryptionProperties
101103
#[cfg(feature = "parquet_encryption")]
102104
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -144,6 +146,7 @@ impl FileOpener for ParquetOpener {
144146
let enable_bloom_filter = self.enable_bloom_filter;
145147
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
146148
let limit = self.limit;
149+
let enable_limit_pruning = self.enable_limit_pruning;
147150

148151
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
149152
.global_counter("num_predicate_creation_errors");
@@ -377,8 +380,10 @@ impl FileOpener for ParquetOpener {
377380
}
378381

379382
// Prune by limit
380-
if let Some(limit) = limit {
381-
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
383+
if enable_limit_pruning {
384+
if let Some(limit) = limit {
385+
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
386+
}
382387
}
383388

384389
let mut access_plan = row_groups.build();
@@ -826,6 +831,7 @@ mod test {
826831
reorder_filters: false,
827832
enable_page_index: false,
828833
enable_bloom_filter: false,
834+
enable_limit_pruning: false,
829835
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
830836
enable_row_group_stats_pruning: true,
831837
coerce_int96: None,
@@ -914,6 +920,7 @@ mod test {
914920
reorder_filters: false,
915921
enable_page_index: false,
916922
enable_bloom_filter: false,
923+
enable_limit_pruning: false,
917924
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
918925
enable_row_group_stats_pruning: true,
919926
coerce_int96: None,
@@ -1018,6 +1025,7 @@ mod test {
10181025
reorder_filters: false,
10191026
enable_page_index: false,
10201027
enable_bloom_filter: false,
1028+
enable_limit_pruning: false,
10211029
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
10221030
enable_row_group_stats_pruning: true,
10231031
coerce_int96: None,
@@ -1132,6 +1140,7 @@ mod test {
11321140
reorder_filters: true,
11331141
enable_page_index: false,
11341142
enable_bloom_filter: false,
1143+
enable_limit_pruning: false,
11351144
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
11361145
enable_row_group_stats_pruning: false, // note that this is false!
11371146
coerce_int96: None,
@@ -1247,6 +1256,7 @@ mod test {
12471256
reorder_filters: false,
12481257
enable_page_index: false,
12491258
enable_bloom_filter: false,
1259+
enable_limit_pruning: false,
12501260
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
12511261
enable_row_group_stats_pruning: true,
12521262
coerce_int96: None,
@@ -1429,6 +1439,7 @@ mod test {
14291439
reorder_filters: false,
14301440
enable_page_index: false,
14311441
enable_bloom_filter: false,
1442+
enable_limit_pruning: false,
14321443
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
14331444
enable_row_group_stats_pruning: false,
14341445
coerce_int96: None,

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ impl FileSource for ParquetSource {
576576
enable_page_index: self.enable_page_index(),
577577
enable_bloom_filter: self.bloom_filter_on_read(),
578578
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
579+
enable_limit_pruning: base_config.limit_pruning,
579580
schema_adapter_factory,
580581
coerce_int96,
581582
#[cfg(feature = "parquet_encryption")]

datafusion/datasource/src/file_scan_config.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ pub struct FileScanConfig {
196196
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
197197
/// from the logical schema to the physical schema of the file.
198198
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
199+
/// Whether limit pruning is enabled; it can be enabled when a limit is pushed down at the logical plan level
200+
pub limit_pruning: bool,
199201
}
200202

201203
/// A builder for [`FileScanConfig`]'s.
@@ -275,6 +277,8 @@ pub struct FileScanConfigBuilder {
275277
new_lines_in_values: Option<bool>,
276278
batch_size: Option<usize>,
277279
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
280+
/// Whether limit pruning is enabled; it can be enabled when a limit is pushed down at the logical plan level
281+
limit_pruning: bool,
278282
}
279283

280284
impl FileScanConfigBuilder {
@@ -304,6 +308,7 @@ impl FileScanConfigBuilder {
304308
constraints: None,
305309
batch_size: None,
306310
expr_adapter_factory: None,
311+
limit_pruning: false,
307312
}
308313
}
309314

@@ -426,6 +431,12 @@ impl FileScanConfigBuilder {
426431
self
427432
}
428433

434+
/// Enable or disable limit pruning.
435+
pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self {
436+
self.limit_pruning = limit_pruning;
437+
self
438+
}
439+
429440
/// Build the final [`FileScanConfig`] with all the configured settings.
430441
///
431442
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
@@ -446,6 +457,7 @@ impl FileScanConfigBuilder {
446457
new_lines_in_values,
447458
batch_size,
448459
expr_adapter_factory: expr_adapter,
460+
limit_pruning,
449461
} = self;
450462

451463
let constraints = constraints.unwrap_or_default();
@@ -473,6 +485,7 @@ impl FileScanConfigBuilder {
473485
new_lines_in_values,
474486
batch_size,
475487
expr_adapter_factory: expr_adapter,
488+
limit_pruning,
476489
}
477490
}
478491
}
@@ -494,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
494507
constraints: Some(config.constraints),
495508
batch_size: config.batch_size,
496509
expr_adapter_factory: config.expr_adapter_factory,
510+
limit_pruning: config.limit_pruning,
497511
}
498512
}
499513
}

0 commit comments

Comments
 (0)