From f8a61e465f9ef78f963affb680b6a1d668b46d01 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Wed, 25 Mar 2026 13:59:33 +0100 Subject: [PATCH] feat[vortex-datafusion]: use PartitionedFile instead of path in create_reader The motivation for this change is that we have a need to create a custom VortexReadAt implementation based on some custom values in a PartitionedFile extension. I don't think this will break any users because the reader module was private. Signed-off-by: Alfonso Subiotto Marques --- vortex-datafusion/public-api.lock | 26 +++++++++++++++++++++- vortex-datafusion/src/persistent/mod.rs | 2 +- vortex-datafusion/src/persistent/opener.rs | 4 +--- vortex-datafusion/src/persistent/reader.rs | 15 +++++++++---- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 0dd1d1d14b6..4ef0235c55b 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -18,6 +18,30 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert: pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result +pub mod vortex_datafusion::reader + +pub struct vortex_datafusion::reader::DefaultVortexReaderFactory + +impl vortex_datafusion::reader::DefaultVortexReaderFactory + +pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::new(object_store: alloc::sync::Arc) -> Self + +impl core::fmt::Debug for vortex_datafusion::reader::DefaultVortexReaderFactory + +pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory + +pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, 
session: &vortex_session::VortexSession) -> datafusion_common::error::Result> + +pub trait vortex_datafusion::reader::VortexReaderFactory: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static + +pub fn vortex_datafusion::reader::VortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result> + +impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory + +pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result> + pub mod vortex_datafusion::v2 pub struct vortex_datafusion::v2::VortexDataSource @@ -176,7 +200,7 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled: pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self -pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc) -> Self +pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc) -> Self impl core::clone::Clone for vortex_datafusion::VortexSource diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 29488d4f7dd..558032ebd01 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -7,7 +7,7 @@ mod cache; mod format; pub mod metrics; mod opener; -mod reader; +pub mod reader; mod sink; mod source; mod stream; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 5986a06da49..4da18b89946 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -110,9 +110,7 @@ impl FileOpener for VortexOpener { let mut projection = 
self.projection.clone(); let mut filter = self.filter.clone(); - let reader = self - .vortex_reader_factory - .create_reader(file.path().as_ref(), &session)?; + let reader = self.vortex_reader_factory.create_reader(&file, &session)?; let reader = InstrumentedReadAt::new_with_labels(reader, metrics_registry.as_ref(), labels.clone()); diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs index a6f40314ba3..4e480ca374f 100644 --- a/vortex-datafusion/src/persistent/reader.rs +++ b/vortex-datafusion/src/persistent/reader.rs @@ -1,10 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +//! Factory for creating [`VortexReadAt`][vortex::io::VortexReadAt] instances +//! from [`PartitionedFile`]s. + use std::fmt::Debug; use std::sync::Arc; use datafusion_common::Result as DFResult; +use datafusion_datasource::PartitionedFile; use object_store::ObjectStore; use vortex::io::VortexReadAt; use vortex::io::object_store::ObjectStoreReadAt; @@ -14,8 +18,11 @@ use vortex::session::VortexSession; /// Factory to create [`VortexReadAt`] instances to read the target file. pub trait VortexReaderFactory: Debug + Send + Sync + 'static { /// Create a reader for a target object. - fn create_reader(&self, path: &str, session: &VortexSession) - -> DFResult>; + fn create_reader( + &self, + file: &PartitionedFile, + session: &VortexSession, + ) -> DFResult>; } /// Default factory, creates [`ObjectStore`] backed readers for files, @@ -35,12 +42,12 @@ impl DefaultVortexReaderFactory { impl VortexReaderFactory for DefaultVortexReaderFactory { fn create_reader( &self, - path: &str, + file: &PartitionedFile, session: &VortexSession, ) -> DFResult> { Ok(Arc::new(ObjectStoreReadAt::new( self.object_store.clone(), - path.into(), + file.path().as_ref().into(), session.handle(), )) as _) }