Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17350,6 +17350,90 @@ pub fn vortex_array::scalar_fn::EmptyOptions::hash<__H: core::hash::Hasher>(&sel

impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::EmptyOptions

pub struct vortex_array::scalar_fn::ForeignScalarFnOptions

impl vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::new(metadata: alloc::vec::Vec<u8>, arity: usize) -> Self

impl core::clone::Clone for vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::clone(&self) -> vortex_array::scalar_fn::ForeignScalarFnOptions

impl core::cmp::Eq for vortex_array::scalar_fn::ForeignScalarFnOptions

impl core::cmp::PartialEq for vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::eq(&self, other: &vortex_array::scalar_fn::ForeignScalarFnOptions) -> bool

impl core::fmt::Debug for vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnOptions::hash<__H: core::hash::Hasher>(&self, state: &mut __H)

impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::ForeignScalarFnOptions

pub struct vortex_array::scalar_fn::ForeignScalarFnVTable

impl vortex_array::scalar_fn::ForeignScalarFnVTable

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::make_scalar_fn(id: vortex_array::scalar_fn::ScalarFnId, metadata: alloc::vec::Vec<u8>, arity: usize) -> vortex_array::scalar_fn::ScalarFnRef

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::new(id: vortex_array::scalar_fn::ScalarFnId) -> Self

impl core::clone::Clone for vortex_array::scalar_fn::ForeignScalarFnVTable

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::clone(&self) -> vortex_array::scalar_fn::ForeignScalarFnVTable

impl core::fmt::Debug for vortex_array::scalar_fn::ForeignScalarFnVTable

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::ForeignScalarFnVTable

pub type vortex_array::scalar_fn::ForeignScalarFnVTable::Options = vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::arity(&self, options: &Self::Options) -> vortex_array::scalar_fn::Arity

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::child_name(&self, _options: &Self::Options, child_idx: usize) -> vortex_array::scalar_fn::ChildName

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::coerce_args(&self, options: &Self::Options, args: &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_array::dtype::DType>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::execute(&self, _options: &Self::Options, _args: &dyn vortex_array::scalar_fn::ExecutionArgs, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::fmt_sql(&self, _options: &Self::Options, expr: &vortex_array::expr::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::is_fallible(&self, options: &Self::Options) -> bool

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::is_null_sensitive(&self, options: &Self::Options) -> bool

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::reduce(&self, options: &Self::Options, node: &dyn vortex_array::scalar_fn::ReduceNode, ctx: &dyn vortex_array::scalar_fn::ReduceCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar_fn::ReduceNodeRef>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::return_dtype(&self, _options: &Self::Options, _args: &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::simplify(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, ctx: &dyn vortex_array::scalar_fn::SimplifyCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::simplify_untyped(&self, options: &Self::Options, expr: &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::stat_expression(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, stat: vortex_array::expr::stats::Stat, catalog: &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::stat_falsification(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, catalog: &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::validity(&self, options: &Self::Options, expression: &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub struct vortex_array::scalar_fn::ScalarFn<V: vortex_array::scalar_fn::ScalarFnVTable>

impl<V: vortex_array::scalar_fn::ScalarFnVTable> vortex_array::scalar_fn::ScalarFn<V>
Expand Down Expand Up @@ -17564,6 +17648,44 @@ pub fn vortex_array::scalar_fn::ScalarFnVTable::stat_falsification(&self, option

pub fn vortex_array::scalar_fn::ScalarFnVTable::validity(&self, options: &Self::Options, expression: &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::ForeignScalarFnVTable

pub type vortex_array::scalar_fn::ForeignScalarFnVTable::Options = vortex_array::scalar_fn::ForeignScalarFnOptions

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::arity(&self, options: &Self::Options) -> vortex_array::scalar_fn::Arity

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::child_name(&self, _options: &Self::Options, child_idx: usize) -> vortex_array::scalar_fn::ChildName

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::coerce_args(&self, options: &Self::Options, args: &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<alloc::vec::Vec<vortex_array::dtype::DType>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::execute(&self, _options: &Self::Options, _args: &dyn vortex_array::scalar_fn::ExecutionArgs, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::fmt_sql(&self, _options: &Self::Options, expr: &vortex_array::expr::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::is_fallible(&self, options: &Self::Options) -> bool

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::is_null_sensitive(&self, options: &Self::Options) -> bool

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::reduce(&self, options: &Self::Options, node: &dyn vortex_array::scalar_fn::ReduceNode, ctx: &dyn vortex_array::scalar_fn::ReduceCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar_fn::ReduceNodeRef>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::return_dtype(&self, _options: &Self::Options, _args: &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::simplify(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, ctx: &dyn vortex_array::scalar_fn::SimplifyCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::simplify_untyped(&self, options: &Self::Options, expr: &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::stat_expression(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, stat: vortex_array::expr::stats::Stat, catalog: &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::stat_falsification(&self, options: &Self::Options, expr: &vortex_array::expr::Expression, catalog: &dyn vortex_array::expr::pruning::StatsCatalog) -> core::option::Option<vortex_array::expr::Expression>

pub fn vortex_array::scalar_fn::ForeignScalarFnVTable::validity(&self, options: &Self::Options, expression: &vortex_array::expr::Expression) -> vortex_error::VortexResult<core::option::Option<vortex_array::expr::Expression>>

impl vortex_array::scalar_fn::ScalarFnVTable for vortex_array::scalar_fn::fns::between::Between

pub type vortex_array::scalar_fn::fns::between::Between::Options = vortex_array::scalar_fn::fns::between::BetweenOptions
Expand Down
122 changes: 122 additions & 0 deletions vortex-array/src/aggregate_fn/foreign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;

use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_session::VortexSession;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::aggregate_fn::AggregateFn;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::dtype::DType;
use crate::scalar::Scalar;

/// Options payload for a foreign aggregate function.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ForeignAggregateFnOptions {
metadata: Vec<u8>,
}

impl ForeignAggregateFnOptions {
pub fn new(metadata: Vec<u8>) -> Self {
Self { metadata }
}
}

impl Display for ForeignAggregateFnOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "foreign(metadata={}B)", self.metadata.len())
}
}

/// Aggregate-function placeholder used when deserializing an unknown aggregate function ID.
#[derive(Clone, Debug)]
pub struct ForeignAggregateFnVTable {
id: AggregateFnId,
}

impl ForeignAggregateFnVTable {
pub fn new(id: AggregateFnId) -> Self {
Self { id }
}
}

impl AggregateFnVTable for ForeignAggregateFnVTable {
type Options = ForeignAggregateFnOptions;
type Partial = ();

fn id(&self) -> AggregateFnId {
self.id.clone()
}

fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(options.metadata.clone()))
}

fn deserialize(
&self,
metadata: &[u8],
_session: &VortexSession,
) -> VortexResult<Self::Options> {
Ok(ForeignAggregateFnOptions::new(metadata.to_vec()))
}

fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
None
}

fn partial_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
None
}

fn empty_partial(
&self,
_options: &Self::Options,
_input_dtype: &DType,
) -> VortexResult<Self::Partial> {
vortex_bail!("Cannot execute unknown aggregate function '{}'", self.id)
}

fn combine_partials(&self, _partial: &mut Self::Partial, _other: Scalar) -> VortexResult<()> {
vortex_bail!("Cannot execute unknown aggregate function '{}'", self.id)
}

fn to_scalar(&self, _partial: &Self::Partial) -> VortexResult<Scalar> {
vortex_bail!("Cannot execute unknown aggregate function '{}'", self.id)
}

fn reset(&self, _partial: &mut Self::Partial) {}

fn is_saturated(&self, _state: &Self::Partial) -> bool {
false
}

fn accumulate(
&self,
_state: &mut Self::Partial,
_batch: &Columnar,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
vortex_bail!("Cannot execute unknown aggregate function '{}'", self.id)
}

fn finalize(&self, _states: ArrayRef) -> VortexResult<ArrayRef> {
vortex_bail!("Cannot execute unknown aggregate function '{}'", self.id)
}
}

pub fn new_foreign_aggregate_fn(id: AggregateFnId, metadata: Vec<u8>) -> AggregateFnRef {
AggregateFn::new(
ForeignAggregateFnVTable::new(id),
ForeignAggregateFnOptions::new(metadata),
)
.erased()
}
3 changes: 3 additions & 0 deletions vortex-array/src/aggregate_fn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub use vtable::*;
mod plugin;
pub use plugin::*;

mod foreign;
pub(crate) use foreign::*;

mod typed;
pub use typed::*;

Expand Down
33 changes: 27 additions & 6 deletions vortex-array/src/aggregate_fn/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vortex_session::VortexSession;

use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::new_foreign_aggregate_fn;
use crate::aggregate_fn::session::AggregateFnSessionExt;

impl AggregateFnRef {
Expand All @@ -38,12 +39,13 @@ impl AggregateFnRef {
/// Note: the serialization format is not stable and may change between versions.
pub fn from_proto(proto: &pb::AggregateFn, session: &VortexSession) -> VortexResult<Self> {
let agg_fn_id: AggregateFnId = ArcRef::new_arc(Arc::from(proto.id.as_str()));
let plugin = session
.aggregate_fns()
.registry()
.find(&agg_fn_id)
.ok_or_else(|| vortex_err!("unknown aggregate function id: {}", proto.id))?;
let agg_fn = plugin.deserialize(proto.metadata(), session)?;
let agg_fn = if let Some(plugin) = session.aggregate_fns().registry().find(&agg_fn_id) {
plugin.deserialize(proto.metadata(), session)?
} else if session.allows_unknown() {
new_foreign_aggregate_fn(agg_fn_id.clone(), proto.metadata().to_vec())
} else {
return Err(vortex_err!("unknown aggregate function id: {}", proto.id));
};

if agg_fn.id() != agg_fn_id {
vortex_bail!(
Expand Down Expand Up @@ -164,4 +166,23 @@ mod tests {

assert_eq!(deserialized, agg_fn);
}

#[test]
fn unknown_aggregate_fn_id_allow_unknown() {
let session = VortexSession::empty()
.with::<AggregateFnSession>()
.allow_unknown();

let proto = pb::AggregateFn {
id: "vortex.test.foreign_aggregate".to_string(),
metadata: Some(vec![7, 8, 9]),
};

let agg_fn = AggregateFnRef::from_proto(&proto, &session).unwrap();
assert_eq!(agg_fn.id().as_ref(), "vortex.test.foreign_aggregate");

let roundtrip = agg_fn.serialize_proto().unwrap();
assert_eq!(roundtrip.id, proto.id);
assert_eq!(roundtrip.metadata(), proto.metadata());
}
}
Loading
Loading