Skip to content

Commit ec2c602

Browse files
authored
feat[vortex-datafusion]: use PartitionedFile instead of path in create_reader (#7156)
The motivation for this change is that we have a need to create a custom VortexReadAt implementation based on some custom values in a PartitionedFile extension. I don't think this will break any users because the reader module was private. <!-- Thank you for submitting a pull request! We appreciate your time and effort. Please make sure to provide enough information so that we can review your pull request. The Summary and Testing sections below contain guidance on what to include. --> <!-- ## API Changes Uncomment this section if there are any user-facing changes. Consider whether the change affects users in one of the following ways: 1. Breaks public APIs in some way. 2. Changes the underlying behavior of one of the engine integrations. 3. Should some documentation be updated to reflect this change? If a public API is changed in a breaking manner, make sure to add the appropriate label. You can run `./scripts/public-api.sh` locally to see if there are any public API changes (and this also runs in our CI). --> ## Testing <!-- Please describe how this change was tested. Here are some common categories for testing in Vortex: 1. Verifying existing behavior is maintained. 2. Verifying new behavior and functionality works correctly. 3. Serialization compatibility (backwards and forwards) should be maintained or explicitly broken. --> Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 41b997d commit ec2c602

File tree

4 files changed

+38
-9
lines changed

4 files changed

+38
-9
lines changed

vortex-datafusion/public-api.lock

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,30 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert:
1818

1919
pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result<bool, Self::Error>
2020

21+
pub mod vortex_datafusion::reader
22+
23+
pub struct vortex_datafusion::reader::DefaultVortexReaderFactory
24+
25+
impl vortex_datafusion::reader::DefaultVortexReaderFactory
26+
27+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::new(object_store: alloc::sync::Arc<dyn object_store::ObjectStore>) -> Self
28+
29+
impl core::fmt::Debug for vortex_datafusion::reader::DefaultVortexReaderFactory
30+
31+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
32+
33+
impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory
34+
35+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
36+
37+
pub trait vortex_datafusion::reader::VortexReaderFactory: core::fmt::Debug + core::marker::Send + core::marker::Sync + 'static
38+
39+
pub fn vortex_datafusion::reader::VortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
40+
41+
impl vortex_datafusion::reader::VortexReaderFactory for vortex_datafusion::reader::DefaultVortexReaderFactory
42+
43+
pub fn vortex_datafusion::reader::DefaultVortexReaderFactory::create_reader(&self, file: &datafusion_datasource::PartitionedFile, session: &vortex_session::VortexSession) -> datafusion_common::error::Result<alloc::sync::Arc<dyn vortex_io::read_at::VortexReadAt>>
44+
2145
pub mod vortex_datafusion::v2
2246

2347
pub struct vortex_datafusion::v2::VortexDataSource
@@ -176,7 +200,7 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, enabled:
176200

177201
pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, scan_concurrency: usize) -> Self
178202

179-
pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc<dyn vortex_datafusion::persistent::reader::VortexReaderFactory>) -> Self
203+
pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, vortex_reader_factory: alloc::sync::Arc<dyn vortex_datafusion::reader::VortexReaderFactory>) -> Self
180204

181205
impl core::clone::Clone for vortex_datafusion::VortexSource
182206

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod cache;
77
mod format;
88
pub mod metrics;
99
mod opener;
10-
mod reader;
10+
pub mod reader;
1111
mod sink;
1212
mod source;
1313
mod stream;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,7 @@ impl FileOpener for VortexOpener {
110110
let mut projection = self.projection.clone();
111111
let mut filter = self.filter.clone();
112112

113-
let reader = self
114-
.vortex_reader_factory
115-
.create_reader(file.path().as_ref(), &session)?;
113+
let reader = self.vortex_reader_factory.create_reader(&file, &session)?;
116114

117115
let reader =
118116
InstrumentedReadAt::new_with_labels(reader, metrics_registry.as_ref(), labels.clone());

vortex-datafusion/src/persistent/reader.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
//! Factory for creating [`VortexReadAt`][vortex::io::VortexReadAt] instances
5+
//! from [`PartitionedFile`]s.
6+
47
use std::fmt::Debug;
58
use std::sync::Arc;
69

710
use datafusion_common::Result as DFResult;
11+
use datafusion_datasource::PartitionedFile;
812
use object_store::ObjectStore;
913
use vortex::io::VortexReadAt;
1014
use vortex::io::object_store::ObjectStoreReadAt;
@@ -14,8 +18,11 @@ use vortex::session::VortexSession;
1418
/// Factory to create [`VortexReadAt`] instances to read the target file.
1519
pub trait VortexReaderFactory: Debug + Send + Sync + 'static {
1620
/// Create a reader for a target object.
17-
fn create_reader(&self, path: &str, session: &VortexSession)
18-
-> DFResult<Arc<dyn VortexReadAt>>;
21+
fn create_reader(
22+
&self,
23+
file: &PartitionedFile,
24+
session: &VortexSession,
25+
) -> DFResult<Arc<dyn VortexReadAt>>;
1926
}
2027

2128
/// Default factory, creates [`ObjectStore`] backed readers for files,
@@ -35,12 +42,12 @@ impl DefaultVortexReaderFactory {
3542
impl VortexReaderFactory for DefaultVortexReaderFactory {
3643
fn create_reader(
3744
&self,
38-
path: &str,
45+
file: &PartitionedFile,
3946
session: &VortexSession,
4047
) -> DFResult<Arc<dyn VortexReadAt>> {
4148
Ok(Arc::new(ObjectStoreReadAt::new(
4249
self.object_store.clone(),
43-
path.into(),
50+
file.path().as_ref().into(),
4451
session.handle(),
4552
)) as _)
4653
}

0 commit comments

Comments
 (0)