Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self,

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>

Expand All @@ -240,7 +240,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::reset(&self, partia

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

Expand Down Expand Up @@ -900,7 +900,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self,

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>

Expand All @@ -918,7 +918,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::reset(&self, partia

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/count/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ impl AggregateFnVTable for Count {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("Count is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/first/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,7 @@ impl AggregateFnVTable for First {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("First is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/is_constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,7 @@ impl AggregateFnVTable for IsConstant {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("IsConstant is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
24 changes: 2 additions & 22 deletions vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::fmt::Formatter;

use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;

use self::bool::check_bool_sorted;
use self::decimal::check_decimal_sorted;
Expand Down Expand Up @@ -231,27 +230,8 @@ impl AggregateFnVTable for IsSorted {
AggregateFnId::new_ref("vortex.is_sorted")
}

fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![u8::from(options.strict)]))
}

fn deserialize(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

&self,
metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
let &[strict_byte] = metadata else {
vortex_bail!(
"IsSorted: expected 1 byte of metadata, got {}",
metadata.len()
);
};
let strict = match strict_byte {
0 => false,
1 => true,
_ => vortex_bail!("IsSorted: expected 0 or 1 for strict, got {}", strict_byte),
};
Ok(IsSortedOptions { strict })
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
unimplemented!("IsSorted is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/last/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,7 @@ impl AggregateFnVTable for Last {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("Last is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/min_max/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,7 @@ impl AggregateFnVTable for MinMax {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("MinMax is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/nan_count/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,7 @@ impl AggregateFnVTable for NanCount {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("NanCount is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
10 changes: 1 addition & 9 deletions vortex-array/src/aggregate_fn/fns/sum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,7 @@ impl AggregateFnVTable for Sum {
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &vortex_session::VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
unimplemented!("Sum is not yet serializable");
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Expand Down
86 changes: 83 additions & 3 deletions vortex-array/src/aggregate_fn/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,102 @@ impl AggregateFnRef {
#[cfg(test)]
mod tests {
use prost::Message;
use vortex_error::VortexResult;
use vortex_error::vortex_panic;
use vortex_proto::expr as pb;
use vortex_session::VortexSession;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::AggregateFnVTableExt;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::sum::Sum;
use crate::aggregate_fn::session::AggregateFnSession;
use crate::aggregate_fn::session::AggregateFnSessionExt;
use crate::dtype::DType;
use crate::scalar::Scalar;

/// A minimal serializable aggregate function used solely to exercise the serde round-trip.
#[derive(Clone, Debug)]
struct TestAgg;

impl AggregateFnVTable for TestAgg {
type Options = EmptyOptions;
type Partial = ();

fn id(&self) -> AggregateFnId {
AggregateFnId::new_ref("vortex.test.proto")
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(vec![]))
}

fn deserialize(
&self,
_metadata: &[u8],
_session: &VortexSession,
) -> VortexResult<Self::Options> {
Ok(EmptyOptions)
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
Some(input_dtype.clone())
}

fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
self.return_dtype(options, input_dtype)
}

fn empty_partial(
&self,
_options: &Self::Options,
_input_dtype: &DType,
) -> VortexResult<Self::Partial> {
Ok(())
}

fn combine_partials(
&self,
_partial: &mut Self::Partial,
_other: Scalar,
) -> VortexResult<()> {
Ok(())
}

fn to_scalar(&self, _partial: &Self::Partial) -> VortexResult<Scalar> {
vortex_panic!("TestAgg is for serde tests only");
}

fn reset(&self, _partial: &mut Self::Partial) {}

fn is_saturated(&self, _partial: &Self::Partial) -> bool {
true
}

fn accumulate(
&self,
_state: &mut Self::Partial,
_batch: &Columnar,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
Ok(())
}

fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
Ok(partials)
}
}

#[test]
fn aggregate_fn_serde() {
let session = VortexSession::empty().with::<AggregateFnSession>();
session.aggregate_fns().register(Sum);
session.aggregate_fns().register(TestAgg);

let agg_fn = Sum.bind(EmptyOptions);
let agg_fn = TestAgg.bind(EmptyOptions);

let serialized = agg_fn.serialize_proto().unwrap();
let buf = serialized.encode_to_vec();
Expand Down
Loading