Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 4 additions & 46 deletions vortex-array/public-api.lock

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::aggregate_fn::session::AggregateFnSessionExt;
use crate::columnar::AnyColumnar;
use crate::dtype::DType;
use crate::executor::max_iterations;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProvider;
use crate::scalar::Scalar;

/// Reference-counted type-erased accumulator.
Expand Down Expand Up @@ -116,17 +119,20 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
batch.dtype()
);

// 0. Stats-driven shortcut: if the aggregate can be derived directly from the batch's
// cached statistics, use that and skip both kernel dispatch and decode. This is the
// only layer that consults `batch.statistics()`; encoding kernels must not.
if let Some(result) = self.vtable.try_partial_from_stats(batch)? {
// 0. Legacy stats bridge: if this aggregate is still cached under a legacy Stat slot,
// consume that exact stat before kernel dispatch or decode.
if let Some(stat) = Stat::from_aggregate_fn(&self.aggregate_fn)
&& let Some(Precision::Exact(partial)) = batch.statistics().get(stat)
{
vortex_ensure!(
result.dtype() == &self.partial_dtype,
"Aggregate try_partial_from_stats returned {}, expected {}",
result.dtype(),
partial.dtype() == &self.partial_dtype,
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
self.aggregate_fn,
stat,
partial.dtype(),
self.partial_dtype,
);
self.vtable.combine_partials(&mut self.partial, result)?;
self.vtable.combine_partials(&mut self.partial, partial)?;
return Ok(());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,6 @@ impl AggregateFnVTable for UncompressedSizeInBytes {
false
}

fn try_partial_from_stats(&self, batch: &ArrayRef) -> VortexResult<Option<Scalar>> {
let Some(Precision::Exact(size_scalar)) =
batch.statistics().get(Stat::UncompressedSizeInBytes)
else {
return Ok(None);
};
let size = u64::try_from(&size_scalar)
.map_err(|e| vortex_err!("Failed to convert uncompressed size stat to u64: {e}"))?;
Ok(Some(Scalar::primitive(size, NonNullable)))
}

fn accumulate(
&self,
partial: &mut Self::Partial,
Expand Down
19 changes: 0 additions & 19 deletions vortex-array/src/aggregate_fn/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,6 @@ pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync {
/// final result is fully determined.
fn is_saturated(&self, state: &Self::Partial) -> bool;

/// Try to derive a partial scalar from the batch's cached statistics, before any
/// kernel dispatch or canonicalization.
///
/// Returns `Some(partial_scalar)` if the answer can be read directly from `batch.statistics()`,
/// otherwise `Ok(None)` to fall through to the rest of dispatch. The returned scalar must
/// have the dtype reported by `partial_dtype`.
///
/// This is the single place stats-based shortcuts live; encoding kernels must not consult
/// stats themselves. Runs first so that an upstream producer who pre-populates the relevant
/// stat (e.g. a layout reader hydrating `Stat::UncompressedSizeInBytes` from file metadata)
/// can skip both kernel dispatch and decode.
///
/// TODO: this hook may be removed once `ArrayStats` stores aggregate partials internally —
/// at that point stat-driven shortcuts can be resolved automatically by the dispatch layer
/// without each aggregate vtable opting in.
fn try_partial_from_stats(&self, _batch: &ArrayRef) -> VortexResult<Option<Scalar>> {
Ok(None)
}

/// Try to accumulate the raw array before decompression.
///
/// Returns `true` if the array was handled, `false` to fall through to
Expand Down
36 changes: 36 additions & 0 deletions vortex-array/src/expr/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub use provider::*;
pub use stat_bound::*;

use crate::aggregate_fn;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::AggregateFnVTableExt;
use crate::aggregate_fn::EmptyOptions;

#[derive(
Expand Down Expand Up @@ -187,6 +189,40 @@ impl Stat {
})
}

/// Return the built-in aggregate function corresponding to this statistic, if one exists.
pub fn aggregate_fn(&self) -> Option<AggregateFnRef> {
Some(match self {
Self::Sum => aggregate_fn::fns::sum::Sum.bind(EmptyOptions),
Self::NaNCount => aggregate_fn::fns::nan_count::NanCount.bind(EmptyOptions),
Self::UncompressedSizeInBytes => {
aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes
.bind(EmptyOptions)
}
Self::IsConstant
| Self::IsSorted
| Self::IsStrictSorted
| Self::Max
| Self::Min
| Self::NullCount => return None,
})
}

/// Return the statistic represented by `aggregate_fn`, if it has a legacy stat slot.
pub fn from_aggregate_fn(aggregate_fn: &AggregateFnRef) -> Option<Self> {
if aggregate_fn.is::<aggregate_fn::fns::sum::Sum>() {
return Some(Self::Sum);
}
if aggregate_fn.is::<aggregate_fn::fns::nan_count::NanCount>() {
return Some(Self::NaNCount);
}
if aggregate_fn
.is::<aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes>()
{
return Some(Self::UncompressedSizeInBytes);
}
None
}

pub fn name(&self) -> &str {
match self {
Self::IsConstant => "is_constant",
Expand Down
25 changes: 15 additions & 10 deletions vortex-array/src/scalar_fn/fns/stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use crate::aggregate_fn::AggregateFnRef;
use crate::arrays::ConstantArray;
use crate::dtype::DType;
use crate::expr::Expression;
use crate::expr::stats::Stat;
use crate::expr::stats::StatsProvider;
use crate::scalar::Scalar;
use crate::scalar_fn::Arity;
use crate::scalar_fn::ChildName;
use crate::scalar_fn::ExecutionArgs;
use crate::scalar_fn::ScalarFnId;
use crate::scalar_fn::ScalarFnVTable;
use crate::stats::legacy::legacy_stat_for_aggregate;

/// Options for the `stat` scalar function.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -133,15 +133,20 @@ fn stat_array(
dtype: DType,
len: usize,
) -> VortexResult<ArrayRef> {
let value = legacy_stat_for_aggregate(aggregate_fn)
.and_then(|stat| {
array
.statistics()
.with_typed_stats_set(|stats| stats.get(stat))
})
// We don't mind whether the stat is approxed or not, since these are row-wise bounds
.map(|stat| stat.into_inner())
.and_then(Scalar::into_value);
let value = if let Some(stat) = Stat::from_aggregate_fn(aggregate_fn) {
array
.statistics()
.with_typed_stats_set(|stats| stats.get(stat))
// We don't mind whether the stat is approxed or not, since these are row-wise bounds.
.map(|stat| stat.into_inner())
.and_then(Scalar::into_value)
} else {
tracing::trace!(
"No legacy Stat slot for aggregate {}; stat expression will resolve to null",
aggregate_fn
);
None
};

let scalar = Scalar::try_new(dtype, value)?;
Ok(ConstantArray::new(scalar, len).into_array())
Expand Down
34 changes: 0 additions & 34 deletions vortex-array/src/stats/legacy.rs

This file was deleted.

1 change: 0 additions & 1 deletion vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub use stats_set::*;
mod array;
pub mod expr;
pub mod flatbuffers;
pub(crate) mod legacy;
mod stats_set;

pub use array::*;
Expand Down
Loading