Skip to content

Commit 9dcb75e

Browse files
authored
block serialization for the existing aggregations (#7322)
Signed-off-by: blaginin <github@blaginin.me>
1 parent b8f8bd5 commit 9dcb75e

10 files changed

Lines changed: 96 additions & 92 deletions

File tree

vortex-array/public-api.lock

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self,
222222

223223
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
224224

225-
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
225+
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
226226

227227
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
228228

@@ -240,7 +240,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::reset(&self, partia
240240

241241
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
242242

243-
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
243+
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
244244

245245
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
246246

@@ -900,7 +900,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::coerce_args(&self,
900900

901901
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::combine_partials(&self, partial: &mut Self::Partial, other: vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
902902

903-
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
903+
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
904904

905905
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::empty_partial(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
906906

@@ -918,7 +918,7 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::reset(&self, partia
918918

919919
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::return_dtype(&self, _options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
920920

921-
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
921+
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::serialize(&self, _options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
922922

923923
pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, partial: &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
924924

vortex-array/src/aggregate_fn/fns/count/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,7 @@ impl AggregateFnVTable for Count {
3131
}
3232

3333
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
34-
Ok(Some(vec![]))
35-
}
36-
37-
fn deserialize(
38-
&self,
39-
_metadata: &[u8],
40-
_session: &vortex_session::VortexSession,
41-
) -> VortexResult<Self::Options> {
42-
Ok(EmptyOptions)
34+
unimplemented!("Count is not yet serializable");
4335
}
4436

4537
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/first/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,7 @@ impl AggregateFnVTable for First {
4444
}
4545

4646
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47-
Ok(Some(vec![]))
48-
}
49-
50-
fn deserialize(
51-
&self,
52-
_metadata: &[u8],
53-
_session: &vortex_session::VortexSession,
54-
) -> VortexResult<Self::Options> {
55-
Ok(EmptyOptions)
47+
unimplemented!("First is not yet serializable");
5648
}
5749

5850
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/is_constant/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,15 +260,7 @@ impl AggregateFnVTable for IsConstant {
260260
}
261261

262262
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
263-
Ok(Some(vec![]))
264-
}
265-
266-
fn deserialize(
267-
&self,
268-
_metadata: &[u8],
269-
_session: &vortex_session::VortexSession,
270-
) -> VortexResult<Self::Options> {
271-
Ok(EmptyOptions)
263+
unimplemented!("IsConstant is not yet serializable");
272264
}
273265

274266
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/is_sorted/mod.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use std::fmt::Formatter;
1313

1414
use vortex_error::VortexExpect;
1515
use vortex_error::VortexResult;
16-
use vortex_error::vortex_bail;
1716

1817
use self::bool::check_bool_sorted;
1918
use self::decimal::check_decimal_sorted;
@@ -231,27 +230,8 @@ impl AggregateFnVTable for IsSorted {
231230
AggregateFnId::new_ref("vortex.is_sorted")
232231
}
233232

234-
fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
235-
Ok(Some(vec![u8::from(options.strict)]))
236-
}
237-
238-
fn deserialize(
239-
&self,
240-
metadata: &[u8],
241-
_session: &vortex_session::VortexSession,
242-
) -> VortexResult<Self::Options> {
243-
let &[strict_byte] = metadata else {
244-
vortex_bail!(
245-
"IsSorted: expected 1 byte of metadata, got {}",
246-
metadata.len()
247-
);
248-
};
249-
let strict = match strict_byte {
250-
0 => false,
251-
1 => true,
252-
_ => vortex_bail!("IsSorted: expected 0 or 1 for strict, got {}", strict_byte),
253-
};
254-
Ok(IsSortedOptions { strict })
233+
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
234+
unimplemented!("IsSorted is not yet serializable");
255235
}
256236

257237
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/last/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,7 @@ impl AggregateFnVTable for Last {
4444
}
4545

4646
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
47-
Ok(Some(vec![]))
48-
}
49-
50-
fn deserialize(
51-
&self,
52-
_metadata: &[u8],
53-
_session: &vortex_session::VortexSession,
54-
) -> VortexResult<Self::Options> {
55-
Ok(EmptyOptions)
47+
unimplemented!("Last is not yet serializable");
5648
}
5749

5850
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/min_max/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,15 +177,7 @@ impl AggregateFnVTable for MinMax {
177177
}
178178

179179
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180-
Ok(Some(vec![]))
181-
}
182-
183-
fn deserialize(
184-
&self,
185-
_metadata: &[u8],
186-
_session: &vortex_session::VortexSession,
187-
) -> VortexResult<Self::Options> {
188-
Ok(EmptyOptions)
180+
unimplemented!("MinMax is not yet serializable");
189181
}
190182

191183
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/nan_count/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,7 @@ impl AggregateFnVTable for NanCount {
8585
}
8686

8787
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
88-
Ok(Some(vec![]))
89-
}
90-
91-
fn deserialize(
92-
&self,
93-
_metadata: &[u8],
94-
_session: &vortex_session::VortexSession,
95-
) -> VortexResult<Self::Options> {
96-
Ok(EmptyOptions)
88+
unimplemented!("NanCount is not yet serializable");
9789
}
9890

9991
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/fns/sum/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,7 @@ impl AggregateFnVTable for Sum {
7575
}
7676

7777
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
78-
Ok(Some(vec![]))
79-
}
80-
81-
fn deserialize(
82-
&self,
83-
_metadata: &[u8],
84-
_session: &vortex_session::VortexSession,
85-
) -> VortexResult<Self::Options> {
86-
Ok(EmptyOptions)
78+
unimplemented!("Sum is not yet serializable");
8779
}
8880

8981
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {

vortex-array/src/aggregate_fn/proto.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,102 @@ impl AggregateFnRef {
6060
#[cfg(test)]
6161
mod tests {
6262
use prost::Message;
63+
use vortex_error::VortexResult;
64+
use vortex_error::vortex_panic;
6365
use vortex_proto::expr as pb;
6466
use vortex_session::VortexSession;
6567

68+
use crate::ArrayRef;
69+
use crate::Columnar;
70+
use crate::ExecutionCtx;
71+
use crate::aggregate_fn::AggregateFnId;
6672
use crate::aggregate_fn::AggregateFnRef;
73+
use crate::aggregate_fn::AggregateFnVTable;
6774
use crate::aggregate_fn::AggregateFnVTableExt;
6875
use crate::aggregate_fn::EmptyOptions;
69-
use crate::aggregate_fn::fns::sum::Sum;
7076
use crate::aggregate_fn::session::AggregateFnSession;
7177
use crate::aggregate_fn::session::AggregateFnSessionExt;
78+
use crate::dtype::DType;
79+
use crate::scalar::Scalar;
80+
81+
/// A minimal serializable aggregate function used solely to exercise the serde round-trip.
82+
#[derive(Clone, Debug)]
83+
struct TestAgg;
84+
85+
impl AggregateFnVTable for TestAgg {
86+
type Options = EmptyOptions;
87+
type Partial = ();
88+
89+
fn id(&self) -> AggregateFnId {
90+
AggregateFnId::new_ref("vortex.test.proto")
91+
}
92+
93+
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
94+
Ok(Some(vec![]))
95+
}
96+
97+
fn deserialize(
98+
&self,
99+
_metadata: &[u8],
100+
_session: &VortexSession,
101+
) -> VortexResult<Self::Options> {
102+
Ok(EmptyOptions)
103+
}
104+
105+
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
106+
Some(input_dtype.clone())
107+
}
108+
109+
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
110+
self.return_dtype(options, input_dtype)
111+
}
112+
113+
fn empty_partial(
114+
&self,
115+
_options: &Self::Options,
116+
_input_dtype: &DType,
117+
) -> VortexResult<Self::Partial> {
118+
Ok(())
119+
}
120+
121+
fn combine_partials(
122+
&self,
123+
_partial: &mut Self::Partial,
124+
_other: Scalar,
125+
) -> VortexResult<()> {
126+
Ok(())
127+
}
128+
129+
fn to_scalar(&self, _partial: &Self::Partial) -> VortexResult<Scalar> {
130+
vortex_panic!("TestAgg is for serde tests only");
131+
}
132+
133+
fn reset(&self, _partial: &mut Self::Partial) {}
134+
135+
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
136+
true
137+
}
138+
139+
fn accumulate(
140+
&self,
141+
_state: &mut Self::Partial,
142+
_batch: &Columnar,
143+
_ctx: &mut ExecutionCtx,
144+
) -> VortexResult<()> {
145+
Ok(())
146+
}
147+
148+
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
149+
Ok(partials)
150+
}
151+
}
72152

73153
#[test]
74154
fn aggregate_fn_serde() {
75155
let session = VortexSession::empty().with::<AggregateFnSession>();
76-
session.aggregate_fns().register(Sum);
156+
session.aggregate_fns().register(TestAgg);
77157

78-
let agg_fn = Sum.bind(EmptyOptions);
158+
let agg_fn = TestAgg.bind(EmptyOptions);
79159

80160
let serialized = agg_fn.serialize_proto().unwrap();
81161
let buf = serialized.encode_to_vec();

0 commit comments

Comments
 (0)