Skip to content

Commit 1d449ff

Browse files
committed
Add a default FileStatisticsCache implementation for the ListingTable
1 parent 568f19f commit 1d449ff

File tree

16 files changed

+873
-117
lines changed

16 files changed

+873
-117
lines changed

datafusion-cli/src/main.rs

Lines changed: 3 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -443,10 +443,7 @@ mod tests {
443443
use super::*;
444444
use datafusion::{
445445
common::test_util::batches_to_string,
446-
execution::cache::{
447-
DefaultListFilesCache, cache_manager::CacheManagerConfig,
448-
cache_unit::DefaultFileStatisticsCache,
449-
},
446+
execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
450447
prelude::{ParquetReadOptions, col, lit, split_part},
451448
};
452449
use insta::assert_snapshot;
@@ -647,9 +644,9 @@ mod tests {
647644
+-----------------------------------+-----------------+---------------------+------+------------------+
648645
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
649646
+-----------------------------------+-----------------+---------------------+------+------------------+
650-
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
647+
| alltypes_plain.parquet | 1851 | 8882 | 8 | page_index=false |
651648
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
652-
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false |
649+
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 4 | page_index=false |
653650
+-----------------------------------+-----------------+---------------------+------+------------------+
654651
");
655652

@@ -689,55 +686,6 @@ mod tests {
689686

690687
// The cache manager now creates a StatisticsCache by default,
691688
// so the contents of that cache show up here
692-
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
693-
let df = ctx.sql(sql).await?;
694-
let rbs = df.collect().await?;
695-
assert_snapshot!(batches_to_string(&rbs),@r"
696-
++
697-
++
698-
");
699-
700-
Ok(())
701-
}
702-
703-
// Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
704-
#[tokio::test]
705-
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
706-
// Install a specific StatisticsCache implementation
707-
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
708-
let cache_config = CacheManagerConfig::default()
709-
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
710-
let runtime = RuntimeEnvBuilder::new()
711-
.with_cache_manager(cache_config)
712-
.build()?;
713-
let config = SessionConfig::new().with_collect_statistics(true);
714-
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
715-
716-
ctx.register_udtf(
717-
"statistics_cache",
718-
Arc::new(StatisticsCacheFunc::new(
719-
ctx.task_ctx().runtime_env().cache_manager.clone(),
720-
)),
721-
);
722-
723-
for filename in [
724-
"alltypes_plain",
725-
"alltypes_tiny_pages",
726-
"lz4_raw_compressed_larger",
727-
] {
728-
ctx.sql(
729-
format!(
730-
"create external table {filename}
731-
stored as parquet
732-
location '../parquet-testing/data/{filename}.parquet'",
733-
)
734-
.as_str(),
735-
)
736-
.await?
737-
.collect()
738-
.await?;
739-
}
740-
741689
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
742690
let df = ctx.sql(sql).await?;
743691
let rbs = df.collect().await?;

datafusion/catalog-listing/src/table.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use datafusion_datasource::{
3636
};
3737
use datafusion_execution::cache::TableScopedPath;
3838
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
39-
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
4039
use datafusion_expr::dml::InsertOp;
4140
use datafusion_expr::execution_props::ExecutionProps;
4241
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
@@ -188,7 +187,7 @@ pub struct ListingTable {
188187
/// The SQL definition for this table, if any
189188
definition: Option<String>,
190189
/// Cache for collected file statistics
191-
collected_statistics: Arc<dyn FileStatisticsCache>,
190+
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
192191
/// Constraints applied to this table
193192
constraints: Constraints,
194193
/// Column default expressions for columns that are not physically present in the data files
@@ -232,7 +231,7 @@ impl ListingTable {
232231
schema_source,
233232
options,
234233
definition: None,
235-
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
234+
collected_statistics: None,
236235
constraints: Constraints::default(),
237236
column_defaults: HashMap::new(),
238237
expr_adapter_factory: config.expr_adapter_factory,
@@ -261,10 +260,8 @@ impl ListingTable {
261260
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
262261
/// multiple times in the same session.
263262
/// If `None`, this table neither reads from nor writes to a statistics cache.
264-
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
265263
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
266-
self.collected_statistics =
267-
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
264+
self.collected_statistics = cache;
268265
self
269266
}
270267

@@ -810,7 +807,8 @@ impl ListingTable {
810807
let meta = &part_file.object_meta;
811808

812809
// Check cache first - if we have valid cached statistics and ordering
813-
if let Some(cached) = self.collected_statistics.get(path)
810+
if let Some(cache) = &self.collected_statistics
811+
&& let Some(cached) = cache.get(path)
814812
&& cached.is_valid_for(meta)
815813
{
816814
// Return cached statistics and ordering
@@ -827,14 +825,16 @@ impl ListingTable {
827825
let statistics = Arc::new(file_meta.statistics);
828826

829827
// Store in cache
830-
self.collected_statistics.put(
831-
path,
832-
CachedFileMetadata::new(
833-
meta.clone(),
834-
Arc::clone(&statistics),
835-
file_meta.ordering.clone(),
836-
),
837-
);
828+
if let Some(cache) = &self.collected_statistics {
829+
cache.put(
830+
path,
831+
CachedFileMetadata::new(
832+
meta.clone(),
833+
Arc::clone(&statistics),
834+
file_meta.ordering.clone(),
835+
),
836+
);
837+
}
838838

839839
Ok((statistics, file_meta.ordering))
840840
}

0 commit comments

Comments
 (0)