Skip to content

Commit f8a61e4

Browse files
committed
feat[vortex-datafusion]: use PartitionedFile instead of path in create_reader
The motivation for this change is that we need to create a custom VortexReadAt implementation based on some custom values in a PartitionedFile extension. I don't think this will break any users because the reader module was private. Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 41b997d commit f8a61e4

4 files changed

Lines changed: 38 additions & 9 deletions

File tree

vortex-datafusion/public-api.lock

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,30 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert:
1818

1919
pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result<bool, Self::Error>
2020

21+
pub mod vortex_datafusion::reader
22+
23+
pub struct vortex_datafusion::reader::DefaultVortexReaderFactory
24+
25+
impl vortex_datafusion::reader::DefaultVortexReaderFactory
26+
27+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::new(object_store: alloc::sync::Arc<dyn object_store::ObjectStore>) -> Self
28+
29+
impl core::fmt::Debug for vortex_datafusion::reader::DefaultVortexReaderFactory
30+
31+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
32+
33+
impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory
34+
35+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
36+
37+
pub trait vortex_datafusion::reader::VortexReaderFactory: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static
38+
39+
pub fn vortex_datafusion::reader::VortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
40+
41+
impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory
42+
43+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
44+
2145
pub mod vortex_datafusion::v2
2246

2347
pub struct vortex_datafusion::v2::VortexDataSource
@@ -176,7 +200,7 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled:
176200

177201
pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self
178202

179-
pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc<dyn vortex_datafusion::persistent::reader::VortexReaderFactory>) -> Self
203+
pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc<dyn vortex_datafusion::reader::VortexReaderFactory>) -> Self
180204

181205
impl core::clone::Clone for vortex_datafusion::VortexSource
182206

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod cache;
77
mod format;
88
pub mod metrics;
99
mod opener;
10-
mod reader;
10+
pub mod reader;
1111
mod sink;
1212
mod source;
1313
mod stream;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,7 @@ impl FileOpener for VortexOpener {
110110
let mut projection = self.projection.clone();
111111
let mut filter = self.filter.clone();
112112

113-
let reader = self
114-
.vortex_reader_factory
115-
.create_reader(file.path().as_ref(), &session)?;
113+
let reader = self.vortex_reader_factory.create_reader(&file, &session)?;
116114

117115
let reader =
118116
InstrumentedReadAt::new_with_labels(reader, metrics_registry.as_ref(), labels.clone());

vortex-datafusion/src/persistent/reader.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
//! Factory for creating [`VortexReadAt`][vortex::io::VortexReadAt] instances
5+
//! from [`PartitionedFile`]s.
6+
47
use std::fmt::Debug;
58
use std::sync::Arc;
69

710
use datafusion_common::Result as DFResult;
11+
use datafusion_datasource::PartitionedFile;
812
use object_store::ObjectStore;
913
use vortex::io::VortexReadAt;
1014
use vortex::io::object_store::ObjectStoreReadAt;
@@ -14,8 +18,11 @@ use vortex::session::VortexSession;
1418
/// Factory to create [`VortexReadAt`] instances to read the target file.
1519
pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
1620
/// Create a reader for a target object.
17-
fn create_reader(&self, path: &str, session: &VortexSession)
18-
-> DFResult<Arc<dyn VortexReadAt>>;
21+
fn create_reader(
22+
&self,
23+
file: &PartitionedFile,
24+
session: &VortexSession,
25+
) -> DFResult<Arc<dyn VortexReadAt>>;
1926
}
2027

2128
/// Default factory, creates [`ObjectStore`] backed readers for files,
@@ -35,12 +42,12 @@ impl DefaultVortexReaderFactory {
3542
impl VortexReaderFactory for DefaultVortexReaderFactory {
3643
fn create_reader(
3744
&self,
38-
path: &str,
45+
file: &PartitionedFile,
3946
session: &VortexSession,
4047
) -> DFResult<Arc<dyn VortexReadAt>> {
4148
Ok(Arc::new(ObjectStoreReadAt::new(
4249
self.object_store.clone(),
43-
path.into(),
50+
file.path().as_ref().into(),
4451
session.handle(),
4552
)) as _)
4653
}

0 commit comments

Comments
 (0)