Skip to content

Commit 436b05a

Browse files
committed
Refactor cache APIs to support ordering information
1 parent 102caeb commit 436b05a

10 files changed

Lines changed: 898 additions & 743 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog-listing/src/table.rs

Lines changed: 32 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -705,31 +705,42 @@ impl ListingTable {
705705
store: &Arc<dyn ObjectStore>,
706706
part_file: &PartitionedFile,
707707
) -> datafusion_common::Result<Arc<Statistics>> {
708-
match self
708+
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
709+
710+
// Check cache first
711+
if let Some(cached) = self
709712
.collected_statistics
710-
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
713+
.get(&part_file.object_meta.location)
711714
{
712-
Some(statistics) => Ok(statistics),
713-
None => {
714-
let statistics = self
715-
.options
716-
.format
717-
.infer_stats(
718-
ctx,
719-
store,
720-
Arc::clone(&self.file_schema),
721-
&part_file.object_meta,
722-
)
723-
.await?;
724-
let statistics = Arc::new(statistics);
725-
self.collected_statistics.put_with_extra(
726-
&part_file.object_meta.location,
727-
Arc::clone(&statistics),
728-
&part_file.object_meta,
729-
);
730-
Ok(statistics)
715+
// Validate that cached entry is still valid
716+
if cached.is_valid_for(&part_file.object_meta) {
717+
return Ok(cached.statistics);
731718
}
732719
}
720+
721+
// Cache miss or invalid - infer statistics
722+
let statistics = self
723+
.options
724+
.format
725+
.infer_stats(
726+
ctx,
727+
store,
728+
Arc::clone(&self.file_schema),
729+
&part_file.object_meta,
730+
)
731+
.await?;
732+
let statistics = Arc::new(statistics);
733+
734+
// Store in cache
735+
self.collected_statistics.put(
736+
&part_file.object_meta.location,
737+
CachedFileMetadata::new(
738+
part_file.object_meta.clone(),
739+
Arc::clone(&statistics),
740+
None, // No ordering information in this PR
741+
),
742+
);
743+
Ok(statistics)
733744
}
734745
}
735746

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,9 @@ use datafusion_common::stats::Precision;
3131
use datafusion_common::{
3232
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
3333
};
34-
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
34+
use datafusion_execution::cache::cache_manager::{
35+
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
36+
};
3537
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
3638
use datafusion_physical_plan::Accumulator;
3739
use log::debug;
@@ -125,19 +127,15 @@ impl<'a> DFParquetMetadata<'a> {
125127
!cfg!(feature = "parquet_encryption") || decryption_properties.is_none();
126128

127129
if cache_metadata
128-
&& let Some(parquet_metadata) = file_metadata_cache
129-
.as_ref()
130-
.and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
131-
.and_then(|file_metadata| {
132-
file_metadata
133-
.as_any()
134-
.downcast_ref::<CachedParquetMetaData>()
135-
.map(|cached_parquet_metadata| {
136-
Arc::clone(cached_parquet_metadata.parquet_metadata())
137-
})
138-
})
130+
&& let Some(file_metadata_cache) = file_metadata_cache.as_ref()
131+
&& let Some(cached) = file_metadata_cache.get(&object_meta.location)
132+
&& cached.is_valid_for(object_meta)
133+
&& let Some(cached_parquet) = cached
134+
.file_metadata
135+
.as_any()
136+
.downcast_ref::<CachedParquetMetaData>()
139137
{
140-
return Ok(parquet_metadata);
138+
return Ok(Arc::clone(cached_parquet.parquet_metadata()));
141139
}
142140

143141
let mut reader =
@@ -163,8 +161,11 @@ impl<'a> DFParquetMetadata<'a> {
163161

164162
if cache_metadata && let Some(file_metadata_cache) = file_metadata_cache {
165163
file_metadata_cache.put(
166-
object_meta,
167-
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
164+
&object_meta.location,
165+
CachedFileMetadataEntry::new(
166+
(*object_meta).clone(),
167+
Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
168+
),
168169
);
169170
}
170171

datafusion/datasource/src/url.rs

Lines changed: 11 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -15,9 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::Arc;
19-
2018
use datafusion_common::{DataFusionError, Result};
19+
use datafusion_execution::cache::cache_manager::CachedFileList;
2120
use datafusion_execution::object_store::ObjectStoreUrl;
2221
use datafusion_session::Session;
2322

@@ -364,35 +363,24 @@ async fn list_with_cache<'b>(
364363
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
365364
.boxed()),
366365
Some(cache) => {
367-
// Convert prefix to Option<Path> for cache lookup
368-
let prefix_filter = prefix.cloned();
366+
// Build the filter prefix (only Some if prefix was requested)
367+
let filter_prefix = prefix.is_some().then(|| full_prefix.clone());
369368

370-
// Try cache lookup with optional prefix filter
371-
let vec = if let Some(res) =
372-
cache.get_with_extra(table_base_path, &prefix_filter)
373-
{
369+
// Try cache lookup
370+
let vec = if let Some(cached) = cache.get(table_base_path) {
374371
debug!("Hit list files cache");
375-
res.as_ref().clone()
372+
cached.filter_by_prefix(&filter_prefix)
376373
} else {
377374
// Cache miss - always list and cache the full table
378375
// This ensures we have complete data for future prefix queries
379376
let vec = store
380377
.list(Some(table_base_path))
381378
.try_collect::<Vec<ObjectMeta>>()
382379
.await?;
383-
cache.put(table_base_path, Arc::new(vec.clone()));
384-
385-
// If a prefix filter was requested, apply it to the results
386-
if prefix.is_some() {
387-
let full_prefix_str = full_prefix.as_ref();
388-
vec.into_iter()
389-
.filter(|meta| {
390-
meta.location.as_ref().starts_with(full_prefix_str)
391-
})
392-
.collect()
393-
} else {
394-
vec
395-
}
380+
let cached = CachedFileList::new(vec);
381+
let result = cached.filter_by_prefix(&filter_prefix);
382+
cache.put(table_base_path, cached);
383+
result
396384
};
397385
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
398386
}
@@ -494,6 +482,7 @@ mod tests {
494482
use std::any::Any;
495483
use std::collections::HashMap;
496484
use std::ops::Range;
485+
use std::sync::Arc;
497486
use tempfile::tempdir;
498487

499488
#[test]

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ chrono = { workspace = true }
5555
dashmap = { workspace = true }
5656
datafusion-common = { workspace = true, default-features = false }
5757
datafusion-expr = { workspace = true, default-features = false }
58+
datafusion-physical-expr-common = { workspace = true, default-features = false }
5859
futures = { workspace = true }
5960
log = { workspace = true }
6061
object_store = { workspace = true, features = ["fs"] }

0 commit comments

Comments (0)