Skip to content

Commit 7bb6e15

Browse files
gabotechs and alamb authored
Remove redundant collect_stat and target_partitions on ListingOptions (#22969)
## Which issue does this PR close?

- Closes #.

## Rationale for this change

Something that was spotted during the review of:

- #22657

`ListingOptions::target_partitions` and `ListingOptions::collect_stat` duplicate `SessionConfig`'s `execution.target_partitions` and `execution.collect_statistics`.

After some investigation, I think they only live on `ListingOptions` for historical reasons: when the struct was added (#1010 5 years ago), `TableProvider::scan` had no access to the session, so the values had to be copied onto the table at build time. Once #2660 passed `SessionState` into `scan`, the fields became redundant (and had already drifted — `scan` read them from the session config while `list_files_for_scan` read the stale copy).

This PR makes `SessionConfig` the single source of truth.

## What changes are included in this PR?

- Remove `target_partitions`/`collect_stat` fields, their builders, and `with_session_config_options` from `ListingOptions`.
- `ListingTable` now reads both values from the session config at scan time.
- Reserve proto tags 8/9 in `ListingTableScanNode` and drop the related (de)serialization.
- Update benchmarks, factory, and test call sites.

## Are these changes tested?

Yes, by existing tests.

## Are there any user-facing changes?

Yes, breaking: the removed fields/builders require configuring `SessionConfig` instead, and the two proto fields no longer round-trip.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 8172873 commit 7bb6e15

24 files changed

Lines changed: 171 additions & 247 deletions

File tree

benchmarks/src/bin/external_aggr.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,7 @@ impl ExternalAggrConfig {
318318
);
319319
let extension = DEFAULT_PARQUET_EXTENSION;
320320

321-
let options = ListingOptions::new(format)
322-
.with_file_extension(extension)
323-
.with_collect_stat(state.config().collect_statistics());
321+
let options = ListingOptions::new(format).with_file_extension(extension);
324322

325323
let table_path = ListingTableUrl::parse(path)?;
326324
let config = ListingTableConfig::new(table_path).with_listing_options(options);

benchmarks/src/imdb/run.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,6 @@ impl RunOpt {
425425
let table_format = self.file_format.as_str();
426426

427427
// Obtain a snapshot of the SessionState
428-
let state = ctx.state();
429428
let (format, path, extension): (Arc<dyn FileFormat>, String, &'static str) =
430429
match table_format {
431430
// dbgen creates .tbl ('|' delimited) files without header
@@ -458,9 +457,7 @@ impl RunOpt {
458457
}
459458
};
460459

461-
let options = ListingOptions::new(format)
462-
.with_file_extension(extension)
463-
.with_collect_stat(state.config().collect_statistics());
460+
let options = ListingOptions::new(format).with_file_extension(extension);
464461

465462
let table_path = ListingTableUrl::parse(path)?;
466463
let config = ListingTableConfig::new(table_path).with_listing_options(options);

benchmarks/src/sort_pushdown.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ impl RunOpt {
162162
let config = self.common.config()?;
163163
let rt = self.common.build_runtime()?;
164164
let state = SessionStateBuilder::new()
165-
.with_config(config)
165+
// Always collect statistics for sort pushdown
166+
.with_config(config.with_collect_statistics(true))
166167
.with_runtime_env(rt)
167168
.with_default_features()
168169
.build();
@@ -255,9 +256,7 @@ impl RunOpt {
255256
);
256257
let extension = DEFAULT_PARQUET_EXTENSION;
257258

258-
let options = ListingOptions::new(format)
259-
.with_file_extension(extension)
260-
.with_collect_stat(true); // Always collect statistics for sort pushdown
259+
let options = ListingOptions::new(format).with_file_extension(extension);
261260

262261
let table_path = ListingTableUrl::parse(path)?;
263262
let schema = options.infer_schema(&state, &table_path).await?;

benchmarks/src/sort_tpch.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,7 @@ impl RunOpt {
333333
);
334334
let extension = DEFAULT_PARQUET_EXTENSION;
335335

336-
let options = ListingOptions::new(format)
337-
.with_file_extension(extension)
338-
.with_collect_stat(state.config().collect_statistics());
336+
let options = ListingOptions::new(format).with_file_extension(extension);
339337

340338
let table_path = ListingTableUrl::parse(path)?;
341339
let schema = options.infer_schema(&state, &table_path).await?;

benchmarks/src/tpcds/run.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,6 @@ impl RunOpt {
364364
table: &str,
365365
) -> Result<Arc<dyn TableProvider>> {
366366
let path = self.path.to_str().unwrap();
367-
let target_partitions = self.partitions();
368367

369368
// Obtain a snapshot of the SessionState
370369
let state = ctx.state();
@@ -380,9 +379,7 @@ impl RunOpt {
380379

381380
let table_path = ListingTableUrl::parse(path)?;
382381
let options = ListingOptions::new(Arc::new(format))
383-
.with_file_extension(DEFAULT_PARQUET_EXTENSION)
384-
.with_target_partitions(target_partitions)
385-
.with_collect_stat(state.config().collect_statistics());
382+
.with_file_extension(DEFAULT_PARQUET_EXTENSION);
386383

387384
let schema = options.infer_schema(&state, &table_path).await?;
388385
let constraints = table_constraints(table, schema.as_ref());

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ impl RunOpt {
283283
) -> Result<Arc<dyn TableProvider>> {
284284
let path = self.path.to_str().unwrap();
285285
let table_format = self.file_format.as_str();
286-
let target_partitions = self.partitions();
287286

288287
// Obtain a snapshot of the SessionState
289288
let state = ctx.state();
@@ -320,10 +319,7 @@ impl RunOpt {
320319
};
321320

322321
let table_path = ListingTableUrl::parse(path)?;
323-
let options = ListingOptions::new(format)
324-
.with_file_extension(extension)
325-
.with_target_partitions(target_partitions)
326-
.with_collect_stat(state.config().collect_statistics());
322+
let options = ListingOptions::new(format).with_file_extension(extension);
327323

328324
let schema = match table_format {
329325
"parquet" => options.infer_schema(&state, &table_path).await?,

datafusion/catalog-listing/src/config.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,7 @@ impl ListingTableConfig {
152152
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
153153
/// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
154154
/// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
155-
/// .with_file_extension(".parquet")
156-
/// .with_collect_stat(true);
155+
/// .with_file_extension(".parquet");
157156
///
158157
/// let config = ListingTableConfig::new(table_paths).with_listing_options(options);
159158
/// // Configure file format and options

datafusion/catalog-listing/src/options.rs

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use datafusion_catalog::Session;
2020
use datafusion_common::plan_err;
2121
use datafusion_datasource::ListingTableUrl;
2222
use datafusion_datasource::file_format::FileFormat;
23-
use datafusion_execution::config::SessionConfig;
2423
use datafusion_expr::SortExpr;
2524
use futures::StreamExt;
2625
use futures::TryStreamExt;
@@ -38,13 +37,6 @@ pub struct ListingOptions {
3837
/// The expected partition column names in the folder structure.
3938
/// See [Self::with_table_partition_cols] for details
4039
pub table_partition_cols: Vec<(String, DataType)>,
41-
/// Set true to try to guess statistics from the files.
42-
/// This can add a lot of overhead as it will usually require files
43-
/// to be opened and at least partially parsed.
44-
pub collect_stat: bool,
45-
/// Group files to avoid that the number of partitions exceeds
46-
/// this limit
47-
pub target_partitions: usize,
4840
/// Optional pre-known sort order(s). Must be `SortExpr`s.
4941
///
5042
/// DataFusion may take advantage of this ordering to omit sorts
@@ -68,30 +60,15 @@ impl ListingOptions {
6860
/// Default values:
6961
/// - use default file extension filter
7062
/// - no input partition to discover
71-
/// - one target partition
72-
/// - do not collect statistics
7363
pub fn new(format: Arc<dyn FileFormat>) -> Self {
7464
Self {
7565
file_extension: format.get_ext(),
7666
format,
7767
table_partition_cols: vec![],
78-
collect_stat: false,
79-
target_partitions: 1,
8068
file_sort_order: vec![],
8169
}
8270
}
8371

84-
/// Set options from [`SessionConfig`] and returns self.
85-
///
86-
/// Currently this sets `target_partitions` and `collect_stat`
87-
/// but if more options are added in the future that need to be coordinated
88-
/// they will be synchronized through this method.
89-
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
90-
self = self.with_target_partitions(config.target_partitions());
91-
self = self.with_collect_stat(config.collect_statistics());
92-
self
93-
}
94-
9572
/// Set file extension on [`ListingOptions`] and returns self.
9673
///
9774
/// # Example
@@ -205,40 +182,6 @@ impl ListingOptions {
205182
self
206183
}
207184

208-
/// Set stat collection on [`ListingOptions`] and returns self.
209-
///
210-
/// ```
211-
/// # use std::sync::Arc;
212-
/// # use datafusion_catalog_listing::ListingOptions;
213-
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
214-
///
215-
/// let listing_options =
216-
/// ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
217-
///
218-
/// assert_eq!(listing_options.collect_stat, true);
219-
/// ```
220-
pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
221-
self.collect_stat = collect_stat;
222-
self
223-
}
224-
225-
/// Set number of target partitions on [`ListingOptions`] and returns self.
226-
///
227-
/// ```
228-
/// # use std::sync::Arc;
229-
/// # use datafusion_catalog_listing::ListingOptions;
230-
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
231-
///
232-
/// let listing_options =
233-
/// ListingOptions::new(Arc::new(ParquetFormat::default())).with_target_partitions(8);
234-
///
235-
/// assert_eq!(listing_options.target_partitions, 8);
236-
/// ```
237-
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
238-
self.target_partitions = target_partitions;
239-
self
240-
}
241-
242185
/// Set file sort order on [`ListingOptions`] and returns self.
243186
///
244187
/// ```

datafusion/catalog-listing/src/table.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -533,15 +533,15 @@ impl TableProvider for ListingTable {
533533
&self.table_schema,
534534
&partitioned_file_lists,
535535
output_ordering,
536-
self.options.target_partitions,
536+
state.config().target_partitions(),
537537
)
538538
})
539539
})
540540
.flatten()
541541
{
542542
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
543543
Some(Ok(new_groups)) => {
544-
if new_groups.len() <= self.options.target_partitions {
544+
if new_groups.len() <= state.config().target_partitions() {
545545
partitioned_file_lists = new_groups;
546546
} else {
547547
log::debug!(
@@ -724,7 +724,7 @@ impl ListingTable {
724724
let files = file_list
725725
.map(|part_file| async {
726726
let part_file = part_file?;
727-
let (statistics, ordering) = if self.options.collect_stat {
727+
let (statistics, ordering) = if ctx.config().collect_statistics() {
728728
self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
729729
.await?
730730
} else {
@@ -738,7 +738,7 @@ impl ListingTable {
738738
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
739739

740740
let (file_group, inexact_stats) =
741-
get_files_with_limit(files, limit, self.options.collect_stat).await?;
741+
get_files_with_limit(files, limit, ctx.config().collect_statistics()).await?;
742742

743743
// Threshold: 0 = disabled, N > 0 = enabled when distinct_keys >= N
744744
//
@@ -747,32 +747,32 @@ impl ListingTable {
747747
// hash repartitioning for aggregates and joins on partition columns.
748748
let threshold = ctx.config_options().optimizer.preserve_file_partitions;
749749

750-
let (file_groups, grouped_by_partition) = if threshold > 0
751-
&& !self.options.table_partition_cols.is_empty()
752-
{
753-
let grouped =
754-
file_group.group_by_partition_values(self.options.target_partitions);
755-
if grouped.len() >= threshold {
756-
(grouped, true)
750+
let (file_groups, grouped_by_partition) =
751+
if threshold > 0 && !self.options.table_partition_cols.is_empty() {
752+
let grouped = file_group
753+
.group_by_partition_values(ctx.config().target_partitions());
754+
if grouped.len() >= threshold {
755+
(grouped, true)
756+
} else {
757+
let all_files: Vec<_> =
758+
grouped.into_iter().flat_map(|g| g.into_inner()).collect();
759+
(
760+
FileGroup::new(all_files)
761+
.split_files(ctx.config().target_partitions()),
762+
false,
763+
)
764+
}
757765
} else {
758-
let all_files: Vec<_> =
759-
grouped.into_iter().flat_map(|g| g.into_inner()).collect();
760766
(
761-
FileGroup::new(all_files).split_files(self.options.target_partitions),
767+
file_group.split_files(ctx.config().target_partitions()),
762768
false,
763769
)
764-
}
765-
} else {
766-
(
767-
file_group.split_files(self.options.target_partitions),
768-
false,
769-
)
770-
};
770+
};
771771

772772
let (file_groups, stats) = compute_all_files_statistics(
773773
file_groups,
774774
self.schema(),
775-
self.options.collect_stat,
775+
ctx.config().collect_statistics(),
776776
inexact_stats,
777777
)?;
778778

datafusion/common/src/config.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,8 +677,7 @@ config_namespace! {
677677
pub coalesce_batches: bool, default = true
678678

679679
/// Should DataFusion collect statistics when first creating a table.
680-
/// Has no effect after the table is created. Applies to the default
681-
/// `ListingTableProvider` in DataFusion. Defaults to true.
680+
/// Has no effect after the table is created. Defaults to true.
682681
pub collect_statistics: bool, default = true
683682

684683
/// Number of partitions for query execution. Increasing partitions can increase

0 commit comments

Comments
 (0)