Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,56 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&sel

pub fn vortex_array::aggregate_fn::fns::nan_count::nan_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>

pub mod vortex_array::aggregate_fn::fns::null_count

pub struct vortex_array::aggregate_fn::fns::null_count::NullCount

impl core::clone::Clone for vortex_array::aggregate_fn::fns::null_count::NullCount

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::clone(&self) -> vortex_array::aggregate_fn::fns::null_count::NullCount

impl core::fmt::Debug for vortex_array::aggregate_fn::fns::null_count::NullCount

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount

pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions

pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>

pub fn vortex_array::aggregate_fn::fns::null_count::null_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<usize>

pub mod vortex_array::aggregate_fn::fns::sum

pub enum vortex_array::aggregate_fn::fns::sum::SumState
Expand Down Expand Up @@ -1376,6 +1426,42 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, &S

pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::null_count::NullCount

pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Options = vortex_array::aggregate_fn::EmptyOptions

pub type vortex_array::aggregate_fn::fns::null_count::NullCount::Partial = u64

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::coerce_args(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::combine_partials(&self, &mut Self::Partial, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::empty_partial(&self, &Self::Options, &vortex_array::dtype::DType) -> vortex_error::VortexResult<Self::Partial>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize(&self, vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::finalize_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::is_saturated(&self, &Self::Partial) -> bool

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::partial_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::reset(&self, &mut Self::Partial)

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::return_dtype(&self, &Self::Options, &vortex_array::dtype::DType) -> core::option::Option<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::to_scalar(&self, &Self::Partial) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>

pub fn vortex_array::aggregate_fn::fns::null_count::NullCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<bool>

impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum

pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::EmptyOptions
Expand Down Expand Up @@ -19676,8 +19762,16 @@ pub fn vortex_array::scalar_fn::fns::stat::StatOptions::hash<__H: core::hash::Ha

impl core::marker::StructuralPartialEq for vortex_array::scalar_fn::fns::stat::StatOptions

pub fn vortex_array::stats::expr::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::expr::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::expr::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::expr::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression

pub fn vortex_array::stats::expr::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub mod vortex_array::stats::flatbuffers

pub struct vortex_array::stats::ArrayStats
Expand Down Expand Up @@ -19910,10 +20004,18 @@ pub const vortex_array::stats::PRUNING_STATS: &[vortex_array::expr::stats::Stat]

pub fn vortex_array::stats::as_stat_bitset_bytes(&[vortex_array::expr::stats::Stat]) -> alloc::vec::Vec<u8>

pub fn vortex_array::stats::min_max(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::nan_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::null_count(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub fn vortex_array::stats::stat(vortex_array::expr::Expression, vortex_array::aggregate_fn::AggregateFnRef) -> vortex_array::expr::Expression

pub fn vortex_array::stats::stats_from_bitset_bytes(&[u8]) -> alloc::vec::Vec<vortex_array::expr::stats::Stat>

pub fn vortex_array::stats::sum(vortex_array::expr::Expression) -> vortex_array::expr::Expression

pub type vortex_array::stats::StatsArray = [(vortex_array::expr::stats::Stat, vortex_array::expr::stats::Precision<vortex_array::scalar::ScalarValue>); 4]

pub mod vortex_array::stream
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/aggregate_fn/fns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ pub mod last;
pub mod mean;
pub mod min_max;
pub mod nan_count;
pub mod null_count;
pub mod sum;
pub mod uncompressed_size_in_bytes;
194 changes: 194 additions & 0 deletions vortex-array/src/aggregate_fn/fns/null_count/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::Accumulator;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::dtype::DType;
use crate::dtype::Nullability::NonNullable;
use crate::dtype::PType;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProvider;
use crate::scalar::Scalar;
use crate::scalar::ScalarValue;

/// Return the number of null values in an array.
pub fn null_count(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
if let Some(Precision::Exact(null_count_scalar)) = array.statistics().get(Stat::NullCount) {
return usize::try_from(&null_count_scalar)
.map_err(|e| vortex_err!("Failed to convert null count stat to usize: {e}"));
}

let mut acc = Accumulator::try_new(NullCount, EmptyOptions, array.dtype().clone())?;
acc.accumulate(array, ctx)?;
let result = acc.finish()?;

let count = result
.as_primitive()
.typed_value::<u64>()
.vortex_expect("null_count result should not be null");
let count_usize = usize::try_from(count).vortex_expect("Cannot be more nulls than usize::MAX");

array
.statistics()
.set(Stat::NullCount, Precision::Exact(ScalarValue::from(count)));

Ok(count_usize)
}

/// Count the number of null values in an array.
///
/// Applies to all types and returns a non-null `u64`.
#[derive(Clone, Debug)]
pub struct NullCount;

impl AggregateFnVTable for NullCount {
type Options = EmptyOptions;
type Partial = u64;

fn id(&self) -> AggregateFnId {
AggregateFnId::new("vortex.null_count")
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(None)
}

fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
Some(DType::Primitive(PType::U64, NonNullable))
}

fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
self.return_dtype(options, input_dtype)
}

fn empty_partial(
&self,
_options: &Self::Options,
_input_dtype: &DType,
) -> VortexResult<Self::Partial> {
Ok(0)
}

fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
let count = other
.as_primitive()
.typed_value::<u64>()
.vortex_expect("null_count partial should not be null");
*partial += count;
Ok(())
}

fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
Ok(Scalar::primitive(*partial, NonNullable))
}

fn reset(&self, partial: &mut Self::Partial) {
*partial = 0;
}

#[inline]
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
false
}

fn try_accumulate(
&self,
state: &mut Self::Partial,
batch: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<bool> {
*state += batch.invalid_count(ctx)? as u64;
Ok(true)
}

fn accumulate(
&self,
partial: &mut Self::Partial,
batch: &Columnar,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
*partial += match batch {
Columnar::Constant(c) => {
if c.scalar().is_null() {
c.len() as u64
} else {
0
}
}
Columnar::Canonical(c) => c.clone().into_array().invalid_count(ctx)? as u64,
};
Ok(())
}

fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
Ok(partials)
}

fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
self.to_scalar(partial)
}
}

#[cfg(test)]
mod tests {
use vortex_error::VortexResult;

use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::aggregate_fn::Accumulator;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::null_count::NullCount;
use crate::aggregate_fn::fns::null_count::null_count;
use crate::arrays::PrimitiveArray;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProviderExt;

#[test]
fn null_count_with_nulls() -> VortexResult<()> {
let array =
PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), None]).into_array();
let mut ctx = LEGACY_SESSION.create_execution_ctx();

assert_eq!(null_count(&array, &mut ctx)?, 2);
assert_eq!(
array.statistics().get_as::<u64>(Stat::NullCount),
Some(Precision::exact(2u64))
);
Ok(())
}

#[test]
fn null_count_multi_batch() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
let mut acc = Accumulator::try_new(NullCount, EmptyOptions, dtype)?;

let batch1 = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array();
acc.accumulate(&batch1, &mut ctx)?;

let batch2 = PrimitiveArray::from_option_iter([None, Some(5i32), None]).into_array();
acc.accumulate(&batch2, &mut ctx)?;

let result = acc.finish()?;
assert_eq!(result.as_primitive().typed_value::<u64>(), Some(3));
Ok(())
}
}
2 changes: 2 additions & 0 deletions vortex-array/src/aggregate_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::aggregate_fn::fns::is_sorted::IsSorted;
use crate::aggregate_fn::fns::last::Last;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::nan_count::NanCount;
use crate::aggregate_fn::fns::null_count::NullCount;
use crate::aggregate_fn::fns::sum::Sum;
use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use crate::aggregate_fn::kernels::DynAggregateKernel;
Expand Down Expand Up @@ -74,6 +75,7 @@ impl Default for AggregateFnSession {
this.register(Last);
this.register(MinMax);
this.register(NanCount);
this.register(NullCount);
this.register(Sum);
this.register(UncompressedSizeInBytes);

Expand Down
Loading
Loading