-
Notifications
You must be signed in to change notification settings - Fork 174
Expand file tree
/
Copy pathvtable.rs
More file actions
143 lines (123 loc) · 5.35 KB
/
Copy pathvtable.rs
File metadata and controls
143 lines (123 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::fmt;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_session::VortexSession;
use crate::ArrayRef;
use crate::Columnar;
use crate::DynArray;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::AggregateFn;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::arrays::ConstantArray;
use crate::dtype::DType;
use crate::scalar::Scalar;
/// Defines the interface for aggregate function vtables.
///
/// This trait is not object-safe (it requires `Sized`), which allows the implementer to make
/// use of associated types for improved type safety, while allowing Vortex to enforce runtime
/// checks on the inputs and outputs of each function.
///
/// The [`AggregateFnVTable`] trait should be implemented for a struct that holds global data across
/// all instances of the aggregate. In almost all cases, this struct will be an empty unit
/// struct, since most aggregates do not require any global state.
///
/// The per-group accumulator state is [`Self::Partial`]. It starts from
/// [`Self::empty_partial`], absorbs input batches via [`Self::accumulate`] and partial scalars
/// via [`Self::combine_partials`], and is emitted as a scalar by [`Self::flush`]. Partial
/// states (with dtype [`Self::partial_dtype`]) are turned into final results (with dtype
/// [`Self::return_dtype`]) by [`Self::finalize`] / [`Self::finalize_scalar`].
pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync {
    /// Options for this aggregate function.
    type Options: 'static + Send + Sync + Clone + Debug + Display + PartialEq + Eq + Hash;

    /// The partial accumulator state for a single group.
    type Partial: 'static + Send;

    /// Returns the ID of the aggregate function vtable.
    fn id(&self) -> AggregateFnId;

    /// Serialize the options for this aggregate function.
    ///
    /// Should return `Ok(None)` if the function is not serializable, and `Ok(vec![])` if it is
    /// serializable but has no metadata.
    ///
    /// The default implementation returns `Ok(None)`, i.e. the function is not serializable.
    fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
        let _ = options;
        Ok(None)
    }

    /// Deserialize the options of this aggregate function.
    ///
    /// # Errors
    ///
    /// The default implementation always returns an error stating that the function is not
    /// deserializable. Implementations that make [`Self::serialize`] return `Some` should
    /// override this method as well.
    fn deserialize(
        &self,
        _metadata: &[u8],
        _session: &VortexSession,
    ) -> VortexResult<Self::Options> {
        vortex_bail!("Aggregate function {} is not deserializable", self.id());
    }

    /// Coerce the input type for this aggregate function.
    ///
    /// This is optionally used by Vortex users when performing type coercion over a Vortex
    /// expression. The default implementation returns the input type unchanged.
    fn coerce_args(&self, options: &Self::Options, input_dtype: &DType) -> VortexResult<DType> {
        let _ = options;
        Ok(input_dtype.clone())
    }

    /// The return [`DType`] of the aggregate for the given options and input dtype.
    fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> VortexResult<DType>;

    /// DType of the intermediate partial accumulator state.
    ///
    /// Use a struct dtype when multiple fields are needed
    /// (e.g., Mean: `Struct { sum: f64, count: u64 }`).
    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> VortexResult<DType>;

    /// Return the partial accumulator state for an empty group.
    fn empty_partial(
        &self,
        options: &Self::Options,
        input_dtype: &DType,
    ) -> VortexResult<Self::Partial>;

    /// Combine a partial state scalar `other` into the accumulator `partial`.
    ///
    /// `other` is expected to have the dtype given by [`Self::partial_dtype`] (for example, a
    /// scalar produced by [`Self::flush`]).
    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()>;

    /// Flush the partial aggregate for the given accumulator state.
    ///
    /// The returned scalar must have the same DType as specified by [`Self::partial_dtype`] for
    /// the options and input dtype used to construct the state.
    ///
    /// The internal state of the accumulator is reset to the empty state after flushing.
    fn flush(&self, partial: &mut Self::Partial) -> VortexResult<Scalar>;

    /// Whether the partial accumulator state is "saturated", i.e. whether it has reached a
    /// state where the final result is fully determined.
    fn is_saturated(&self, state: &Self::Partial) -> bool;

    /// Accumulate a batch of input, given as a [`Columnar`] value, into the accumulator state.
    fn accumulate(
        &self,
        state: &mut Self::Partial,
        batch: &Columnar,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<()>;

    /// Finalize an array of accumulator states into an array of aggregate results.
    ///
    /// The provided `states` array has dtype as specified by [`Self::partial_dtype`]; the
    /// result array must have dtype as specified by [`Self::return_dtype`].
    fn finalize(&self, states: ArrayRef) -> VortexResult<ArrayRef>;

    /// Finalize a scalar accumulator state into an aggregate result.
    ///
    /// The provided `state` has dtype as specified by [`Self::partial_dtype`]; the result
    /// scalar must have dtype as specified by [`Self::return_dtype`].
    ///
    /// The default implementation wraps `state` in a length-1 [`ConstantArray`], passes it to
    /// [`Self::finalize`], and returns the element at index 0 of the result.
    fn finalize_scalar(&self, state: Scalar) -> VortexResult<Scalar> {
        let array = ConstantArray::new(state, 1).into_array();
        let result = self.finalize(array)?;
        result.scalar_at(0)
    }
}
/// A unit options type for aggregate functions that have nothing to configure.
///
/// It derives every bound required of [`AggregateFnVTable::Options`], carries no data, and
/// renders as the empty string through [`Display`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EmptyOptions;

impl Display for EmptyOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // There are no options to show, so emit nothing (no padding is applied).
        f.write_str("")
    }
}
/// Factory helpers available on every [`AggregateFnVTable`].
///
/// A blanket implementation covers all vtables, so no vtable has to implement this trait
/// itself.
pub trait AggregateFnVTableExt: AggregateFnVTable {
    /// Bind this vtable with the given options and erase it into an [`AggregateFnRef`].
    ///
    /// The vtable is cloned into the newly created [`AggregateFn`].
    fn bind(&self, options: Self::Options) -> AggregateFnRef {
        let bound = AggregateFn::new(self.clone(), options);
        bound.erased()
    }
}

impl<V> AggregateFnVTableExt for V
where
    V: AggregateFnVTable,
{
}