Skip to content

Commit 7f170ce

Browse files
authored
Touch up some of the interfaces and docs in vortex-datafusion (#8485)
## Summary This PR mostly touches up some of the interfaces, and exposes a few extra types that might be helpful for users. 1. Makes our type conversion traits public 2. Exposes `DefaultExpressionConverter`, so users can fall back to it if they need to in their own implementations. 3. `VortexSink` if they want to explore write physical plans 4. Introduces `VortexSource::predicate`, to explore read physical plans. Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 4f52cbc commit 7f170ce

8 files changed

Lines changed: 53 additions & 10 deletions

File tree

vortex-datafusion/src/convert/exprs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ pub trait ExpressionConvertor: Send + Sync {
106106
}
107107
}
108108

109-
/// The default [`ExpressionConvertor`].
109+
/// The default [`ExpressionConvertor`] implementation.
110110
#[derive(Default)]
111111
pub struct DefaultExpressionConvertor {}
112112

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,32 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
//! Utilities and interface to convert DataFusion types to Vortex types.
5+
//!
6+
//! Currently includes:
7+
//! [`ExpressionConvertor`] - Controls the rewrite of DataFusion expressions to Vortex expressions, and whether they can
8+
//! be pushed into the underlying scan. A default implementation is provided.
9+
//! [`FromDataFusion`] - Converts a DataFusion type into a Vortex type infallible.
10+
//! [TryToDataFusion] - Fallibly converts a Vortex type to a DataFusion type.
11+
412
use vortex::error::VortexResult;
513

614
pub(crate) mod exprs;
715
mod scalars;
816
pub(crate) mod schema;
917
pub(crate) mod stats;
1018

19+
pub use exprs::DefaultExpressionConvertor;
20+
pub use exprs::ExpressionConvertor;
21+
1122
/// First-party trait for implementing conversion from DataFusion types to Vortex types.
12-
pub(crate) trait FromDataFusion<D: ?Sized>: Sized {
23+
pub trait FromDataFusion<D: ?Sized>: Sized {
24+
/// Convert to this Vortex type from the input DataFusion type.
1325
fn from_df(df: &D) -> Self;
1426
}
1527

16-
/// First-party trait for implementing conversion from Vortex to DataFusion types.
17-
pub(crate) trait TryToDataFusion<D> {
28+
/// First-party trait for implementing fallible conversions from Vortex to DataFusion types.
29+
pub trait TryToDataFusion<D> {
30+
/// Try to convert this Vortex type from the input DataFusion type.
1831
fn try_to_df(&self) -> VortexResult<D>;
1932
}

vortex-datafusion/src/convert/scalars.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,16 @@ impl TryToDataFusion<ScalarValue> for Scalar {
119119
.cloned()
120120
.map(|b| Vec::<u8>::from(b.into_inner())),
121121
),
122-
DType::List(..) => todo!("list scalar conversion"),
123-
DType::FixedSizeList(..) => todo!("fixed-size list scalar conversion"),
122+
dtype @ DType::List(..) => vortex_bail!(
123+
"cannot convert Vortex scalar dtype {dtype} to DataFusion ScalarValue: unsupported scalar type"
124+
),
125+
dtype @ DType::FixedSizeList(..) => vortex_bail!(
126+
"cannot convert Vortex scalar dtype {dtype} to DataFusion ScalarValue: unsupported scalar type"
127+
),
124128
DType::Struct(..) => struct_to_df(self)?,
125-
DType::Union(..) => todo!("union scalar conversion"),
129+
dtype @ DType::Union(..) => vortex_bail!(
130+
"cannot convert Vortex scalar dtype {dtype} to DataFusion ScalarValue: unsupported scalar type"
131+
),
126132
DType::Variant(_) => vortex_bail!("Variant scalars aren't supported with DF"),
127133
DType::Extension(ext) => {
128134
let storage_scalar = self.as_extension().to_storage_scalar();
@@ -809,4 +815,21 @@ mod tests {
809815
assert!(Scalar::from_df(&df).is_null());
810816
Ok(())
811817
}
818+
819+
#[rstest]
820+
#[case::list(Scalar::null(DType::List(
821+
Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)),
822+
Nullability::Nullable
823+
)))]
824+
#[case::fixed_size_list(Scalar::null(DType::FixedSizeList(
825+
Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)),
826+
2,
827+
Nullability::Nullable
828+
)))]
829+
#[case::union(Scalar::null(DType::Union(Nullability::Nullable)))]
830+
fn unsupported_vortex_scalars_return_errors(#[case] scalar: Scalar) {
831+
let err = scalar.try_to_df().unwrap_err();
832+
833+
assert!(err.to_string().contains("unsupported scalar type"), "{err}");
834+
}
812835
}

vortex-datafusion/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,13 @@ use std::fmt::Debug;
8888
use datafusion_common::stats::Precision as DFPrecision;
8989
use vortex::expr::stats::Precision;
9090

91-
mod convert;
91+
pub mod convert;
9292
mod persistent;
9393
pub mod v2;
9494

9595
#[cfg(test)]
9696
mod tests;
9797

98-
pub use convert::exprs::ExpressionConvertor;
9998
pub use persistent::*;
10099

101100
/// Extension trait to convert our [`Precision`] to DataFusion's

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub use access_plan::VortexAccessPlan;
3636
pub use format::VortexFormat;
3737
pub use format::VortexFormatFactory;
3838
pub use format::VortexTableOptions;
39+
pub use sink::VortexSink;
3940
pub use source::VortexSource;
4041

4142
#[cfg(test)]

vortex-datafusion/src/persistent/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
//! Factory for creating [`VortexReadAt`] instances from [`PartitionedFile`]s.
4+
//! Factory for creating [`VortexReadAt`] instances for [`PartitionedFile`]s.
55
66
use std::fmt::Debug;
77
use std::sync::Arc;

vortex-datafusion/src/persistent/sink.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ use vortex::io::VortexWrite;
3232
use vortex::io::object_store::ObjectStoreWrite;
3333
use vortex::session::VortexSession;
3434

35+
/// Implements [`DataSink`] for writing Vortex files.
3536
pub struct VortexSink {
3637
config: FileSinkConfig,
3738
schema: SchemaRef,
3839
session: VortexSession,
3940
}
4041

4142
impl VortexSink {
43+
/// Creates a new [`VortexSink`] instance.
4244
pub fn new(config: FileSinkConfig, schema: SchemaRef, session: VortexSession) -> Self {
4345
Self {
4446
config,

vortex-datafusion/src/persistent/source.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ impl VortexSource {
306306
self
307307
}
308308

309+
/// Returns the predicate this source is going to push down
310+
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
311+
self.vortex_predicate.as_ref()
312+
}
313+
309314
fn create_vortex_opener(
310315
&self,
311316
object_store: Arc<dyn ObjectStore>,

0 commit comments

Comments
 (0)