Skip to content

Commit 7b76045

Browse files
committed
Centralize aggregate stat bridge
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 8c5a0f1 commit 7b76045

8 files changed

Lines changed: 69 additions & 129 deletions

File tree

vortex-array/public-api.lock

Lines changed: 4 additions & 46 deletions
Large diffs are not rendered by default.

vortex-array/src/aggregate_fn/accumulator.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use crate::aggregate_fn::session::AggregateFnSessionExt;
1515
use crate::columnar::AnyColumnar;
1616
use crate::dtype::DType;
1717
use crate::executor::max_iterations;
18+
use crate::expr::stats::Precision;
19+
use crate::expr::stats::Stat;
20+
use crate::expr::stats::StatsProvider;
1821
use crate::scalar::Scalar;
1922

2023
/// Reference-counted type-erased accumulator.
@@ -116,17 +119,20 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
116119
batch.dtype()
117120
);
118121

119-
// 0. Stats-driven shortcut: if the aggregate can be derived directly from the batch's
120-
// cached statistics, use that and skip both kernel dispatch and decode. This is the
121-
// only layer that consults `batch.statistics()`; encoding kernels must not.
122-
if let Some(result) = self.vtable.try_partial_from_stats(batch)? {
122+
// 0. Legacy stats bridge: if this aggregate is still cached under a legacy Stat slot,
123+
// consume that exact stat before kernel dispatch or decode.
124+
if let Some(stat) = Stat::from_aggregate_fn(&self.aggregate_fn)
125+
&& let Some(Precision::Exact(partial)) = batch.statistics().get(stat)
126+
{
123127
vortex_ensure!(
124-
result.dtype() == &self.partial_dtype,
125-
"Aggregate try_partial_from_stats returned {}, expected {}",
126-
result.dtype(),
128+
partial.dtype() == &self.partial_dtype,
129+
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
130+
self.aggregate_fn,
131+
stat,
132+
partial.dtype(),
127133
self.partial_dtype,
128134
);
129-
self.vtable.combine_partials(&mut self.partial, result)?;
135+
self.vtable.combine_partials(&mut self.partial, partial)?;
130136
return Ok(());
131137
}
132138

vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,6 @@ impl AggregateFnVTable for UncompressedSizeInBytes {
150150
false
151151
}
152152

153-
fn try_partial_from_stats(&self, batch: &ArrayRef) -> VortexResult<Option<Scalar>> {
154-
let Some(Precision::Exact(size_scalar)) =
155-
batch.statistics().get(Stat::UncompressedSizeInBytes)
156-
else {
157-
return Ok(None);
158-
};
159-
let size = u64::try_from(&size_scalar)
160-
.map_err(|e| vortex_err!("Failed to convert uncompressed size stat to u64: {e}"))?;
161-
Ok(Some(Scalar::primitive(size, NonNullable)))
162-
}
163-
164153
fn accumulate(
165154
&self,
166155
partial: &mut Self::Partial,

vortex-array/src/aggregate_fn/vtable.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -102,25 +102,6 @@ pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync {
102102
/// final result is fully determined.
103103
fn is_saturated(&self, state: &Self::Partial) -> bool;
104104

105-
/// Try to derive a partial scalar from the batch's cached statistics, before any
106-
/// kernel dispatch or canonicalization.
107-
///
108-
/// Returns `Some(partial_scalar)` if the answer can be read directly from `batch.statistics()`,
109-
/// otherwise `Ok(None)` to fall through to the rest of dispatch. The returned scalar must
110-
/// have the dtype reported by `partial_dtype`.
111-
///
112-
/// This is the single place stats-based shortcuts live; encoding kernels must not consult
113-
/// stats themselves. Runs first so that an upstream producer who pre-populates the relevant
114-
/// stat (e.g. a layout reader hydrating `Stat::UncompressedSizeInBytes` from file metadata)
115-
/// can skip both kernel dispatch and decode.
116-
///
117-
/// TODO: this hook may be removed once `ArrayStats` stores aggregate partials internally —
118-
/// at that point stat-driven shortcuts can be resolved automatically by the dispatch layer
119-
/// without each aggregate vtable opting in.
120-
fn try_partial_from_stats(&self, _batch: &ArrayRef) -> VortexResult<Option<Scalar>> {
121-
Ok(None)
122-
}
123-
124105
/// Try to accumulate the raw array before decompression.
125106
///
126107
/// Returns `true` if the array was handled, `false` to fall through to

vortex-array/src/expr/stats/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ pub use provider::*;
2525
pub use stat_bound::*;
2626

2727
use crate::aggregate_fn;
28+
use crate::aggregate_fn::AggregateFnRef;
2829
use crate::aggregate_fn::AggregateFnVTable;
30+
use crate::aggregate_fn::AggregateFnVTableExt;
2931
use crate::aggregate_fn::EmptyOptions;
3032

3133
#[derive(
@@ -187,6 +189,40 @@ impl Stat {
187189
})
188190
}
189191

192+
/// Return the built-in aggregate function corresponding to this statistic, if one exists.
193+
pub fn aggregate_fn(&self) -> Option<AggregateFnRef> {
194+
Some(match self {
195+
Self::Sum => aggregate_fn::fns::sum::Sum.bind(EmptyOptions),
196+
Self::NaNCount => aggregate_fn::fns::nan_count::NanCount.bind(EmptyOptions),
197+
Self::UncompressedSizeInBytes => {
198+
aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes
199+
.bind(EmptyOptions)
200+
}
201+
Self::IsConstant
202+
| Self::IsSorted
203+
| Self::IsStrictSorted
204+
| Self::Max
205+
| Self::Min
206+
| Self::NullCount => return None,
207+
})
208+
}
209+
210+
/// Return the statistic represented by `aggregate_fn`, if it has a legacy stat slot.
211+
pub fn from_aggregate_fn(aggregate_fn: &AggregateFnRef) -> Option<Self> {
212+
if aggregate_fn.is::<aggregate_fn::fns::sum::Sum>() {
213+
return Some(Self::Sum);
214+
}
215+
if aggregate_fn.is::<aggregate_fn::fns::nan_count::NanCount>() {
216+
return Some(Self::NaNCount);
217+
}
218+
if aggregate_fn
219+
.is::<aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes>()
220+
{
221+
return Some(Self::UncompressedSizeInBytes);
222+
}
223+
None
224+
}
225+
190226
pub fn name(&self) -> &str {
191227
match self {
192228
Self::IsConstant => "is_constant",

vortex-array/src/scalar_fn/fns/stat.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ use crate::aggregate_fn::AggregateFnRef;
1616
use crate::arrays::ConstantArray;
1717
use crate::dtype::DType;
1818
use crate::expr::Expression;
19+
use crate::expr::stats::Stat;
1920
use crate::expr::stats::StatsProvider;
2021
use crate::scalar::Scalar;
2122
use crate::scalar_fn::Arity;
2223
use crate::scalar_fn::ChildName;
2324
use crate::scalar_fn::ExecutionArgs;
2425
use crate::scalar_fn::ScalarFnId;
2526
use crate::scalar_fn::ScalarFnVTable;
26-
use crate::stats::legacy::legacy_stat_for_aggregate;
2727

2828
/// Options for the `stat` scalar function.
2929
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -133,15 +133,20 @@ fn stat_array(
133133
dtype: DType,
134134
len: usize,
135135
) -> VortexResult<ArrayRef> {
136-
let value = legacy_stat_for_aggregate(aggregate_fn)
137-
.and_then(|stat| {
138-
array
139-
.statistics()
140-
.with_typed_stats_set(|stats| stats.get(stat))
141-
})
142-
// We don't mind whether the stat is approxed or not, since these are row-wise bounds
143-
.map(|stat| stat.into_inner())
144-
.and_then(Scalar::into_value);
136+
let value = if let Some(stat) = Stat::from_aggregate_fn(aggregate_fn) {
137+
array
138+
.statistics()
139+
.with_typed_stats_set(|stats| stats.get(stat))
140+
// We don't mind whether the stat is approxed or not, since these are row-wise bounds.
141+
.map(|stat| stat.into_inner())
142+
.and_then(Scalar::into_value)
143+
} else {
144+
tracing::trace!(
145+
"No legacy Stat slot for aggregate {}; stat expression will resolve to null",
146+
aggregate_fn
147+
);
148+
None
149+
};
145150

146151
let scalar = Scalar::try_new(dtype, value)?;
147152
Ok(ConstantArray::new(scalar, len).into_array())

vortex-array/src/stats/legacy.rs

Lines changed: 0 additions & 34 deletions
This file was deleted.

vortex-array/src/stats/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub use stats_set::*;
1313
mod array;
1414
pub mod expr;
1515
pub mod flatbuffers;
16-
pub(crate) mod legacy;
1716
mod stats_set;
1817

1918
pub use array::*;

0 commit comments

Comments
 (0)