Skip to content

Commit 75ba3eb

Browse files
committed
Add null count stat aggregate wrappers
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 0dce6a1 commit 75ba3eb

8 files changed

Lines changed: 378 additions & 3 deletions

File tree

vortex-array/public-api.lock

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,58 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_partial_from_st
734734

735735
pub fn vortex_array::aggregate_fn::fns::nan_count::nan_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>
736736

737+
pub mod vortex_array::aggregate_fn::fns::null_count
738+
739+
pub struct vortex_array::aggregate_fn::fns::null_count::NullCount
740+
741+
impl core::clone::Clone for vortex_array::aggregate_fn::fns::null_count::NullCount
742+
743+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::clone(&self) -> vortex_array::aggregate_fn::fns::null_count::NullCount
744+
745+
impl core::fmt::Debug for vortex_array::aggregate_fn::fns::null_count::NullCount
746+
747+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result
748+
749+
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount
750+
751+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions
752+
753+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64
754+
755+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>
756+
757+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>
758+
759+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
760+
761+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
762+
763+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
764+
765+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>
766+
767+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
768+
769+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId
770+
771+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool
772+
773+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
774+
775+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)
776+
777+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
778+
779+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
780+
781+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
782+
783+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>
784+
785+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar::Scalar>>
786+
787+
pub fn vortex_array::aggregate_fn::fns::null_count::null_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>
788+
737789
pub mod vortex_array::aggregate_fn::fns::sum
738790

739791
pub enum vortex_array::aggregate_fn::fns::sum::SumState
@@ -1416,6 +1468,44 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&sel
14161468

14171469
pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar::Scalar>>
14181470

1471+
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount
1472+
1473+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions
1474+
1475+
pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64
1476+
1477+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>
1478+
1479+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>
1480+
1481+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>
1482+
1483+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
1484+
1485+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>
1486+
1487+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>
1488+
1489+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
1490+
1491+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId
1492+
1493+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool
1494+
1495+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
1496+
1497+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)
1498+
1499+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>
1500+
1501+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
1502+
1503+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
1504+
1505+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>
1506+
1507+
pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult<core::option::Option<vortex_array::scalar::Scalar>>
1508+
14191509
impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum
14201510

14211511
pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::EmptyOptions
@@ -19718,8 +19808,16 @@ pub fn vortex_array::scalar_fn::fns::stat::StatOptions::hash<__H: core::hash::Ha
1971819808

1971919809
impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::fns::stat::StatOptions
1972019810

19811+
pub fn vortex_array::stats::expr::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19812+
19813+
pub fn vortex_array::stats::expr::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19814+
19815+
pub fn vortex_array::stats::expr::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19816+
1972119817
pub fn vortex_array::stats::expr::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression
1972219818

19819+
pub fn vortex_array::stats::expr::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression
19820+
1972319821
pub mod vortex_array::stats::flatbuffers
1972419822

1972519823
pub struct vortex_array::stats::ArrayStats
@@ -19952,10 +20050,18 @@ pub const vortex_array::stats::PRUNING_STATS: &[vortex_array::expr::stats::Stat]
1995220050

1995320051
pub fn vortex_array::stats::as_stat_bitset_bytes(&[vortex_array::expr::stats::Stat]) -> alloc::vec::Vec<u8>
1995420052

20053+
pub fn vortex_array::stats::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20054+
20055+
pub fn vortex_array::stats::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20056+
20057+
pub fn vortex_array::stats::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20058+
1995520059
pub fn vortex_array::stats::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression
1995620060

1995720061
pub fn vortex_array::stats::stats_from_bitset_bytes(&[u8]) -> alloc::vec::Vec<vortex_array::expr::stats::Stat>
1995820062

20063+
pub fn vortex_array::stats::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression
20064+
1995920065
pub type vortex_array::stats::StatsArray = [(vortex_array::expr::stats::Stat, vortex_array::expr::stats::Precision<vortex_array::scalar::ScalarValue>); 4]
1996020066

1996120067
pub mod vortex_array::stream

vortex-array/src/aggregate_fn/fns/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ pub mod last;
1010
pub mod mean;
1111
pub mod min_max;
1212
pub mod nan_count;
13+
pub mod null_count;
1314
pub mod sum;
1415
pub mod uncompressed_size_in_bytes;
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexExpect;
5+
use vortex_error::VortexResult;
6+
use vortex_error::vortex_err;
7+
8+
use crate::ArrayRef;
9+
use crate::Columnar;
10+
use crate::ExecutionCtx;
11+
use crate::IntoArray;
12+
use crate::aggregate_fn::Accumulator;
13+
use crate::aggregate_fn::AggregateFnId;
14+
use crate::aggregate_fn::AggregateFnVTable;
15+
use crate::aggregate_fn::DynAccumulator;
16+
use crate::aggregate_fn::EmptyOptions;
17+
use crate::dtype::DType;
18+
use crate::dtype::Nullability::NonNullable;
19+
use crate::dtype::PType;
20+
use crate::expr::stats::Precision;
21+
use crate::expr::stats::Stat;
22+
use crate::expr::stats::StatsProvider;
23+
use crate::scalar::Scalar;
24+
use crate::scalar::ScalarValue;
25+
26+
/// Return the number of null values in an array.
27+
pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
28+
if let Some(Precision::Exact(null_count_scalar)) = array.statistics().get(Stat::NullCount) {
29+
return usize::try_from(&null_count_scalar)
30+
.map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
31+
}
32+
33+
let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
34+
acc.accumulate(array, ctx)?;
35+
let result = acc.finish()?;
36+
37+
let count = result
38+
.as_primitive()
39+
.typed_value::<u64>()
40+
.vortex_expect("null_count result should not be null");
41+
let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");
42+
43+
array
44+
.statistics()
45+
.set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));
46+
47+
Ok(count_usize)
48+
}
49+
50+
/// Count the number of null values in an array.
51+
///
52+
/// Applies to all types and returns a non-null `u64`.
53+
#[derive(Clone, Debug)]
54+
pub struct NullCount;
55+
56+
impl AggregateFnVTable for NullCount {
57+
type Options = EmptyOptions;
58+
type Partial = u64;
59+
60+
fn id(&self) -> AggregateFnId {
61+
AggregateFnId::new("vortex.null_count")
62+
}
63+
64+
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
65+
Ok(None)
66+
}
67+
68+
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
69+
Some(DType::Primitive(PType::U64, NonNullable))
70+
}
71+
72+
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
73+
self.return_dtype(options, input_dtype)
74+
}
75+
76+
fn empty_partial(
77+
&self,
78+
_options: &Self::Options,
79+
_input_dtype: &DType,
80+
) -> VortexResult<Self::Partial> {
81+
Ok(0)
82+
}
83+
84+
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
85+
let count = other
86+
.as_primitive()
87+
.typed_value::<u64>()
88+
.vortex_expect("null_count partial should not be null");
89+
*partial += count;
90+
Ok(())
91+
}
92+
93+
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
94+
Ok(Scalar::primitive(*partial, NonNullable))
95+
}
96+
97+
fn reset(&self, partial: &mut Self::Partial) {
98+
*partial = 0;
99+
}
100+
101+
#[inline]
102+
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
103+
false
104+
}
105+
106+
fn try_partial_from_stats(&self, batch: &ArrayRef) -> VortexResult<Option<Scalar>> {
107+
let Some(Precision::Exact(null_count_scalar)) = batch.statistics().get(Stat::NullCount)
108+
else {
109+
return Ok(None);
110+
};
111+
let count = u64::try_from(&null_count_scalar)
112+
.map_err(|e| vortex_err!("Failed to convert null count stat to u64: {e}"))?;
113+
Ok(Some(Scalar::primitive(count, NonNullable)))
114+
}
115+
116+
fn try_accumulate(
117+
&self,
118+
state: &mut Self::Partial,
119+
batch: &ArrayRef,
120+
ctx: &mut ExecutionCtx,
121+
) -> VortexResult<bool> {
122+
*state += batch.invalid_count(ctx)? as u64;
123+
Ok(true)
124+
}
125+
126+
fn accumulate(
127+
&self,
128+
partial: &mut Self::Partial,
129+
batch: &Columnar,
130+
ctx: &mut ExecutionCtx,
131+
) -> VortexResult<()> {
132+
*partial += match batch {
133+
Columnar::Constant(c) => {
134+
if c.scalar().is_null() {
135+
c.len() as u64
136+
} else {
137+
0
138+
}
139+
}
140+
Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
141+
};
142+
Ok(())
143+
}
144+
145+
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
146+
Ok(partials)
147+
}
148+
149+
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
150+
self.to_scalar(partial)
151+
}
152+
}
153+
154+
#[cfg(test)]
155+
mod tests {
156+
use vortex_error::VortexResult;
157+
158+
use crate::IntoArray;
159+
use crate::LEGACY_SESSION;
160+
use crate::VortexSessionExecute;
161+
use crate::aggregate_fn::Accumulator;
162+
use crate::aggregate_fn::DynAccumulator;
163+
use crate::aggregate_fn::EmptyOptions;
164+
use crate::aggregate_fn::fns::null_count::NullCount;
165+
use crate::aggregate_fn::fns::null_count::null_count;
166+
use crate::arrays::PrimitiveArray;
167+
use crate::dtype::DType;
168+
use crate::dtype::Nullability;
169+
use crate::dtype::PType;
170+
use crate::expr::stats::Precision;
171+
use crate::expr::stats::Stat;
172+
use crate::expr::stats::StatsProviderExt;
173+
174+
#[test]
175+
fn null_count_with_nulls() -> VortexResult<()> {
176+
let array =
177+
PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), None]).into_array();
178+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
179+
180+
assert_eq!(null_count(&array, &mut ctx)?, 2);
181+
assert_eq!(
182+
array.statistics().get_as::<u64>(Stat::NullCount),
183+
Some(Precision::exact(2u64))
184+
);
185+
Ok(())
186+
}
187+
188+
#[test]
189+
fn null_count_multi_batch() -> VortexResult<()> {
190+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
191+
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
192+
let mut acc = Accumulator::try_new(NullCount, EmptyOptions, dtype)?;
193+
194+
let batch1 = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
195+
acc.accumulate(&batch1, &mut ctx)?;
196+
197+
let batch2 = PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array();
198+
acc.accumulate(&batch2, &mut ctx)?;
199+
200+
let result = acc.finish()?;
201+
assert_eq!(result.as_primitive().typed_value::<u64>(), Some(3));
202+
Ok(())
203+
}
204+
}

vortex-array/src/aggregate_fn/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::aggregate_fn::fns::is_sorted::IsSorted;
2121
use crate::aggregate_fn::fns::last::Last;
2222
use crate::aggregate_fn::fns::min_max::MinMax;
2323
use crate::aggregate_fn::fns::nan_count::NanCount;
24+
use crate::aggregate_fn::fns::null_count::NullCount;
2425
use crate::aggregate_fn::fns::sum::Sum;
2526
use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
2627
use crate::aggregate_fn::kernels::DynAggregateKernel;
@@ -74,6 +75,7 @@ impl Default for AggregateFnSession {
7475
this.register(Last);
7576
this.register(MinMax);
7677
this.register(NanCount);
78+
this.register(NullCount);
7779
this.register(Sum);
7880
this.register(UncompressedSizeInBytes);
7981

vortex-array/src/expr/stats/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use num_enum::TryFromPrimitive;
1212

1313
use crate::dtype::DType;
1414
use crate::dtype::Nullability::NonNullable;
15-
use crate::dtype::PType;
1615

1716
mod bound;
1817
mod precision;
@@ -172,7 +171,10 @@ impl Stat {
172171
Self::Max => data_type.clone(),
173172
Self::Min if matches!(data_type, DType::Null) => return None,
174173
Self::Min => data_type.clone(),
175-
Self::NullCount => DType::Primitive(PType::U64, NonNullable),
174+
Self::NullCount => {
175+
return aggregate_fn::fns::null_count::NullCount
176+
.return_dtype(&EmptyOptions, data_type);
177+
}
176178
Self::UncompressedSizeInBytes => {
177179
return aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes
178180
.return_dtype(&EmptyOptions, data_type);

0 commit comments

Comments
 (0)