diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6bfe1160ecdd6..6e99969aa763e 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -443,10 +443,7 @@ mod tests { use super::*; use datafusion::{ common::test_util::batches_to_string, - execution::cache::{ - DefaultListFilesCache, cache_manager::CacheManagerConfig, - cache_unit::DefaultFileStatisticsCache, - }, + execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig}, prelude::{ParquetReadOptions, col, lit, split_part}, }; use insta::assert_snapshot; @@ -656,8 +653,6 @@ mod tests { Ok(()) } - /// Shows that the statistics cache is not enabled by default yet - /// See https://github.com/apache/datafusion/issues/19217 #[tokio::test] async fn test_statistics_cache_default() -> Result<(), DataFusionError> { let ctx = SessionContext::new(); @@ -687,57 +682,6 @@ mod tests { .await?; } - // When the cache manager creates a StatisticsCache by default, - // the contents will show up here - let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename"; - let df = ctx.sql(sql).await?; - let rbs = df.collect().await?; - assert_snapshot!(batches_to_string(&rbs),@r" - ++ - ++ - "); - - Ok(()) - } - - // Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved - #[tokio::test] - async fn test_statistics_cache_override() -> Result<(), DataFusionError> { - // Install a specific StatisticsCache implementation - let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); - let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); - let runtime = RuntimeEnvBuilder::new() - .with_cache_manager(cache_config) - .build()?; - let config = SessionConfig::new().with_collect_statistics(true); - let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime)); - - 
ctx.register_udtf( - "statistics_cache", - Arc::new(StatisticsCacheFunc::new( - ctx.task_ctx().runtime_env().cache_manager.clone(), - )), - ); - - for filename in [ - "alltypes_plain", - "alltypes_tiny_pages", - "lz4_raw_compressed_larger", - ] { - ctx.sql( - format!( - "create external table {filename} - stored as parquet - location '../parquet-testing/data/{filename}.parquet'", - ) - .as_str(), - ) - .await? - .collect() - .await?; - } - let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename"; let df = ctx.sql(sql).await?; let rbs = df.collect().await?; diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 9c30028ddd547..92224ba975483 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -359,7 +359,7 @@ fn try_into_partitioned_file( let mut pf: PartitionedFile = object_meta.into(); pf.partition_values = partition_values; - + pf.table_reference.clone_from(table_path.get_table_ref()); Ok(pf) } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 06ba8c8113fac..7ee743a6abe71 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -36,7 +36,6 @@ use datafusion_datasource::{ }; use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; -use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; @@ -187,7 +186,7 @@ pub struct ListingTable { /// The SQL definition for this table, if any definition: Option, /// Cache for collected file statistics - collected_statistics: Arc, + collected_statistics: Option>, /// Constraints applied to this table 
constraints: Constraints, /// Column default expressions for columns that are not physically present in the data files @@ -231,7 +230,7 @@ impl ListingTable { schema_source, options, definition: None, - collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), + collected_statistics: None, constraints: Constraints::default(), column_defaults: HashMap::new(), expr_adapter_factory: config.expr_adapter_factory, @@ -260,10 +259,8 @@ impl ListingTable { /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics /// multiple times in the same session. /// - /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query. pub fn with_cache(mut self, cache: Option>) -> Self { - self.collected_statistics = - cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default())); + self.collected_statistics = cache; self } @@ -802,11 +799,15 @@ impl ListingTable { ) -> datafusion_common::Result<(Arc, Option)> { use datafusion_execution::cache::cache_manager::CachedFileMetadata; - let path = &part_file.object_meta.location; + let path = TableScopedPath { + table: part_file.table_reference.clone(), + path: part_file.object_meta.location.clone(), + }; let meta = &part_file.object_meta; // Check cache first - if we have valid cached statistics and ordering - if let Some(cached) = self.collected_statistics.get(path) + if let Some(cache) = &self.collected_statistics + && let Some(cached) = cache.get(&path) && cached.is_valid_for(meta) { // Return cached statistics and ordering @@ -823,14 +824,16 @@ impl ListingTable { let statistics = Arc::new(file_meta.statistics); // Store in cache - self.collected_statistics.put( - path, - CachedFileMetadata::new( - meta.clone(), - Arc::clone(&statistics), - file_meta.ordering.clone(), - ), - ); + if let Some(cache) = &self.collected_statistics { + cache.put( + &path, + CachedFileMetadata::new( + meta.clone(), + Arc::clone(&statistics), + file_meta.ordering.clone(), + ), + ); + } 
Ok((statistics, file_meta.ordering)) } diff --git a/datafusion/common/src/heap_size.rs b/datafusion/common/src/heap_size.rs new file mode 100644 index 0000000000000..1acc3486eb51c --- /dev/null +++ b/datafusion/common/src/heap_size.rs @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::stats::Precision; +use crate::{ColumnStatistics, ScalarValue, Statistics}; +use arrow::array::{ + Array, FixedSizeListArray, LargeListArray, ListArray, MapArray, StructArray, +}; +use arrow::datatypes::{ + DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit, + TimeUnit, UnionFields, UnionMode, i256, +}; +use chrono::{DateTime, Utc}; +use half::f16; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; + +/// This is a temporary solution until and +/// are resolved. +/// Trait for calculating the size of various containers +pub trait DFHeapSize { + /// Return the size of any bytes allocated on the heap by this object, + /// including heap memory in those structures + /// + /// Note that the size of the type itself is not included in the result -- + /// instead, that size is added by the caller (e.g. container). 
+ fn heap_size(&self) -> usize; +} + +impl DFHeapSize for Statistics { + fn heap_size(&self) -> usize { + self.num_rows.heap_size() + + self.total_byte_size.heap_size() + + self.column_statistics.heap_size() + } +} + +impl DFHeapSize + for Precision +{ + fn heap_size(&self) -> usize { + self.get_value().map_or_else(|| 0, |v| v.heap_size()) + } +} + +impl DFHeapSize for ColumnStatistics { + fn heap_size(&self) -> usize { + self.null_count.heap_size() + + self.max_value.heap_size() + + self.min_value.heap_size() + + self.sum_value.heap_size() + + self.distinct_count.heap_size() + + self.byte_size.heap_size() + } +} + +impl DFHeapSize for ScalarValue { + fn heap_size(&self) -> usize { + use crate::scalar::ScalarValue::*; + match self { + Null => 0, + Boolean(b) => b.heap_size(), + Float16(f) => f.heap_size(), + Float32(f) => f.heap_size(), + Float64(f) => f.heap_size(), + Decimal32(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal64(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal128(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Decimal256(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Int8(i) => i.heap_size(), + Int16(i) => i.heap_size(), + Int32(i) => i.heap_size(), + Int64(i) => i.heap_size(), + UInt8(u) => u.heap_size(), + UInt16(u) => u.heap_size(), + UInt32(u) => u.heap_size(), + UInt64(u) => u.heap_size(), + Utf8(u) => u.heap_size(), + Utf8View(u) => u.heap_size(), + LargeUtf8(l) => l.heap_size(), + Binary(b) => b.heap_size(), + BinaryView(b) => b.heap_size(), + FixedSizeBinary(a, b) => a.heap_size() + b.heap_size(), + LargeBinary(l) => l.heap_size(), + FixedSizeList(f) => f.heap_size(), + List(l) => l.heap_size(), + LargeList(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Map(m) => m.heap_size(), + Date32(d) => d.heap_size(), + Date64(d) => d.heap_size(), + Time32Second(t) => t.heap_size(), + Time32Millisecond(t) => t.heap_size(), + Time64Microsecond(t) => t.heap_size(), + 
Time64Nanosecond(t) => t.heap_size(), + TimestampSecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMillisecond(a, b) => a.heap_size() + b.heap_size(), + TimestampMicrosecond(a, b) => a.heap_size() + b.heap_size(), + TimestampNanosecond(a, b) => a.heap_size() + b.heap_size(), + IntervalYearMonth(i) => i.heap_size(), + IntervalDayTime(i) => i.heap_size(), + IntervalMonthDayNano(i) => i.heap_size(), + DurationSecond(d) => d.heap_size(), + DurationMillisecond(d) => d.heap_size(), + DurationMicrosecond(d) => d.heap_size(), + DurationNanosecond(d) => d.heap_size(), + Union(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + RunEndEncoded(a, b, c) => a.heap_size() + b.heap_size() + c.heap_size(), + } + } +} + +impl DFHeapSize for DataType { + fn heap_size(&self) -> usize { + use DataType::*; + match self { + Null => 0, + Boolean => 0, + Int8 => 0, + Int16 => 0, + Int32 => 0, + Int64 => 0, + UInt8 => 0, + UInt16 => 0, + UInt32 => 0, + UInt64 => 0, + Float16 => 0, + Float32 => 0, + Float64 => 0, + Timestamp(t, s) => t.heap_size() + s.heap_size(), + Date32 => 0, + Date64 => 0, + Time32(t) => t.heap_size(), + Time64(t) => t.heap_size(), + Duration(t) => t.heap_size(), + Interval(i) => i.heap_size(), + Binary => 0, + FixedSizeBinary(i) => i.heap_size(), + LargeBinary => 0, + BinaryView => 0, + Utf8 => 0, + LargeUtf8 => 0, + Utf8View => 0, + List(v) => v.heap_size(), + ListView(v) => v.heap_size(), + FixedSizeList(f, i) => f.heap_size() + i.heap_size(), + LargeList(l) => l.heap_size(), + LargeListView(l) => l.heap_size(), + Struct(s) => s.heap_size(), + Union(u, m) => u.heap_size() + m.heap_size(), + Dictionary(a, b) => a.heap_size() + b.heap_size(), + Decimal32(p, s) => p.heap_size() + s.heap_size(), + Decimal64(p, s) => p.heap_size() + s.heap_size(), + Decimal128(p, s) => p.heap_size() + s.heap_size(), + Decimal256(p, s) => p.heap_size() + s.heap_size(), + Map(m, b) => m.heap_size() + b.heap_size(), + 
RunEndEncoded(a, b) => a.heap_size() + b.heap_size(), + } + } +} + +impl DFHeapSize for Vec { + fn heap_size(&self) -> usize { + let item_size = size_of::(); + // account for the contents of the Vec + (self.capacity() * item_size) + + // add any heap allocations by contents + self.iter().map(|t| t.heap_size()).sum::() + } +} + +impl DFHeapSize for HashMap { + fn heap_size(&self) -> usize { + let capacity = self.capacity(); + if capacity == 0 { + return 0; + } + + // HashMap doesn't provide a way to get its heap size, so this is an approximation based on + // the behavior of hashbrown::HashMap as at version 0.16.0, and may become inaccurate + // if the implementation changes. + let key_val_size = size_of::<(K, V)>(); + // Overhead for the control tags group, which may be smaller depending on architecture + let group_size = 16; + // 1 byte of metadata stored per bucket. + let metadata_size = 1; + + // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets + let buckets = if capacity < 15 { + let min_cap = match key_val_size { + 0..=1 => 14, + 2..=3 => 7, + _ => 3, + }; + let cap = min_cap.max(capacity); + if cap < 4 { + 4 + } else if cap < 8 { + 8 + } else { + 16 + } + } else { + (capacity.saturating_mul(8) / 7).next_power_of_two() + }; + + group_size + + (buckets * (key_val_size + metadata_size)) + + self.keys().map(|k| k.heap_size()).sum::() + + self.values().map(|v| v.heap_size()).sum::() + } +} + +impl DFHeapSize for Arc { + fn heap_size(&self) -> usize { + // Arc stores weak and strong counts on the heap alongside an instance of T + 2 * size_of::() + self.as_ref().heap_size() + } +} + +impl DFHeapSize for Arc { + fn heap_size(&self) -> usize { + 2 * size_of::() + size_of_val(self.as_ref()) + self.as_ref().heap_size() + } +} + +impl DFHeapSize for Fields { + fn heap_size(&self) -> usize { + self.into_iter().map(|f| f.heap_size()).sum::() + } +} + +impl DFHeapSize for StructArray { + fn heap_size(&self) -> usize { + 
self.get_array_memory_size() + } +} + +impl DFHeapSize for LargeListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl DFHeapSize for ListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl DFHeapSize for FixedSizeListArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} +impl DFHeapSize for MapArray { + fn heap_size(&self) -> usize { + self.get_array_memory_size() + } +} + +impl DFHeapSize for Arc { + fn heap_size(&self) -> usize { + 2 * size_of::() + self.as_ref().heap_size() + } +} + +impl DFHeapSize for Box { + fn heap_size(&self) -> usize { + size_of::() + self.as_ref().heap_size() + } +} + +impl DFHeapSize for Option { + fn heap_size(&self) -> usize { + self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0) + } +} + +impl DFHeapSize for (A, B) +where + A: DFHeapSize, + B: DFHeapSize, +{ + fn heap_size(&self) -> usize { + self.0.heap_size() + self.1.heap_size() + } +} + +impl DFHeapSize for String { + fn heap_size(&self) -> usize { + self.capacity() + } +} + +impl DFHeapSize for str { + fn heap_size(&self) -> usize { + self.len() + } +} + +impl DFHeapSize for UnionFields { + fn heap_size(&self) -> usize { + self.iter().map(|f| f.0.heap_size() + f.1.heap_size()).sum() + } +} + +impl DFHeapSize for UnionMode { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for TimeUnit { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for IntervalUnit { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for Field { + fn heap_size(&self) -> usize { + self.name().heap_size() + + self.data_type().heap_size() + + self.is_nullable().heap_size() + + self.dict_is_ordered().heap_size() + + self.metadata().heap_size() + } +} + +impl DFHeapSize for IntervalMonthDayNano { + fn heap_size(&self) -> usize { + self.days.heap_size() + self.months.heap_size() + 
self.nanoseconds.heap_size() + } +} + +impl DFHeapSize for IntervalDayTime { + fn heap_size(&self) -> usize { + self.days.heap_size() + self.milliseconds.heap_size() + } +} + +impl DFHeapSize for DateTime { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for bool { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl DFHeapSize for u8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for u16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for u32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for u64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i8 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl DFHeapSize for i64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i128 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for i256 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for f16 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for f32 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} +impl DFHeapSize for f64 { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} + +impl DFHeapSize for usize { + fn heap_size(&self) -> usize { + 0 // no heap allocations + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index fdd04f752455e..d093421f7dcb1 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -46,6 +46,7 @@ pub mod error; pub mod file_options; pub mod format; pub mod hash_utils; +pub mod 
heap_size; pub mod instant; pub mod metadata; pub mod nested_struct; @@ -61,6 +62,7 @@ pub mod test_util; pub mod tree_node; pub mod types; pub mod utils; + /// Reexport arrow crate pub use arrow; pub use column::Column; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index d1ee301d91327..666a7b0d91537 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -231,7 +231,7 @@ mod tests { }; use datafusion_execution::cache::CacheAccessor; use datafusion_execution::cache::cache_manager::CacheManagerConfig; - use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; + use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use glob::Pattern; @@ -476,7 +476,7 @@ mod tests { // Test with collect_statistics enabled let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); + .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() .with_cache_manager(cache_config) .build_arc() @@ -506,7 +506,7 @@ mod tests { // Test with collect_statistics disabled let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); let cache_config = CacheManagerConfig::default() - .with_files_statistics_cache(Some(file_statistics_cache.clone())); + .with_file_statistics_cache(Some(file_statistics_cache.clone())); let runtime = RuntimeEnvBuilder::new() .with_cache_manager(cache_config) .build_arc() diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b879973e7dc0b..4289daa89d6c8 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ 
b/datafusion/core/src/execution/context/mod.rs @@ -101,6 +101,7 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_execution::cache::file_statistics_cache::DEFAULT_FILE_STATISTICS_MEMORY_LIMIT; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; @@ -1182,6 +1183,10 @@ impl SessionContext { let duration = Self::parse_duration(variable, value)?; builder.with_object_list_cache_ttl(Some(duration)) } + "file_statistics_cache_limit" => { + let limit = Self::parse_capacity_limit(variable, value)?; + builder.with_file_statistics_cache_limit(limit) + } _ => return plan_err!("Unknown runtime configuration: {variable}"), // Remember to update `reset_runtime_variable()` when adding new options }; @@ -1221,9 +1226,13 @@ impl SessionContext { builder = builder.with_object_list_cache_ttl(DEFAULT_LIST_FILES_CACHE_TTL); } + "file_statistics_cache_limit" => { + builder = builder.with_file_statistics_cache_limit( + DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + ); + } _ => return plan_err!("Unknown runtime configuration: {variable}"), }; - *state = SessionStateBuilder::from(state.clone()) .with_runtime_env(Arc::new(builder.build()?)) .build(); @@ -1416,14 +1425,19 @@ impl SessionContext { && table_provider.table_type() == table_type { schema.deregister_table(&table)?; - if table_type == TableType::Base - && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() - { - lfc.drop_table_entries(&Some(table_ref))?; + if table_type == TableType::Base { + if let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() + { + lfc.drop_table_entries(&Some(table_ref.clone()))?; + } + if let Some(fsc) = + self.runtime_env().cache_manager.get_file_statistic_cache() + { + fsc.drop_table_entries(&Some(table_ref.clone()))?; + } } return Ok(true); } - Ok(false) } @@ -1752,7 +1766,9 @@ impl SessionContext { let config = ListingTableConfig::new(table_path) .with_listing_options(options) 
.with_schema(resolved_schema); - let table = ListingTable::try_new(config)?.with_definition(sql_definition); + let table = ListingTable::try_new(config)? + .with_definition(sql_definition) + .with_cache(self.runtime_env().cache_manager.get_file_statistic_cache()); self.register_table(table_ref, Arc::new(table))?; Ok(()) } diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index c96df5c50998f..d5648ccb66610 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -31,7 +31,7 @@ use datafusion_common::DFSchema; use datafusion_common::stats::Precision; use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::cache::cache_manager::CacheManagerConfig; -use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{Expr, col, lit}; @@ -276,7 +276,7 @@ fn get_cache_runtime_state() -> ( let list_file_cache = Arc::new(DefaultListFilesCache::default()); let cache_config = cache_config - .with_files_statistics_cache(Some(file_static_cache.clone())) + .with_file_statistics_cache(Some(file_static_cache.clone())) .with_list_files_cache(Some(list_file_cache.clone())); let rt = RuntimeEnvBuilder::new() diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index cf5237d725805..407d7f95106bb 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -25,6 +25,7 @@ use datafusion::execution::context::TaskContext; use datafusion::prelude::SessionConfig; use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::cache::cache_manager::CacheManagerConfig; +use 
datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_physical_plan::common::collect; @@ -344,6 +345,51 @@ async fn test_list_files_cache_ttl() { assert_eq!(get_limit(&ctx), Duration::from_secs(90)); } +#[tokio::test] +async fn test_file_statistics_cache_limit() { + let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default()); + + let rt = RuntimeEnvBuilder::new() + .with_cache_manager( + CacheManagerConfig::default() + .with_file_statistics_cache(Some(file_statistics_cache)), + ) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.file_statistics_cache_limit = '{limit}'") + .as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> usize { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_file_statistic_cache() + .unwrap() + .cache_limit() + }; + + update_limit(&ctx, "1M").await; + assert_eq!(get_limit(&ctx), 1024 * 1024); + + update_limit(&ctx, "42G").await; + assert_eq!(get_limit(&ctx), 42 * 1024 * 1024 * 1024); + + update_limit(&ctx, "23K").await; + assert_eq!(get_limit(&ctx), 23 * 1024); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index a9600271c28ce..d971762782258 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -56,7 +56,7 @@ pub use self::url::ListingTableUrl; use crate::file_groups::FileGroup; use chrono::TimeZone; use datafusion_common::stats::Precision; -use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err}; +use datafusion_common::{ColumnStatistics, Result, TableReference, exec_datafusion_err}; use 
datafusion_common::{ScalarValue, Statistics}; use datafusion_physical_expr::LexOrdering; use futures::{Stream, StreamExt}; @@ -152,6 +152,7 @@ pub struct PartitionedFile { pub extensions: Option>, /// The estimated size of the parquet metadata, in bytes pub metadata_size_hint: Option, + pub table_reference: Option, } impl PartitionedFile { @@ -171,6 +172,7 @@ impl PartitionedFile { ordering: None, extensions: None, metadata_size_hint: None, + table_reference: None, } } @@ -184,6 +186,7 @@ impl PartitionedFile { ordering: None, extensions: None, metadata_size_hint: None, + table_reference: None, } } @@ -203,6 +206,7 @@ impl PartitionedFile { ordering: None, extensions: None, metadata_size_hint: None, + table_reference: None, } .with_range(start, end) } @@ -214,6 +218,14 @@ impl PartitionedFile { self } + pub fn with_table_reference( + mut self, + table_reference: Option, + ) -> Self { + self.table_reference = table_reference; + self + } + /// Size of the file to be scanned (taking into account the range, if present). pub fn effective_size(&self) -> u64 { if let Some(range) = &self.range { @@ -340,6 +352,7 @@ impl From for PartitionedFile { ordering: None, extensions: None, metadata_size_hint: None, + table_reference: None, } } } @@ -537,6 +550,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec { +pub trait FileStatisticsCache: + CacheAccessor +{ + /// Cache memory limit in bytes. + fn cache_limit(&self) -> usize; + + /// Updates the cache with a new memory limit in bytes. + fn update_cache_limit(&self, limit: usize); + /// Retrieves the information about the entries currently cached. 
fn list_entries(&self) -> HashMap; + + fn drop_table_entries(&self, table_ref: &Option) -> Result<()>; +} + +impl DFHeapSize for CachedFileMetadata { + fn heap_size(&self) -> usize { + self.meta.size.heap_size() + + self.meta.last_modified.heap_size() + + self.meta.version.heap_size() + + self.meta.e_tag.heap_size() + + self.meta.location.as_ref().heap_size() + + self.statistics.heap_size() + //TODO add ordering once LexOrdering/PhysicalExpr implements DFHeapSize + } } /// Represents information about a cached statistics entry. @@ -111,6 +137,8 @@ pub struct FileStatisticsCacheEntry { pub statistics_size_bytes: usize, /// Whether ordering information is cached for this file. pub has_ordering: bool, + /// Reference to the table associated with this statistics entry. + pub table_reference: Option, } /// Cached file listing. @@ -330,8 +358,19 @@ pub struct CacheManager { impl CacheManager { pub fn try_new(config: &CacheManagerConfig) -> Result> { - let file_statistic_cache = - config.table_files_statistics_cache.as_ref().map(Arc::clone); + let file_statistic_cache = match &config.file_statistics_cache { + Some(fsc) if config.file_statistics_cache_limit > 0 => { + fsc.update_cache_limit(config.file_statistics_cache_limit); + Some(Arc::clone(fsc)) + } + None if config.file_statistics_cache_limit > 0 => { + let fsc: Arc = Arc::new( + DefaultFileStatisticsCache::new(config.file_statistics_cache_limit), + ); + Some(fsc) + } + _ => None, + }; let list_files_cache = match &config.list_files_cache { Some(lfc) if config.list_files_cache_limit > 0 => { @@ -371,11 +410,18 @@ impl CacheManager { })) } - /// Get the cache of listing files statistics. + /// Get the file statistics cache. pub fn get_file_statistic_cache(&self) -> Option> { self.file_statistic_cache.clone() } + /// Get the memory limit of the file statistics cache. 
+ pub fn get_file_statistic_cache_limit(&self) -> usize { + self.file_statistic_cache + .as_ref() + .map_or(0, |c| c.cache_limit()) + } + /// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path. pub fn get_list_files_cache(&self) -> Option> { self.list_files_cache.clone() @@ -410,8 +456,10 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M pub struct CacheManagerConfig { /// Enable caching of file statistics when listing files. /// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session. - /// Default is disabled. Currently only Parquet files are supported. - pub table_files_statistics_cache: Option>, + /// Default is enabled with 1MiB. Currently only Parquet files are supported. + pub file_statistics_cache: Option>, + /// Limit of the file statistics cache, in bytes. Default: 1MiB. + pub file_statistics_cache_limit: usize, /// Enable caching of file metadata when listing files. /// Enabling the cache avoids repeat list and object metadata fetch operations, which may be /// expensive in certain situations (e.g. remote object storage), for objects under paths that @@ -437,7 +485,8 @@ pub struct CacheManagerConfig { impl Default for CacheManagerConfig { fn default() -> Self { Self { - table_files_statistics_cache: Default::default(), + file_statistics_cache: Default::default(), + file_statistics_cache_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, list_files_cache: Default::default(), list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL, @@ -448,14 +497,20 @@ impl Default for CacheManagerConfig { } impl CacheManagerConfig { - /// Set the cache for files statistics. + /// Set the cache for file statistics. /// /// Default is `None` (disabled). 
- pub fn with_files_statistics_cache( + pub fn with_file_statistics_cache( mut self, cache: Option>, ) -> Self { - self.table_files_statistics_cache = cache; + self.file_statistics_cache = cache; + self + } + + /// Specifies the memory limit for the file statistics cache, in bytes. + pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self { + self.file_statistics_cache_limit = limit; self } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs deleted file mode 100644 index 59bb3f0702414..0000000000000 --- a/datafusion/execution/src/cache/cache_unit.rs +++ /dev/null @@ -1,414 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::collections::HashMap; - -use crate::cache::CacheAccessor; -use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, -}; - -use dashmap::DashMap; -use object_store::path::Path; - -pub use crate::cache::DefaultFilesMetadataCache; - -/// Default implementation of [`FileStatisticsCache`] -/// -/// Stores cached file metadata (statistics and orderings) for files. -/// -/// The typical usage pattern is: -/// 1. Call `get(path)` to check for cached value -/// 2. 
If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)` -/// 3. If invalid or missing, compute new value and call `put(path, new_value)` -/// -/// Uses DashMap for lock-free concurrent access. -/// -/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache -#[derive(Default)] -pub struct DefaultFileStatisticsCache { - cache: DashMap, -} - -impl CacheAccessor for DefaultFileStatisticsCache { - fn get(&self, key: &Path) -> Option { - self.cache.get(key).map(|entry| entry.value().clone()) - } - - fn put(&self, key: &Path, value: CachedFileMetadata) -> Option { - self.cache.insert(key.clone(), value) - } - - fn remove(&self, k: &Path) -> Option { - self.cache.remove(k).map(|(_, entry)| entry) - } - - fn contains_key(&self, k: &Path) -> bool { - self.cache.contains_key(k) - } - - fn len(&self) -> usize { - self.cache.len() - } - - fn clear(&self) { - self.cache.clear(); - } - - fn name(&self) -> String { - "DefaultFileStatisticsCache".to_string() - } -} - -impl FileStatisticsCache for DefaultFileStatisticsCache { - fn list_entries(&self) -> HashMap { - let mut entries = HashMap::::new(); - - for entry in self.cache.iter() { - let path = entry.key(); - let cached = entry.value(); - entries.insert( - path.clone(), - FileStatisticsCacheEntry { - object_meta: cached.meta.clone(), - num_rows: cached.statistics.num_rows, - num_columns: cached.statistics.column_statistics.len(), - table_size_bytes: cached.statistics.total_byte_size, - statistics_size_bytes: 0, // TODO: set to the real size in the future - has_ordering: cached.ordering.is_some(), - }, - ); - } - - entries - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, - }; - use arrow::array::RecordBatch; - use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; - use chrono::DateTime; - use datafusion_common::Statistics; - use datafusion_common::stats::Precision; - use
datafusion_expr::ColumnarValue; - use datafusion_physical_expr_common::physical_expr::PhysicalExpr; - use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; - use object_store::ObjectMeta; - use std::sync::Arc; - - fn create_test_meta(path: &str, size: u64) -> ObjectMeta { - ObjectMeta { - location: Path::from(path), - last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") - .unwrap() - .into(), - size, - e_tag: None, - version: None, - } - } - - #[test] - fn test_statistics_cache() { - let meta = create_test_meta("test", 1024); - let cache = DefaultFileStatisticsCache::default(); - - let schema = Schema::new(vec![Field::new( - "test_column", - DataType::Timestamp(TimeUnit::Second, None), - false, - )]); - - // Cache miss - assert!(cache.get(&meta.location).is_none()); - - // Put a value - let cached_value = CachedFileMetadata::new( - meta.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, - ); - cache.put(&meta.location, cached_value); - - // Cache hit - let result = cache.get(&meta.location); - assert!(result.is_some()); - let cached = result.unwrap(); - assert!(cached.is_valid_for(&meta)); - - // File size changed - validation should fail - let meta2 = create_test_meta("test", 2048); - let cached = cache.get(&meta2.location).unwrap(); - assert!(!cached.is_valid_for(&meta2)); - - // Update with new value - let cached_value2 = CachedFileMetadata::new( - meta2.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, - ); - cache.put(&meta2.location, cached_value2); - - // Test list_entries - let entries = cache.list_entries(); - assert_eq!(entries.len(), 1); - let entry = entries.get(&Path::from("test")).unwrap(); - assert_eq!(entry.object_meta.size, 2048); // Should be updated value - } - - #[derive(Clone, Debug, PartialEq, Eq, Hash)] - struct MockExpr {} - - impl std::fmt::Display for MockExpr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "MockExpr") - } - } - - 
impl PhysicalExpr for MockExpr { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn data_type( - &self, - _input_schema: &Schema, - ) -> datafusion_common::Result { - Ok(DataType::Int32) - } - - fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result { - Ok(false) - } - - fn evaluate( - &self, - _batch: &RecordBatch, - ) -> datafusion_common::Result { - unimplemented!() - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion_common::Result> { - assert!(children.is_empty()); - Ok(self) - } - - fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "MockExpr") - } - } - - fn ordering() -> LexOrdering { - let expr = Arc::new(MockExpr {}) as Arc; - LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap() - } - - #[test] - fn test_ordering_cache() { - let meta = create_test_meta("test.parquet", 100); - let cache = DefaultFileStatisticsCache::default(); - - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - // Cache statistics with no ordering - let cached_value = CachedFileMetadata::new( - meta.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, // No ordering yet - ); - cache.put(&meta.location, cached_value); - - let result = cache.get(&meta.location).unwrap(); - assert!(result.ordering.is_none()); - - // Update to add ordering - let mut cached = cache.get(&meta.location).unwrap(); - if cached.is_valid_for(&meta) && cached.ordering.is_none() { - cached.ordering = Some(ordering()); - } - cache.put(&meta.location, cached); - - let result2 = cache.get(&meta.location).unwrap(); - assert!(result2.ordering.is_some()); - - // Verify list_entries shows has_ordering = true - let entries = cache.list_entries(); - assert_eq!(entries.len(), 1); - assert!(entries.get(&meta.location).unwrap().has_ordering); - } - - #[test] - fn test_cache_invalidation_on_file_modification() { - let cache = 
DefaultFileStatisticsCache::default(); - let path = Path::from("test.parquet"); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let meta_v1 = create_test_meta("test.parquet", 100); - - // Cache initial value - let cached_value = CachedFileMetadata::new( - meta_v1.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, - ); - cache.put(&path, cached_value); - - // File modified (size changed) - let meta_v2 = create_test_meta("test.parquet", 200); - - let cached = cache.get(&path).unwrap(); - // Should not be valid for new meta - assert!(!cached.is_valid_for(&meta_v2)); - - // Compute new value and update - let new_cached = CachedFileMetadata::new( - meta_v2.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, - ); - cache.put(&path, new_cached); - - // Should have new metadata - let result = cache.get(&path).unwrap(); - assert_eq!(result.meta.size, 200); - } - - #[test] - fn test_ordering_cache_invalidation_on_file_modification() { - let cache = DefaultFileStatisticsCache::default(); - let path = Path::from("test.parquet"); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - // Cache with original metadata and ordering - let meta_v1 = ObjectMeta { - location: path.clone(), - last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") - .unwrap() - .into(), - size: 100, - e_tag: None, - version: None, - }; - let ordering_v1 = ordering(); - let cached_v1 = CachedFileMetadata::new( - meta_v1.clone(), - Arc::new(Statistics::new_unknown(&schema)), - Some(ordering_v1), - ); - cache.put(&path, cached_v1); - - // Verify cached ordering is valid - let cached = cache.get(&path).unwrap(); - assert!(cached.is_valid_for(&meta_v1)); - assert!(cached.ordering.is_some()); - - // File modified (size changed) - let meta_v2 = ObjectMeta { - location: path.clone(), - last_modified: DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00") - .unwrap() - .into(), - size: 200, // Changed - e_tag: None, 
- version: None, - }; - - // Cache entry exists but should be invalid for new metadata - let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v2)); - - // Cache new version with different ordering - let ordering_v2 = ordering(); // New ordering instance - let cached_v2 = CachedFileMetadata::new( - meta_v2.clone(), - Arc::new(Statistics::new_unknown(&schema)), - Some(ordering_v2), - ); - cache.put(&path, cached_v2); - - // Old metadata should be invalid - let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v1)); - - // New metadata should be valid - assert!(cached.is_valid_for(&meta_v2)); - assert!(cached.ordering.is_some()); - } - - #[test] - fn test_list_entries() { - let cache = DefaultFileStatisticsCache::default(); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let meta1 = create_test_meta("test1.parquet", 100); - - let cached_value = CachedFileMetadata::new( - meta1.clone(), - Arc::new(Statistics::new_unknown(&schema)), - None, - ); - cache.put(&meta1.location, cached_value); - let meta2 = create_test_meta("test2.parquet", 200); - let cached_value = CachedFileMetadata::new( - meta2.clone(), - Arc::new(Statistics::new_unknown(&schema)), - Some(ordering()), - ); - cache.put(&meta2.location, cached_value); - - let entries = cache.list_entries(); - assert_eq!( - entries, - HashMap::from([ - ( - Path::from("test1.parquet"), - FileStatisticsCacheEntry { - object_meta: meta1, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, - has_ordering: false, - } - ), - ( - Path::from("test2.parquet"), - FileStatisticsCacheEntry { - object_meta: meta2, - num_rows: Precision::Absent, - num_columns: 1, - table_size_bytes: Precision::Absent, - statistics_size_bytes: 0, - has_ordering: true, - } - ), - ]) - ); - } -} diff --git a/datafusion/execution/src/cache/file_statistics_cache.rs 
b/datafusion/execution/src/cache/file_statistics_cache.rs new file mode 100644 index 0000000000000..bebc495534008 --- /dev/null +++ b/datafusion/execution/src/cache/file_statistics_cache.rs @@ -0,0 +1,743 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, +}; +use crate::cache::{CacheAccessor, TableScopedPath}; +use object_store::path::Path; +use std::collections::HashMap; +use std::sync::Mutex; + +pub use crate::cache::DefaultFilesMetadataCache; +use crate::cache::lru_queue::LruQueue; +use datafusion_common::TableReference; +use datafusion_common::heap_size::DFHeapSize; + +/// Default implementation of [`FileStatisticsCache`] +/// +/// Stores cached file metadata (statistics and orderings) for files. +/// +/// The typical usage pattern is: +/// 1. Call `get(path)` to check for cached value +/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)` +/// 3. If invalid or missing, compute new value and call `put(path, new_value)` +/// +/// # Internal details +/// +/// The `memory_limit` controls the maximum size of the cache, which uses a +/// Least Recently Used eviction algorithm.
When adding a new entry, if the total +/// size of the cached entries exceeds `memory_limit`, the least recently used entries +/// are evicted until the total size is lower than `memory_limit`. +/// +/// +/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache +#[derive(Default)] +pub struct DefaultFileStatisticsCache { + state: Mutex, +} + +impl DefaultFileStatisticsCache { + pub fn new(memory_limit: usize) -> Self { + Self { + state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +struct DefaultFileStatisticsCacheState { + lru_queue: LruQueue, + memory_limit: usize, + memory_used: usize, +} + +pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MiB + +impl Default for DefaultFileStatisticsCacheState { + fn default() -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, + memory_used: 0, + } + } +} + +impl DefaultFileStatisticsCacheState { + fn new(memory_limit: usize) -> Self { + Self { + lru_queue: LruQueue::new(), + memory_limit, + memory_used: 0, + } + } + fn get(&mut self, key: &TableScopedPath) -> Option { + self.lru_queue.get(key).cloned() + } + + fn put( + &mut self, + key: &TableScopedPath, + value: CachedFileMetadata, + ) -> Option { + let key_size = key.path.as_ref().heap_size(); + let entry_size = value.heap_size(); + + if entry_size + key_size > self.memory_limit { + // Remove potential stale entry + self.remove(key); + return None; + } + + let old_value = self.lru_queue.put(key.clone(), value); + self.memory_used += entry_size; + self.memory_used += key.path.as_ref().heap_size(); + + if let Some(old_entry) = &old_value { + self.memory_used -= old_entry.heap_size(); + self.memory_used -= key.path.as_ref().heap_size(); + } + + self.evict_entries(); + + old_value + } + 
+ fn remove(&mut self, k: &TableScopedPath) -> Option { + if let Some(old_entry) = self.lru_queue.remove(k) { + self.memory_used -= k.path.as_ref().heap_size(); + self.memory_used -= old_entry.heap_size(); + Some(old_entry) + } else { + None + } + } + + fn contains_key(&self, k: &TableScopedPath) -> bool { + self.lru_queue.contains_key(k) + } + + fn len(&self) -> usize { + self.lru_queue.len() + } + + fn clear(&mut self) { + self.lru_queue.clear(); + self.memory_used = 0; + } + + fn evict_entries(&mut self) { + while self.memory_used > self.memory_limit { + if let Some(removed) = self.lru_queue.pop() { + self.memory_used -= removed.0.path.as_ref().heap_size(); + self.memory_used -= removed.1.heap_size(); + } else { + // cache is empty while memory_used > memory_limit, cannot happen + log::error!( + "File statistics cache memory accounting bug: memory_used={} but cache is empty. \ + Please report this to the Apache DataFusion developers.", + self.memory_used + ); + debug_assert!( + false, + "memory_used={} but cache is empty", + self.memory_used + ); + self.memory_used = 0; + return; + } + } + } +} +impl CacheAccessor for DefaultFileStatisticsCache { + fn get(&self, key: &TableScopedPath) -> Option { + let mut state = self.state.lock().unwrap(); + state.get(key) + } + + fn put( + &self, + key: &TableScopedPath, + value: CachedFileMetadata, + ) -> Option { + let mut state = self.state.lock().unwrap(); + state.put(key, value) + } + + fn remove(&self, key: &TableScopedPath) -> Option { + let mut state = self.state.lock().unwrap(); + state.remove(key) + } + + fn contains_key(&self, k: &TableScopedPath) -> bool { + let state = self.state.lock().unwrap(); + state.contains_key(k) + } + + fn len(&self) -> usize { + let state = self.state.lock().unwrap(); + state.len() + } + + fn clear(&self) { + let mut state = self.state.lock().unwrap(); + state.clear(); + } + + fn name(&self) -> String { + "DefaultFileStatisticsCache".to_string() + } +} + +impl FileStatisticsCache for 
DefaultFileStatisticsCache { + fn cache_limit(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn update_cache_limit(&self, limit: usize) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } + + fn list_entries(&self) -> HashMap { + let mut entries = HashMap::::new(); + for entry in self.state.lock().unwrap().lru_queue.list_entries() { + let path = entry.0.clone(); + let cached = entry.1; + entries.insert( + path.path, + FileStatisticsCacheEntry { + object_meta: cached.meta.clone(), + num_rows: cached.statistics.num_rows, + num_columns: cached.statistics.column_statistics.len(), + table_size_bytes: cached.statistics.total_byte_size, + statistics_size_bytes: cached.statistics.heap_size(), + has_ordering: cached.ordering.is_some(), + table_reference: path.table, + }, + ); + } + + entries + } + + fn drop_table_entries( + &self, + table_ref: &Option, + ) -> datafusion_common::Result<()> { + let mut state = self.state.lock().unwrap(); + let mut table_paths = vec![]; + for (path, _) in state.lru_queue.list_entries() { + if path.table == *table_ref { + table_paths.push(path.clone()); + } + } + for path in table_paths { + state.remove(&path); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry, + }; + use arrow::array::{Int32Array, ListArray, RecordBatch}; + use arrow::buffer::{OffsetBuffer, ScalarBuffer}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use chrono::DateTime; + use datafusion_common::stats::Precision; + use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + use datafusion_expr::ColumnarValue; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use object_store::ObjectMeta; + use std::sync::Arc; + + fn 
create_test_meta(path: &str, size: u64) -> ObjectMeta { + ObjectMeta { + location: Path::from(path), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size, + e_tag: None, + version: None, + } + } + + #[test] + fn test_statistics_cache() { + let meta = create_test_meta("test", 1024); + let cache = DefaultFileStatisticsCache::default(); + + let schema = Schema::new(vec![Field::new( + "test_column", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]); + + let path = TableScopedPath { + path: meta.location.clone(), + table: None, + }; + + // Cache miss + assert!(cache.get(&path).is_none()); + + // Put a value + let cached_value = CachedFileMetadata::new( + meta.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path, cached_value); + + // Cache hit + let result = cache.get(&path); + assert!(result.is_some()); + + let cached = result.unwrap(); + assert!(cached.is_valid_for(&meta)); + + // File size changed - validation should fail + let meta2 = create_test_meta("test", 2048); + + let path_2 = TableScopedPath { + path: meta2.location.clone(), + table: None, + }; + + let cached = cache.get(&path_2).unwrap(); + assert!(!cached.is_valid_for(&meta2)); + + // Update with new value + let cached_value2 = CachedFileMetadata::new( + meta2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path_2, cached_value2); + + // Test list_entries + let entries = cache.list_entries(); + assert_eq!(entries.len(), 1); + + let path_3 = TableScopedPath { + path: Path::from("test"), + table: None, + }; + + let entry = entries.get(&path_3.path).unwrap(); + assert_eq!(entry.object_meta.size, 2048); // Should be updated value + } + + #[derive(Clone, Debug, PartialEq, Eq, Hash)] + struct MockExpr {} + + impl std::fmt::Display for MockExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MockExpr") + } + } + + impl PhysicalExpr for MockExpr { + 
fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn data_type( + &self, + _input_schema: &Schema, + ) -> datafusion_common::Result { + Ok(DataType::Int32) + } + + fn nullable(&self, _input_schema: &Schema) -> datafusion_common::Result { + Ok(false) + } + + fn evaluate( + &self, + _batch: &RecordBatch, + ) -> datafusion_common::Result { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion_common::Result> { + assert!(children.is_empty()); + Ok(self) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MockExpr") + } + } + + fn ordering() -> LexOrdering { + let expr = Arc::new(MockExpr {}) as Arc; + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap() + } + + #[test] + fn test_ordering_cache() { + let meta = create_test_meta("test.parquet", 100); + let cache = DefaultFileStatisticsCache::default(); + + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // Cache statistics with no ordering + let cached_value = CachedFileMetadata::new( + meta.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, // No ordering yet + ); + + let path = TableScopedPath { + path: meta.location.clone(), + table: None, + }; + + cache.put(&path, cached_value); + + let result = cache.get(&path).unwrap(); + assert!(result.ordering.is_none()); + + // Update to add ordering + let mut cached = cache.get(&path).unwrap(); + if cached.is_valid_for(&meta) && cached.ordering.is_none() { + cached.ordering = Some(ordering()); + } + cache.put(&path, cached); + + let result2 = cache.get(&path).unwrap(); + assert!(result2.ordering.is_some()); + + // Verify list_entries shows has_ordering = true + let entries = cache.list_entries(); + assert_eq!(entries.len(), 1); + assert!(entries.get(&path.path).unwrap().has_ordering); + } + + #[test] + fn test_cache_invalidation_on_file_modification() { + let cache = 
DefaultFileStatisticsCache::default(); + let path = TableScopedPath { + path: Path::from("test.parquet"), + table: None, + }; + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let meta_v1 = create_test_meta("test.parquet", 100); + + // Cache initial value + let cached_value = CachedFileMetadata::new( + meta_v1.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path, cached_value); + + // File modified (size changed) + let meta_v2 = create_test_meta("test.parquet", 200); + + let cached = cache.get(&path).unwrap(); + // Should not be valid for new meta + assert!(!cached.is_valid_for(&meta_v2)); + + // Compute new value and update + let new_cached = CachedFileMetadata::new( + meta_v2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + cache.put(&path, new_cached); + + // Should have new metadata + let result = cache.get(&path).unwrap(); + assert_eq!(result.meta.size, 200); + } + + #[test] + fn test_ordering_cache_invalidation_on_file_modification() { + let cache = DefaultFileStatisticsCache::default(); + let path = TableScopedPath { + path: Path::from("test.parquet"), + table: None, + }; + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + // Cache with original metadata and ordering + let meta_v1 = ObjectMeta { + location: path.path.clone(), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 100, + e_tag: None, + version: None, + }; + let ordering_v1 = ordering(); + let cached_v1 = CachedFileMetadata::new( + meta_v1.clone(), + Arc::new(Statistics::new_unknown(&schema)), + Some(ordering_v1), + ); + cache.put(&path, cached_v1); + + // Verify cached ordering is valid + let cached = cache.get(&path).unwrap(); + assert!(cached.is_valid_for(&meta_v1)); + assert!(cached.ordering.is_some()); + + // File modified (size changed) + let meta_v2 = ObjectMeta { + location: path.path.clone(), + last_modified: 
DateTime::parse_from_rfc3339("2022-09-28T10:00:00+02:00") + .unwrap() + .into(), + size: 200, // Changed + e_tag: None, + version: None, + }; + + // Cache entry exists but should be invalid for new metadata + let cached = cache.get(&path).unwrap(); + assert!(!cached.is_valid_for(&meta_v2)); + + // Cache new version with different ordering + let ordering_v2 = ordering(); // New ordering instance + let cached_v2 = CachedFileMetadata::new( + meta_v2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + Some(ordering_v2), + ); + cache.put(&path, cached_v2); + + // Old metadata should be invalid + let cached = cache.get(&path).unwrap(); + assert!(!cached.is_valid_for(&meta_v1)); + + // New metadata should be valid + assert!(cached.is_valid_for(&meta_v2)); + assert!(cached.ordering.is_some()); + } + + #[test] + fn test_list_entries() { + let cache = DefaultFileStatisticsCache::default(); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let meta1 = create_test_meta("test1.parquet", 100); + + let cached_value = CachedFileMetadata::new( + meta1.clone(), + Arc::new(Statistics::new_unknown(&schema)), + None, + ); + + let path_1 = TableScopedPath { + path: meta1.location.clone(), + table: None, + }; + + cache.put(&path_1, cached_value); + let meta2 = create_test_meta("test2.parquet", 200); + let cached_value = CachedFileMetadata::new( + meta2.clone(), + Arc::new(Statistics::new_unknown(&schema)), + Some(ordering()), + ); + + let path_2 = TableScopedPath { + path: meta2.location.clone(), + table: None, + }; + + cache.put(&path_2, cached_value); + + let entries = cache.list_entries(); + assert_eq!( + entries, + HashMap::from([ + ( + Path::from("test1.parquet"), + FileStatisticsCacheEntry { + object_meta: meta1, + num_rows: Precision::Absent, + num_columns: 1, + table_size_bytes: Precision::Absent, + statistics_size_bytes: 304, + has_ordering: false, + table_reference: None, + } + ), + ( + Path::from("test2.parquet"), + FileStatisticsCacheEntry { 
+ object_meta: meta2, + num_rows: Precision::Absent, + num_columns: 1, + table_size_bytes: Precision::Absent, + statistics_size_bytes: 304, + has_ordering: true, + table_reference: None, + } + ), + ]) + ); + } + + #[test] + fn test_cache_entry_added_when_entries_are_within_cache_limit() { + let (meta_1, value_1) = create_cached_file_metadata_with_stats("test1.parquet"); + let (meta_2, value_2) = create_cached_file_metadata_with_stats("test2.parquet"); + let (meta_3, value_3) = create_cached_file_metadata_with_stats("test3.parquet"); + + let limit_for_2_entries = meta_1.location.as_ref().heap_size() + + value_1.heap_size() + + meta_2.location.as_ref().heap_size() + + value_2.heap_size(); + + // create a cache with a limit which fits exactly 2 entries + let cache = DefaultFileStatisticsCache::new(limit_for_2_entries); + let path_1 = TableScopedPath { + path: meta_1.location.clone(), + table: None, + }; + + let path_2 = TableScopedPath { + path: meta_2.location.clone(), + table: None, + }; + + cache.put(&path_1, value_1.clone()); + cache.put(&path_2, value_2.clone()); + + assert_eq!(cache.len(), 2); + assert_eq!(cache.memory_used(), limit_for_2_entries); + + let result_1 = cache.get(&path_1); + let result_2 = cache.get(&path_2); + assert_eq!(result_1.unwrap(), value_1); + assert_eq!(result_2.unwrap(), value_2); + + let path_3 = TableScopedPath { + path: meta_3.location.clone(), + table: None, + }; + + // adding the third entry evicts the first entry + cache.put(&path_3, value_3.clone()); + assert_eq!(cache.len(), 2); + assert_eq!(cache.memory_used(), limit_for_2_entries); + + let result_1 = cache.get(&path_1); + assert!(result_1.is_none()); + + let result_2 = cache.get(&path_2); + let result_3 = cache.get(&path_3); + + assert_eq!(result_2.unwrap(), value_2); + assert_eq!(result_3.unwrap(), value_3); + + // add the third entry again, making sure memory usage remains the same + cache.put(&path_3, value_3.clone()); + assert_eq!(cache.memory_used(), limit_for_2_entries); 
+ cache.put(&path_3, value_3.clone()); + assert_eq!(cache.memory_used(), limit_for_2_entries); + + cache.remove(&path_2); + assert_eq!(cache.len(), 1); + assert_eq!( + cache.memory_used(), + meta_3.location.as_ref().heap_size() + value_3.heap_size() + ); + + cache.clear(); + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + } + + #[test] + fn test_cache_rejects_entry_which_is_too_large() { + let (meta, value) = create_cached_file_metadata_with_stats("test1.parquet"); + + let limit_less_than_the_entry = value.heap_size() - 1; + + // create a cache with a size less than the entry + let cache = DefaultFileStatisticsCache::new(limit_less_than_the_entry); + + let path_1 = TableScopedPath { + path: meta.location.clone(), + table: None, + }; + + cache.put(&path_1, value); + + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + } + + fn create_cached_file_metadata_with_stats( + file_name: &str, + ) -> (ObjectMeta, CachedFileMetadata) { + let series: Vec = (0..=10).collect(); + let values = Int32Array::from(series); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 11])); + let field = Arc::new(Field::new_list_field(DataType::Int32, false)); + let list_array = ListArray::new(field, offsets, Arc::new(values), None); + + let column_statistics = ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + min_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + sum_value: Precision::Exact(ScalarValue::List(Arc::new(list_array.clone()))), + distinct_count: Precision::Exact(10), + byte_size: Precision::Absent, + }; + + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(100), + column_statistics: vec![column_statistics.clone()], + }; + + let object_meta = create_test_meta(file_name, stats.heap_size() as u64); + let value = + CachedFileMetadata::new(object_meta.clone(), Arc::new(stats.clone()), 
None); + (object_meta, value) + } +} diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 0380e50c0935c..76bd660e6c7d5 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -16,7 +16,7 @@ // under the License. pub mod cache_manager; -pub mod cache_unit; +pub mod file_statistics_cache; pub mod lru_queue; mod file_metadata_cache; diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 67604c424c766..e393a7a127873 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -103,6 +103,7 @@ fn create_runtime_config_entries( metadata_cache_limit: Option, list_files_cache_limit: Option, list_files_cache_ttl: Option, + file_statistics_cache_limit: Option, ) -> Vec { vec![ ConfigEntry { @@ -135,6 +136,11 @@ fn create_runtime_config_entries( value: list_files_cache_ttl, description: "TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.", }, + ConfigEntry { + key: "datafusion.runtime.file_statistics_cache_limit".to_string(), + value: file_statistics_cache_limit, + description: "Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). 
Example: '2G' for 2 gigabytes.", + }, ] } @@ -296,6 +302,14 @@ impl RuntimeEnv { .get_list_files_cache_ttl() .map(format_duration); + let file_statistics_cache_limit = + self.cache_manager.get_file_statistic_cache_limit(); + let file_statistics_cache_value = format_byte_size( + file_statistics_cache_limit + .try_into() + .expect("File statistics cache size conversion failed"), + ); + create_runtime_config_entries( memory_limit_value, Some(max_temp_dir_value), @@ -303,6 +317,7 @@ impl RuntimeEnv { Some(metadata_cache_value), Some(list_files_cache_value), list_files_cache_ttl, + Some(file_statistics_cache_value), ) } } @@ -438,6 +453,11 @@ impl RuntimeEnvBuilder { self } + pub fn with_file_statistics_cache_limit(mut self, limit: usize) -> Self { + self.cache_manager = self.cache_manager.with_file_statistics_cache_limit(limit); + self + } + /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { @@ -475,9 +495,10 @@ impl RuntimeEnvBuilder { /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self { let cache_config = CacheManagerConfig { - table_files_statistics_cache: runtime_env + file_statistics_cache: runtime_env.cache_manager.get_file_statistic_cache(), + file_statistics_cache_limit: runtime_env .cache_manager - .get_file_statistic_cache(), + .get_file_statistic_cache_limit(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), list_files_cache_limit: runtime_env .cache_manager @@ -514,6 +535,7 @@ impl RuntimeEnvBuilder { Some("50M".to_owned()), Some("1M".to_owned()), None, + Some("1M".to_owned()), ) } diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index d580b7d1ad2b8..f51d84f0c88eb 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,5 +85,5 @@ float_field float ) STORED AS PARQUET LOCATION 
'test_files/scratch/encrypted_parquet/' -query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided +query error Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided SELECT * FROM parquet_table diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ab8a4a293234e..4c952a74aa362 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -331,6 +331,7 @@ datafusion.optimizer.skip_failed_rules false datafusion.optimizer.subset_repartition_threshold 4 datafusion.optimizer.top_down_join_key_reordering true datafusion.optimizer.use_statistics_registry false +datafusion.runtime.file_statistics_cache_limit 20M datafusion.runtime.list_files_cache_limit 1M datafusion.runtime.list_files_cache_ttl NULL datafusion.runtime.max_temp_directory_size 100G @@ -476,6 +477,7 @@ datafusion.optimizer.skip_failed_rules false When set to true, the logical plan datafusion.optimizer.subset_repartition_threshold 4 Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. 
Example (subset_repartition_threshold = 4): ```text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ``` datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for statistics propagation across operators. This enables more accurate cardinality estimates compared to each operator's built-in `partition_statistics`. +datafusion.runtime.file_statistics_cache_limit 20M Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. 
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index f270b9b169572..aea9b9aeea41f 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -351,6 +351,12 @@ RESET datafusion.runtime.memory_limit statement ok EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 +statement ok +SET datafusion.runtime.file_statistics_cache_limit = '1K' + +statement ok +RESET datafusion.runtime.file_statistics_cache_limit + statement ok SET datafusion.runtime.list_files_cache_limit = '1K' @@ -605,6 +611,15 @@ SHOW datafusion.runtime.max_temp_directory_size ---- datafusion.runtime.max_temp_directory_size 10G +# Test SET and SHOW runtime.file_statistics_cache_limit +statement ok +SET datafusion.runtime.file_statistics_cache_limit = '42M' + +query TT +SHOW datafusion.runtime.file_statistics_cache_limit +---- +datafusion.runtime.file_statistics_cache_limit 42M + # Test SET and SHOW runtime.metadata_cache_limit statement ok SET datafusion.runtime.metadata_cache_limit = '200M' @@ -639,6 +654,7 @@ datafusion.runtime.list_files_cache_ttl 1m30s query T SELECT name FROM information_schema.df_settings WHERE name LIKE 'datafusion.runtime.%' ORDER BY name ---- +datafusion.runtime.file_statistics_cache_limit datafusion.runtime.list_files_cache_limit datafusion.runtime.list_files_cache_ttl datafusion.runtime.max_temp_directory_size diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b828f0e793d47..9505a5806feb3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -227,14 +227,15 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| ------------------------------------------ | ------- | 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | -| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | +| key | default | description | +| ---------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.file_statistics_cache_limit | 20M | Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.
| +| datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | # Tuning Guide