Skip to content

Commit eda4dd0

Browse files
authored
Minor touchups for vortex-datafusion (#8356)
## Summary

Some minor touchups to the `FileSource`-based DataFusion integration. The two substantial changes here are:

1. When reading files, if the footer was not already in the cache, make sure to insert it. That can happen when using `ListingTable` without stats inference, or when using `FileScanConfig` directly in another table provider.
2. On write, move the schema-to-dtype logic outside of the loop. It only needs to happen once, and the `dtype` is cloned per write task.

Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 0dd6db7 commit eda4dd0

5 files changed

Lines changed: 89 additions & 31 deletions

File tree

vortex-datafusion/src/persistent/access_plan.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,16 @@ pub struct VortexAccessPlan {
4343
}
4444

4545
impl VortexAccessPlan {
46+
/// Returns the selection, if one was set.
47+
pub fn selection(&self) -> Option<&Selection> {
48+
self.selection.as_ref()
49+
}
50+
4651
/// Sets the row [`Selection`] to apply when the file is opened.
4752
pub fn with_selection(mut self, selection: Selection) -> Self {
4853
self.selection = Some(selection);
4954
self
5055
}
51-
}
52-
53-
impl VortexAccessPlan {
54-
/// Returns the selection, if one was set.
55-
pub fn selection(&self) -> Option<&Selection> {
56-
self.selection.as_ref()
57-
}
5856

5957
/// Applies this access plan to a [`ScanBuilder`].
6058
///

vortex-datafusion/src/persistent/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ impl FileFormatFactory for VortexFormatFactory {
285285
if let Some(key) = key.strip_prefix("format.") {
286286
opts.set(key, value)?;
287287
} else {
288-
tracing::trace!("Ignoring options '{key}'");
288+
tracing::trace!("Ignoring option '{key}'");
289289
}
290290
}
291291

vortex-datafusion/src/persistent/opener.rs

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use datafusion_datasource::PartitionedFile;
1717
use datafusion_datasource::TableSchema;
1818
use datafusion_datasource::file_stream::FileOpenFuture;
1919
use datafusion_datasource::file_stream::FileOpener;
20+
use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
2021
use datafusion_execution::cache::cache_manager::FileMetadataCache;
2122
use datafusion_physical_expr::PhysicalExprRef;
2223
use datafusion_physical_expr::projection::ProjectionExprs;
@@ -129,7 +130,7 @@ impl FileOpener for VortexOpener {
129130
let unified_file_schema = Arc::clone(self.table_schema.file_schema());
130131
let batch_size = self.batch_size;
131132
let limit = self.limit;
132-
let layout_reader = Arc::clone(&self.layout_readers);
133+
let layout_readers = Arc::clone(&self.layout_readers);
133134
let natural_split_ranges = Arc::clone(&self.natural_split_ranges);
134135
let has_output_ordering = self.has_output_ordering;
135136
let scan_concurrency = self.scan_concurrency;
@@ -160,9 +161,7 @@ impl FileOpener for VortexOpener {
160161
Ok(async move {
161162
// Create FilePruner when we have a predicate and either dynamic expressions
162163
// or file statistics available. The pruner can eliminate files without
163-
// opening them based on:
164-
// - Partition column values (e.g., date=2024-01-01)
165-
// - File-level statistics (min/max values per column)
164+
// opening them based on file-level statistics (min/max values per column).
166165
let mut file_pruner = file_pruning_predicate
167166
.filter(|p| {
168167
// Only create pruner if we have dynamic expressions or file statistics
@@ -192,22 +191,41 @@ impl FileOpener for VortexOpener {
192191
.with_metrics_registry(Arc::clone(&metrics_registry))
193192
.with_labels(labels);
194193

195-
if let Some(file_metadata_cache) = file_metadata_cache
196-
&& let Some(entry) = file_metadata_cache.get(file.path())
197-
&& entry.is_valid_for(&file.object_meta)
198-
&& let Some(vortex_metadata) = entry
199-
.file_metadata
200-
.as_any()
201-
.downcast_ref::<CachedVortexMetadata>()
202-
{
203-
open_opts = open_opts.with_footer(vortex_metadata.footer().clone());
194+
let cached_footer = file_metadata_cache
195+
.as_ref()
196+
.and_then(|cache| cache.get(file.path()))
197+
.filter(|entry| entry.is_valid_for(&file.object_meta))
198+
.and_then(|entry| {
199+
entry
200+
.file_metadata
201+
.as_any()
202+
.downcast_ref::<CachedVortexMetadata>()
203+
.map(|vortex_metadata| vortex_metadata.footer().clone())
204+
});
205+
let footer_cache_hit = cached_footer.is_some();
206+
207+
if let Some(footer) = cached_footer {
208+
open_opts = open_opts.with_footer(footer);
204209
}
205210

206211
let vxf = open_opts
207212
.open_read(reader)
208213
.await
209214
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;
210215

216+
// On a miss, cache the parsed footer so other partitions and later executions
217+
// skip the footer fetch and parse. `infer_schema`/`infer_stats` also populate
218+
// this cache, but only when planning goes through `VortexFormat`.
219+
if !footer_cache_hit && let Some(cache) = &file_metadata_cache {
220+
cache.put(
221+
file.path(),
222+
CachedFileMetadataEntry::new(
223+
file.object_meta.clone(),
224+
Arc::new(CachedVortexMetadata::new(&vxf)),
225+
),
226+
);
227+
}
228+
211229
// Check if there are rows in this file. If not, we can save
212230
// ourselves some work and return an empty stream.
213231
if vxf.row_count() == 0 {
@@ -285,7 +303,7 @@ impl FileOpener for VortexOpener {
285303
let projector = leftover_projection.make_projector(&stream_schema)?;
286304

287305
// We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
288-
let layout_reader = match layout_reader.entry(file.object_meta.location.clone()) {
306+
let layout_reader = match layout_readers.entry(file.object_meta.location.clone()) {
289307
Entry::Occupied(mut occupied_entry) => {
290308
if let Some(reader) = occupied_entry.get().upgrade() {
291309
tracing::trace!("reusing layout reader for {}", occupied_entry.key());
@@ -352,7 +370,6 @@ impl FileOpener for VortexOpener {
352370
// This will only fail if the user has not configured a suitable
353371
// PhysicalExprAdapterFactory on the file source to handle rewriting the
354372
// expression to handle missing/reordered columns in the Vortex file.
355-
356373
let (pushed, unpushed): (Vec<PhysicalExprRef>, Vec<PhysicalExprRef>) =
357374
split_conjunction(&f)
358375
.into_iter()
@@ -568,6 +585,7 @@ mod tests {
568585
use datafusion::physical_expr::planner::logical2physical;
569586
use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
570587
use datafusion::scalar::ScalarValue;
588+
use datafusion_execution::cache::DefaultFilesMetadataCache;
571589
use datafusion_expr::Operator;
572590
use datafusion_physical_expr::expressions as df_expr;
573591
use datafusion_physical_expr::projection::ProjectionExpr;
@@ -775,6 +793,46 @@ mod tests {
775793
Ok(())
776794
}
777795

796+
#[tokio::test]
797+
async fn test_open_populates_file_metadata_cache() -> anyhow::Result<()> {
798+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
799+
let file_path = "cached/file.vortex";
800+
let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
801+
let data_size =
802+
write_arrow_to_vortex(Arc::clone(&object_store), file_path, batch.clone()).await?;
803+
804+
let file = PartitionedFile::new(file_path.to_string(), data_size);
805+
let table_schema = TableSchema::from_file_schema(batch.schema());
806+
807+
let cache: Arc<dyn FileMetadataCache> =
808+
Arc::new(DefaultFilesMetadataCache::new(64 * 1024 * 1024));
809+
let mut opener = make_opener(Arc::clone(&object_store), table_schema, None);
810+
opener.file_metadata_cache = Some(Arc::clone(&cache));
811+
812+
// The first open misses the cache and must write the parsed footer back.
813+
let stream = opener.open(file.clone())?.await?;
814+
stream.try_collect::<Vec<_>>().await?;
815+
816+
let entry = cache
817+
.get(file.path())
818+
.ok_or_else(|| anyhow::anyhow!("footer was not cached after open"))?;
819+
assert!(entry.is_valid_for(&file.object_meta));
820+
assert!(
821+
entry
822+
.file_metadata
823+
.as_any()
824+
.downcast_ref::<CachedVortexMetadata>()
825+
.is_some()
826+
);
827+
828+
// The second open hits the cache and still returns the same data.
829+
let stream = opener.open(file.clone())?.await?;
830+
let data = stream.try_collect::<Vec<_>>().await?;
831+
assert_eq!(data.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3);
832+
833+
Ok(())
834+
}
835+
778836
#[rstest]
779837
#[tokio::test]
780838
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {

vortex-datafusion/src/persistent/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl VortexReaderFactory for DefaultVortexReaderFactory {
7272
) -> DFResult<Arc<dyn VortexReadAt>> {
7373
Ok(Arc::new(ObjectStoreReadAt::new_with_allocator(
7474
Arc::clone(&self.object_store),
75-
file.path().as_ref().into(),
75+
file.path().clone(),
7676
session.handle(),
7777
session.allocator(),
7878
)) as _)

vortex-datafusion/src/persistent/sink.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,24 +100,26 @@ impl FileSink for VortexSink {
100100
object_store: Arc<dyn ObjectStore>,
101101
) -> DFResult<u64> {
102102
let mut file_write_tasks: JoinSet<DFResult<(Path, WriteSummary)>> = JoinSet::new();
103+
let writer_schema = get_writer_schema(&self.config);
104+
let dtype = self
105+
.session
106+
.arrow()
107+
.from_arrow_schema(&writer_schema)
108+
.map_err(|e| {
109+
exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}")
110+
})?;
103111

104112
// TODO(adamg):
105113
// 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
106114
while let Some((path, rx)) = file_stream_rx.recv().await {
107115
let session = self.session.clone();
108116
let object_store = Arc::clone(&object_store);
109-
let writer_schema = get_writer_schema(&self.config);
110-
let dtype = session
111-
.arrow()
112-
.from_arrow_schema(&writer_schema)
113-
.map_err(|e| {
114-
exec_datafusion_err!("Failed to derive Vortex DType from writer schema: {e}")
115-
})?;
116117

117118
// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
118119
// the demux task might deadlock itself.
119120
let arrow_session = session.clone();
120121
let import_schema = Arc::clone(&writer_schema);
122+
let dtype = dtype.clone();
121123
file_write_tasks.spawn(async move {
122124
let stream = ReceiverStream::new(rx).map(move |rb| {
123125
arrow_session

0 commit comments

Comments
 (0)