Skip to content

Commit 2839387

Browse files
committed
Add null count stat aggregate wrappers
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 505e8f3 commit 2839387

7 files changed

Lines changed: 366 additions & 8 deletions

File tree

vortex-array/public-api.lock

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,56 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&sel
716716

717717
pub fn vortex_array::aggregate_fn::fns::nan_count::nan_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>
718718

719+
pub mod vortex_array::aggregate_fn::fns::null_count
720+
721+
pub struct vortex_array::aggregate_fn::fns::null_count::NullCount
722+
723+
impl core::clone::Clone for vortex_array::aggregate_fn::fns::null_count::NullCount
724+
725+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::clone(&self) -> vortex_array::aggregate_fn::fns::null_count::NullCount
726+
727+
impl core::fmt::Debug for vortex_array::aggregate_fn::fns::null_count::NullCount
728+
729+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result
730+
731+
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount
732+
733+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions
734+
735+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64
736+
737+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>
738+
739+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>
740+
741+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
742+
743+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
744+
745+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
746+
747+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>
748+
749+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
750+
751+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId
752+
753+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool
754+
755+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
756+
757+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)
758+
759+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
760+
761+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
762+
763+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
764+
765+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>
766+
767+
pub fn vortex_array::aggregate_fn::fns::null_count::null_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>
768+
719769
pub mod vortex_array::aggregate_fn::fns::sum
720770

721771
pub enum vortex_array::aggregate_fn::fns::sum::SumState
@@ -1376,6 +1426,42 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, &S
13761426

13771427
pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>
13781428

1429+
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount
1430+
1431+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions
1432+
1433+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64
1434+
1435+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>
1436+
1437+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>
1438+
1439+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
1440+
1441+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
1442+
1443+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
1444+
1445+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>
1446+
1447+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
1448+
1449+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId
1450+
1451+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool
1452+
1453+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
1454+
1455+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)
1456+
1457+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
1458+
1459+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
1460+
1461+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
1462+
1463+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>
1464+
13791465
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum
13801466

13811467
pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::EmptyOptions
@@ -19676,8 +19762,16 @@ pub fn vortex_array::scalar_fn::fns::stat::StatOptions::hash<__H: core::hash::Ha
1967619762

1967719763
impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::fns::stat::StatOptions
1967819764

19765+
pub fn vortex_array::stats::expr::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19766+
19767+
pub fn vortex_array::stats::expr::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19768+
19769+
pub fn vortex_array::stats::expr::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19770+
1967919771
pub fn vortex_array::stats::expr::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression
1968019772

19773+
pub fn vortex_array::stats::expr::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19774+
1968119775
pub mod vortex_array::stats::flatbuffers
1968219776

1968319777
pub struct vortex_array::stats::ArrayStats
@@ -19910,10 +20004,18 @@ pub const vortex_array::stats::PRUNING_STATS: &[vortex_array::expr::stats::Stat]
1991020004

1991120005
pub fn vortex_array::stats::as_stat_bitset_bytes(&[vortex_array::expr::stats::Stat]) -> alloc::vec::Vec<u8>
1991220006

20007+
pub fn vortex_array::stats::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20008+
20009+
pub fn vortex_array::stats::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20010+
20011+
pub fn vortex_array::stats::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20012+
1991320013
pub fn vortex_array::stats::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression
1991420014

1991520015
pub fn vortex_array::stats::stats_from_bitset_bytes(&[u8]) -> alloc::vec::Vec<vortex_array::expr::stats::Stat>
1991620016

20017+
pub fn vortex_array::stats::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20018+
1991720019
pub type vortex_array::stats::StatsArray = [(vortex_array::expr::stats::Stat, vortex_array::expr::stats::Precision<vortex_array::scalar::ScalarValue>); 4]
1991820020

1991920021
pub mod vortex_array::stream

vortex-array/src/aggregate_fn/fns/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ pub mod last;
1010
pub mod mean;
1111
pub mod min_max;
1212
pub mod nan_count;
13+
pub mod null_count;
1314
pub mod sum;
1415
pub mod uncompressed_size_in_bytes;
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexExpect;
5+
use vortex_error::VortexResult;
6+
use vortex_error::vortex_err;
7+
8+
use crate::ArrayRef;
9+
use crate::Columnar;
10+
use crate::ExecutionCtx;
11+
use crate::IntoArray;
12+
use crate::aggregate_fn::Accumulator;
13+
use crate::aggregate_fn::AggregateFnId;
14+
use crate::aggregate_fn::AggregateFnVTable;
15+
use crate::aggregate_fn::DynAccumulator;
16+
use crate::aggregate_fn::EmptyOptions;
17+
use crate::dtype::DType;
18+
use crate::dtype::Nullability::NonNullable;
19+
use crate::dtype::PType;
20+
use crate::expr::stats::Precision;
21+
use crate::expr::stats::Stat;
22+
use crate::expr::stats::StatsProvider;
23+
use crate::scalar::Scalar;
24+
use crate::scalar::ScalarValue;
25+
26+
/// Return the number of null values in an array.
27+
pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
28+
if let Some(Precision::Exact(null_count_scalar)) = array.statistics().get(Stat::NullCount) {
29+
return usize::try_from(&null_count_scalar)
30+
.map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
31+
}
32+
33+
let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
34+
acc.accumulate(array, ctx)?;
35+
let result = acc.finish()?;
36+
37+
let count = result
38+
.as_primitive()
39+
.typed_value::<u64>()
40+
.vortex_expect("null_count result should not be null");
41+
let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");
42+
43+
array
44+
.statistics()
45+
.set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));
46+
47+
Ok(count_usize)
48+
}
49+
50+
/// Count the number of null values in an array.
51+
///
52+
/// Applies to all types and returns a non-null `u64`.
53+
#[derive(Clone, Debug)]
54+
pub struct NullCount;
55+
56+
impl AggregateFnVTable for NullCount {
57+
type Options = EmptyOptions;
58+
type Partial = u64;
59+
60+
fn id(&self) -> AggregateFnId {
61+
AggregateFnId::new("vortex.null_count")
62+
}
63+
64+
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
65+
Ok(None)
66+
}
67+
68+
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
69+
Some(DType::Primitive(PType::U64, NonNullable))
70+
}
71+
72+
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
73+
self.return_dtype(options, input_dtype)
74+
}
75+
76+
fn empty_partial(
77+
&self,
78+
_options: &Self::Options,
79+
_input_dtype: &DType,
80+
) -> VortexResult<Self::Partial> {
81+
Ok(0)
82+
}
83+
84+
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
85+
let count = other
86+
.as_primitive()
87+
.typed_value::<u64>()
88+
.vortex_expect("null_count partial should not be null");
89+
*partial += count;
90+
Ok(())
91+
}
92+
93+
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
94+
Ok(Scalar::primitive(*partial, NonNullable))
95+
}
96+
97+
fn reset(&self, partial: &mut Self::Partial) {
98+
*partial = 0;
99+
}
100+
101+
#[inline]
102+
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
103+
false
104+
}
105+
106+
fn try_accumulate(
107+
&self,
108+
state: &mut Self::Partial,
109+
batch: &ArrayRef,
110+
ctx: &mut ExecutionCtx,
111+
) -> VortexResult<bool> {
112+
*state += batch.invalid_count(ctx)? as u64;
113+
Ok(true)
114+
}
115+
116+
fn accumulate(
117+
&self,
118+
partial: &mut Self::Partial,
119+
batch: &Columnar,
120+
ctx: &mut ExecutionCtx,
121+
) -> VortexResult<()> {
122+
*partial += match batch {
123+
Columnar::Constant(c) => {
124+
if c.scalar().is_null() {
125+
c.len() as u64
126+
} else {
127+
0
128+
}
129+
}
130+
Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
131+
};
132+
Ok(())
133+
}
134+
135+
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
136+
Ok(partials)
137+
}
138+
139+
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
140+
self.to_scalar(partial)
141+
}
142+
}
143+
144+
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::null_count::NullCount;
    use crate::aggregate_fn::fns::null_count::null_count;
    use crate::arrays::PrimitiveArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::expr::stats::Precision;
    use crate::expr::stats::Stat;
    use crate::expr::stats::StatsProviderExt;

    /// `null_count` returns the number of nulls and records it as an exact statistic.
    #[test]
    fn null_count_with_nulls() -> VortexResult<()> {
        let values = [Some(1i32), None, Some(3), None];
        let array = PrimitiveArray::from_option_iter(values).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let counted = null_count(&array, &mut ctx)?;
        assert_eq!(counted, 2);

        // The computed value must have been written back into the array's stats.
        let recorded = array.statistics().get_as::<u64>(Stat::NullCount);
        assert_eq!(recorded, Some(Precision::exact(2u64)));
        Ok(())
    }

    /// Feeding several batches into one accumulator sums their null counts.
    #[test]
    fn null_count_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input_dtype = DType::Primitive(PType::I32, Nullability::Nullable);
        let mut accumulator = Accumulator::try_new(NullCount, EmptyOptions, input_dtype)?;

        // One null in the first batch, two in the second.
        let batches = [
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(),
            PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array(),
        ];
        for batch in &batches {
            accumulator.accumulate(batch, &mut ctx)?;
        }

        let total = accumulator.finish()?;
        assert_eq!(total.as_primitive().typed_value::<u64>(), Some(3));
        Ok(())
    }
}

vortex-array/src/aggregate_fn/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::aggregate_fn::fns::is_sorted::IsSorted;
2121
use crate::aggregate_fn::fns::last::Last;
2222
use crate::aggregate_fn::fns::min_max::MinMax;
2323
use crate::aggregate_fn::fns::nan_count::NanCount;
24+
use crate::aggregate_fn::fns::null_count::NullCount;
2425
use crate::aggregate_fn::fns::sum::Sum;
2526
use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
2627
use crate::aggregate_fn::kernels::DynAggregateKernel;
@@ -74,6 +75,7 @@ impl Default for AggregateFnSession {
7475
this.register(Last);
7576
this.register(MinMax);
7677
this.register(NanCount);
78+
this.register(NullCount);
7779
this.register(Sum);
7880
this.register(UncompressedSizeInBytes);
7981

0 commit comments

Comments
 (0)