Skip to content

Commit d9be2f8

Browse files
gatesndimitarvdimitrov
authored andcommitted
MinMax AggregateFn (#7006)
Port min-max to an AggregateFn. Note that we could push-down over dictionary arrays with the given property (similarly for RunEnd): ```python def is_support_invariant(agg_func) -> bool: """ Returns True if this aggregate function depends only on the set of distinct values in its input, not on how many times each appears. Formally, for any multiset S: agg(S) == agg(set(S)) This is the property that allows the aggregate to be evaluated purely over dictionary values, skipping the codes column entirely. Since a dictionary already contains exactly the distinct values present, no codes scan is needed. Examples: is_support_invariant(min) # True is_support_invariant(max) # True is_support_invariant(count_distinct) # True is_support_invariant(bool_or) # True is_support_invariant(bool_and) # True is_support_invariant(any_value) # True is_support_invariant(median) # False — needs frequencies is_support_invariant(mode) # False — entirely about frequencies is_support_invariant(sum) # False is_support_invariant(count) # False ``` --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 0fa36ae commit d9be2f8

58 files changed

Lines changed: 1222 additions & 1342 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

encodings/runend/public-api.lock

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ pub fn vortex_runend::RunEnd::is_sorted(&self, array: &vortex_runend::RunEndArra
4444

4545
pub fn vortex_runend::RunEnd::is_strict_sorted(&self, array: &vortex_runend::RunEndArray) -> vortex_error::VortexResult<core::option::Option<bool>>
4646

47-
impl vortex_array::compute::min_max::MinMaxKernel for vortex_runend::RunEnd
48-
49-
pub fn vortex_runend::RunEnd::min_max(&self, array: &vortex_runend::RunEndArray) -> vortex_error::VortexResult<core::option::Option<vortex_array::compute::min_max::MinMaxResult>>
50-
5147
impl vortex_array::scalar_fn::fns::binary::compare::CompareKernel for vortex_runend::RunEnd
5248

5349
pub fn vortex_runend::RunEnd::compare(lhs: &vortex_runend::RunEndArray, rhs: &vortex_array::array::ArrayRef, operator: vortex_array::scalar_fn::fns::operators::CompareOperator, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

encodings/runend/src/compute/min_max.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,47 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_array::compute::MinMaxKernel;
5-
use vortex_array::compute::MinMaxKernelAdapter;
6-
use vortex_array::compute::MinMaxResult;
7-
use vortex_array::compute::min_max;
8-
use vortex_array::register_kernel;
4+
use vortex_array::ArrayRef;
5+
use vortex_array::ExecutionCtx;
6+
use vortex_array::aggregate_fn::AggregateFnRef;
7+
use vortex_array::aggregate_fn::fns::min_max::MinMax;
8+
use vortex_array::aggregate_fn::fns::min_max::make_minmax_dtype;
9+
use vortex_array::aggregate_fn::fns::min_max::min_max;
10+
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
11+
use vortex_array::scalar::Scalar;
912
use vortex_error::VortexResult;
1013

1114
use crate::RunEnd;
12-
use crate::RunEndArray;
1315

14-
impl MinMaxKernel for RunEnd {
15-
fn min_max(&self, array: &RunEndArray) -> VortexResult<Option<MinMaxResult>> {
16-
min_max(array.values())
16+
/// RunEnd-specific min/max kernel.
17+
///
18+
/// Run-end encoded arrays store each unique run value once, so min/max can be computed directly
19+
/// on the values array without decoding.
20+
#[derive(Debug)]
21+
pub(crate) struct RunEndMinMaxKernel;
22+
23+
impl DynAggregateKernel for RunEndMinMaxKernel {
24+
fn aggregate(
25+
&self,
26+
aggregate_fn: &AggregateFnRef,
27+
batch: &ArrayRef,
28+
ctx: &mut ExecutionCtx,
29+
) -> VortexResult<Option<Scalar>> {
30+
if !aggregate_fn.is::<MinMax>() {
31+
return Ok(None);
32+
}
33+
34+
let Some(run_end) = batch.as_opt::<RunEnd>() else {
35+
return Ok(None);
36+
};
37+
38+
let struct_dtype = make_minmax_dtype(batch.dtype());
39+
match min_max(run_end.values(), ctx)? {
40+
Some(result) => Ok(Some(Scalar::struct_(
41+
struct_dtype,
42+
vec![result.min, result.max],
43+
))),
44+
None => Ok(Some(Scalar::null(struct_dtype))),
45+
}
1746
}
1847
}
19-
20-
register_kernel!(MinMaxKernelAdapter(RunEnd).lift());

encodings/runend/src/compute/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod fill_null;
77
pub(crate) mod filter;
88
mod is_constant;
99
mod is_sorted;
10-
mod min_max;
10+
pub(crate) mod min_max;
1111
pub(crate) mod take;
1212
pub(crate) mod take_from;
1313

encodings/runend/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,22 @@ pub mod _benchmarking {
2626
use super::*;
2727
}
2828

29+
use vortex_array::aggregate_fn::AggregateFnVTable;
30+
use vortex_array::aggregate_fn::fns::min_max::MinMax;
31+
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
2932
use vortex_array::session::ArraySessionExt;
3033
use vortex_session::VortexSession;
3134

3235
/// Initialize run-end encoding in the given session.
3336
pub fn initialize(session: &mut VortexSession) {
3437
session.arrays().register(RunEnd::ID, RunEnd);
38+
39+
// Register the RunEnd-specific min/max aggregate kernel.
40+
session.aggregate_fns().register_aggregate_kernel(
41+
RunEnd::ID,
42+
Some(MinMax.id()),
43+
&compute::min_max::RunEndMinMaxKernel,
44+
);
3545
}
3646

3747
#[cfg(test)]

encodings/sequence/public-api.lock

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ pub fn vortex_sequence::Sequence::is_sorted(&self, array: &vortex_sequence::Sequ
2828

2929
pub fn vortex_sequence::Sequence::is_strict_sorted(&self, array: &vortex_sequence::SequenceArray) -> vortex_error::VortexResult<core::option::Option<bool>>
3030

31-
impl vortex_array::compute::min_max::MinMaxKernel for vortex_sequence::Sequence
32-
33-
pub fn vortex_sequence::Sequence::min_max(&self, array: &vortex_sequence::SequenceArray) -> vortex_error::VortexResult<core::option::Option<vortex_array::compute::min_max::MinMaxResult>>
34-
3531
impl vortex_array::scalar_fn::fns::binary::compare::CompareKernel for vortex_sequence::Sequence
3632

3733
pub fn vortex_sequence::Sequence::compare(lhs: &vortex_sequence::SequenceArray, rhs: &vortex_array::array::ArrayRef, operator: vortex_array::scalar_fn::fns::operators::CompareOperator, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>
@@ -162,4 +158,6 @@ pub vortex_sequence::SequenceArrayParts::nullability: vortex_array::dtype::nulla
162158

163159
pub vortex_sequence::SequenceArrayParts::ptype: vortex_array::dtype::ptype::PType
164160

161+
pub fn vortex_sequence::initialize(session: &mut vortex_session::VortexSession)
162+
165163
pub fn vortex_sequence::sequence_encode(primitive_array: &vortex_array::arrays::primitive::array::PrimitiveArray) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

encodings/sequence/src/compute/min_max.rs

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,80 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_array::compute::MinMaxKernel;
5-
use vortex_array::compute::MinMaxKernelAdapter;
6-
use vortex_array::compute::MinMaxResult;
7-
use vortex_array::dtype::Nullability::NonNullable;
8-
use vortex_array::register_kernel;
4+
use vortex_array::ArrayRef;
5+
use vortex_array::ExecutionCtx;
6+
use vortex_array::aggregate_fn::AggregateFnRef;
7+
use vortex_array::aggregate_fn::fns::min_max::MinMax;
8+
use vortex_array::aggregate_fn::fns::min_max::make_minmax_dtype;
9+
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
10+
use vortex_array::dtype::DType;
11+
use vortex_array::dtype::Nullability;
12+
use vortex_array::match_each_pvalue;
13+
use vortex_array::scalar::PValue;
914
use vortex_array::scalar::Scalar;
15+
use vortex_array::scalar::ScalarValue;
1016
use vortex_error::VortexResult;
1117

12-
use crate::SequenceArray;
13-
use crate::array::Sequence;
14-
15-
impl MinMaxKernel for Sequence {
16-
fn min_max(&self, array: &SequenceArray) -> VortexResult<Option<MinMaxResult>> {
17-
let base = array.base();
18-
let last = array.last();
19-
let (min, max) = if base < last {
20-
(base, last)
21-
} else {
22-
(last, base)
18+
use crate::Sequence;
19+
20+
/// Sequence-specific min/max kernel.
21+
///
22+
/// A sequence array represents `A[i] = base + i * multiplier`, so min/max can be computed
23+
/// algebraically from `base` and `last` based on the sign of the multiplier.
24+
#[derive(Debug)]
25+
pub(crate) struct SequenceMinMaxKernel;
26+
27+
impl DynAggregateKernel for SequenceMinMaxKernel {
28+
fn aggregate(
29+
&self,
30+
aggregate_fn: &AggregateFnRef,
31+
batch: &ArrayRef,
32+
_ctx: &mut ExecutionCtx,
33+
) -> VortexResult<Option<Scalar>> {
34+
if !aggregate_fn.is::<MinMax>() {
35+
return Ok(None);
36+
}
37+
38+
let Some(seq) = batch.as_opt::<Sequence>() else {
39+
return Ok(None);
2340
};
24-
Ok(Some(MinMaxResult {
25-
min: Scalar::primitive_value(min, array.ptype(), NonNullable),
26-
max: Scalar::primitive_value(max, array.ptype(), NonNullable),
27-
}))
41+
42+
let struct_dtype = make_minmax_dtype(batch.dtype());
43+
44+
// Empty sequences shouldn't exist (try_new validates length), but handle gracefully.
45+
if seq.is_empty() {
46+
return Ok(Some(Scalar::null(struct_dtype)));
47+
}
48+
49+
let base = seq.base();
50+
let last = seq.last();
51+
52+
// Determine min and max based on multiplier direction.
53+
// For unsigned types, multiplier is always >= 0.
54+
let (min_pvalue, max_pvalue) = match_each_pvalue!(
55+
seq.multiplier(),
56+
uint: |_v| { (base, last) },
57+
int: |v| {
58+
if v >= 0 {
59+
(base, last)
60+
} else {
61+
(last, base)
62+
}
63+
},
64+
float: |_v| { unreachable!("float multiplier not supported for SequenceArray") }
65+
);
66+
67+
let non_nullable_dtype = DType::Primitive(seq.ptype(), Nullability::NonNullable);
68+
let min_scalar = Scalar::try_new(
69+
non_nullable_dtype.clone(),
70+
Some(ScalarValue::Primitive(min_pvalue)),
71+
)?;
72+
let max_scalar =
73+
Scalar::try_new(non_nullable_dtype, Some(ScalarValue::Primitive(max_pvalue)))?;
74+
75+
Ok(Some(Scalar::struct_(
76+
struct_dtype,
77+
vec![min_scalar, max_scalar],
78+
)))
2879
}
2980
}
30-
31-
register_kernel!(MinMaxKernelAdapter(Sequence).lift());

encodings/sequence/src/compute/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub(crate) mod compare;
66
mod filter;
77
mod is_sorted;
88
mod list_contains;
9-
mod min_max;
9+
pub(crate) mod min_max;
1010
mod slice;
1111
mod take;
1212

encodings/sequence/src/lib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,23 @@ pub use array::Sequence;
1515
pub use array::SequenceArray;
1616
pub use array::SequenceArrayParts;
1717
pub use compress::sequence_encode;
18+
use vortex_array::aggregate_fn::AggregateFnVTable;
19+
use vortex_array::aggregate_fn::fns::min_max::MinMax;
20+
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
21+
use vortex_array::session::ArraySessionExt;
22+
use vortex_session::VortexSession;
23+
24+
/// Initialize sequence encoding in the given session.
25+
pub fn initialize(session: &mut VortexSession) {
26+
session.arrays().register(Sequence::ID, Sequence);
27+
28+
// Register the Sequence-specific min/max aggregate kernel.
29+
session.aggregate_fns().register_aggregate_kernel(
30+
Sequence::ID,
31+
Some(MinMax.id()),
32+
&compute::min_max::SequenceMinMaxKernel,
33+
);
34+
}
1835

1936
// TODO(joe): hook up to the compressor
2037
// TODO(joe): support comparisons with other operators

fuzz/src/array/min_max.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_array::Canonical;
5+
use vortex_array::ExecutionCtx;
56
use vortex_array::IntoArray as _;
6-
use vortex_array::compute::MinMaxResult;
7-
use vortex_array::compute::min_max;
7+
use vortex_array::aggregate_fn::fns::min_max::MinMaxResult;
8+
use vortex_array::aggregate_fn::fns::min_max::min_max;
89
use vortex_error::VortexResult;
910

1011
/// Compute min_max on the canonical form of the array to get a consistent baseline.
11-
pub fn min_max_canonical_array(canonical: Canonical) -> VortexResult<Option<MinMaxResult>> {
12-
// TODO(joe): replace with baseline not using canonical
13-
min_max(&canonical.into_array())
12+
pub fn min_max_canonical_array(
13+
canonical: Canonical,
14+
ctx: &mut ExecutionCtx,
15+
) -> VortexResult<Option<MinMaxResult>> {
16+
min_max(&canonical.into_array(), ctx)
1417
}

fuzz/src/array/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ use vortex_array::ArrayRef;
4343
use vortex_array::DynArray;
4444
use vortex_array::IntoArray;
4545
use vortex_array::VortexSessionExecute;
46+
use vortex_array::aggregate_fn::fns::min_max::MinMaxResult;
47+
use vortex_array::aggregate_fn::fns::min_max::min_max;
4648
use vortex_array::aggregate_fn::fns::sum::sum;
4749
use vortex_array::arrays::ConstantArray;
4850
use vortex_array::arrays::PrimitiveArray;
4951
use vortex_array::arrays::arbitrary::ArbitraryArray;
5052
use vortex_array::builtins::ArrayBuiltins;
51-
use vortex_array::compute::MinMaxResult;
52-
use vortex_array::compute::min_max;
5353
use vortex_array::dtype::DType;
5454
use vortex_array::dtype::Nullability;
5555
use vortex_array::scalar::Scalar;
@@ -345,6 +345,7 @@ impl<'a> Arbitrary<'a> for FuzzArrayAction {
345345
current_array
346346
.to_canonical()
347347
.vortex_expect("to_canonical should succeed in fuzz test"),
348+
&mut ctx,
348349
)
349350
.vortex_expect("min_max_canonical_array should succeed in fuzz test");
350351
(Action::MinMax, ExpectedValue::MinMax(min_max_result))
@@ -654,7 +655,7 @@ pub fn run_fuzz_action(fuzz_action: FuzzArrayAction) -> VortexFuzzResult<bool> {
654655
assert_scalar_eq(&expected.scalar(), &sum_result, i)?;
655656
}
656657
Action::MinMax => {
657-
let min_max_result = min_max(&current_array)
658+
let min_max_result = min_max(&current_array, &mut ctx)
658659
.vortex_expect("min_max operation should succeed in fuzz test");
659660
assert_min_max_eq(&expected.min_max(), &min_max_result, i)?;
660661
}

0 commit comments

Comments
 (0)