Skip to content

Commit 6160f40

Browse files
committed
fix ci, add files_matching_prefix
1 parent 436b05a commit 6160f40

3 files changed

Lines changed: 22 additions & 6 deletions

File tree

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -811,14 +811,14 @@ impl TableFunctionImpl for ListFilesCacheFunc {
811811
.map(|t| t.duration_since(now).as_millis() as i64),
812812
);
813813

814-
for meta in entry.metas.iter() {
814+
for meta in entry.cached_file_list.files.iter() {
815815
file_path_arr.push(meta.location.to_string());
816816
file_modified_arr.push(meta.last_modified.timestamp_millis());
817817
file_size_bytes_arr.push(meta.size);
818818
etag_arr.push(meta.e_tag.clone());
819819
version_arr.push(meta.version.clone());
820820
}
821-
current_offset += entry.metas.len() as i32;
821+
current_offset += entry.cached_file_list.files.len() as i32;
822822
offsets.push(current_offset);
823823
}
824824
}

datafusion/datasource/src/url.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use datafusion_common::{DataFusionError, Result};
1921
use datafusion_execution::cache::cache_manager::CachedFileList;
2022
use datafusion_execution::object_store::ObjectStoreUrl;
@@ -369,7 +371,7 @@ async fn list_with_cache<'b>(
369371
// Try cache lookup
370372
let vec = if let Some(cached) = cache.get(table_base_path) {
371373
debug!("Hit list files cache");
372-
cached.filter_by_prefix(&filter_prefix)
374+
cached.files_matching_prefix(&filter_prefix)
373375
} else {
374376
// Cache miss - always list and cache the full table
375377
// This ensures we have complete data for future prefix queries
@@ -378,11 +380,14 @@ async fn list_with_cache<'b>(
378380
.try_collect::<Vec<ObjectMeta>>()
379381
.await?;
380382
let cached = CachedFileList::new(vec);
381-
let result = cached.filter_by_prefix(&filter_prefix);
383+
let result = cached.files_matching_prefix(&filter_prefix);
382384
cache.put(table_base_path, cached);
383385
result
384386
};
385-
Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
387+
Ok(
388+
futures::stream::iter(Arc::unwrap_or_clone(vec).into_iter().map(Ok))
389+
.boxed(),
390+
)
386391
}
387392
}
388393
}

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl CachedFileList {
128128
}
129129

130130
/// Filter the files by prefix.
131-
pub fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
131+
fn filter_by_prefix(&self, prefix: &Option<Path>) -> Vec<ObjectMeta> {
132132
match prefix {
133133
Some(prefix) => self
134134
.files
@@ -139,6 +139,17 @@ impl CachedFileList {
139139
None => self.files.as_ref().clone(),
140140
}
141141
}
142+
143+
/// Returns files matching the given prefix.
144+
///
145+
/// When prefix is `None`, returns a clone of the `Arc` (no data copy).
146+
/// When filtering is needed, returns a new `Arc` with filtered results (clones each matching [`ObjectMeta`]).
147+
pub fn files_matching_prefix(&self, prefix: &Option<Path>) -> Arc<Vec<ObjectMeta>> {
148+
match prefix {
149+
None => Arc::clone(&self.files),
150+
Some(p) => Arc::new(self.filter_by_prefix(&Some(p.clone()))),
151+
}
152+
}
142153
}
143154

144155
/// Cache for storing the [`ObjectMeta`]s that result from listing a path

0 commit comments

Comments
 (0)