Skip to content

Commit 2f38485

Browse files
committed
MORE
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 2e315cd commit 2f38485

17 files changed

Lines changed: 687 additions & 182 deletions

File tree

.codex

Whitespace-only changes.

vortex-array/public-api.lock

Lines changed: 198 additions & 2 deletions
Large diffs are not rendered by default.

vortex-array/src/aggregate_fn/fns/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ pub mod is_sorted;
88
pub mod last;
99
pub mod min_max;
1010
pub mod nan_count;
11+
pub mod row_count;
1112
pub mod sum;
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexExpect;
5+
use vortex_error::VortexResult;
6+
7+
use crate::ArrayRef;
8+
use crate::Columnar;
9+
use crate::ExecutionCtx;
10+
use crate::aggregate_fn::AggregateFnId;
11+
use crate::aggregate_fn::AggregateFnVTable;
12+
use crate::aggregate_fn::EmptyOptions;
13+
use crate::dtype::DType;
14+
use crate::dtype::Nullability;
15+
use crate::dtype::PType;
16+
use crate::scalar::Scalar;
17+
18+
/// Count the total number of elements in an array, including nulls.
19+
///
20+
/// Applies to all types. Returns a `u64` count.
21+
/// The identity value is zero.
22+
///
23+
/// Unlike [`Count`][crate::aggregate_fn::fns::count::Count], this aggregate includes
24+
/// null elements in the total. It is primarily used as a marker inside pruning
25+
/// predicates that need to refer to the scope row count.
26+
#[derive(Clone, Debug)]
27+
pub struct RowCount;
28+
29+
impl AggregateFnVTable for RowCount {
30+
type Options = EmptyOptions;
31+
type Partial = u64;
32+
33+
fn id(&self) -> AggregateFnId {
34+
AggregateFnId::new("vortex.row_count")
35+
}
36+
37+
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
38+
unimplemented!("RowCount is not yet serializable");
39+
}
40+
41+
fn return_dtype(&self, _options: &Self::Options, _input_dtype: &DType) -> Option<DType> {
42+
Some(DType::Primitive(PType::U64, Nullability::NonNullable))
43+
}
44+
45+
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
46+
self.return_dtype(options, input_dtype)
47+
}
48+
49+
fn empty_partial(
50+
&self,
51+
_options: &Self::Options,
52+
_input_dtype: &DType,
53+
) -> VortexResult<Self::Partial> {
54+
Ok(0u64)
55+
}
56+
57+
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
58+
let val = other
59+
.as_primitive()
60+
.typed_value::<u64>()
61+
.vortex_expect("row_count partial should not be null");
62+
*partial += val;
63+
Ok(())
64+
}
65+
66+
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
67+
Ok(Scalar::primitive(*partial, Nullability::NonNullable))
68+
}
69+
70+
fn reset(&self, partial: &mut Self::Partial) {
71+
*partial = 0;
72+
}
73+
74+
#[inline]
75+
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
76+
false
77+
}
78+
79+
fn try_accumulate(
80+
&self,
81+
state: &mut Self::Partial,
82+
batch: &ArrayRef,
83+
_ctx: &mut ExecutionCtx,
84+
) -> VortexResult<bool> {
85+
*state += batch.len() as u64;
86+
Ok(true)
87+
}
88+
89+
fn accumulate(
90+
&self,
91+
_partial: &mut Self::Partial,
92+
_batch: &Columnar,
93+
_ctx: &mut ExecutionCtx,
94+
) -> VortexResult<()> {
95+
unreachable!("RowCount::try_accumulate handles all arrays")
96+
}
97+
98+
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
99+
Ok(partials)
100+
}
101+
102+
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
103+
self.to_scalar(partial)
104+
}
105+
}
106+
107+
#[cfg(test)]
108+
mod tests {
109+
use vortex_buffer::buffer;
110+
use vortex_error::VortexResult;
111+
112+
use crate::IntoArray;
113+
use crate::LEGACY_SESSION;
114+
use crate::VortexSessionExecute;
115+
use crate::aggregate_fn::Accumulator;
116+
use crate::aggregate_fn::DynAccumulator;
117+
use crate::aggregate_fn::EmptyOptions;
118+
use crate::aggregate_fn::fns::row_count::RowCount;
119+
use crate::arrays::PrimitiveArray;
120+
121+
#[test]
122+
fn row_count_all_valid() -> VortexResult<()> {
123+
let array = buffer![1i32, 2, 3, 4, 5].into_array();
124+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
125+
let mut acc = Accumulator::try_new(RowCount, EmptyOptions, array.dtype().clone())?;
126+
acc.accumulate(&array, &mut ctx)?;
127+
let result = acc.finish()?;
128+
assert_eq!(result.as_primitive().typed_value::<u64>(), Some(5));
129+
Ok(())
130+
}
131+
132+
#[test]
133+
fn row_count_includes_nulls() -> VortexResult<()> {
134+
let array = PrimitiveArray::from_option_iter([Some(1i32), None, Some(3), None, Some(5)])
135+
.into_array();
136+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
137+
let mut acc = Accumulator::try_new(RowCount, EmptyOptions, array.dtype().clone())?;
138+
acc.accumulate(&array, &mut ctx)?;
139+
let result = acc.finish()?;
140+
assert_eq!(result.as_primitive().typed_value::<u64>(), Some(5));
141+
Ok(())
142+
}
143+
}

vortex-array/src/aggregate_fn/session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::aggregate_fn::fns::is_sorted::IsSorted;
1818
use crate::aggregate_fn::fns::last::Last;
1919
use crate::aggregate_fn::fns::min_max::MinMax;
2020
use crate::aggregate_fn::fns::nan_count::NanCount;
21+
use crate::aggregate_fn::fns::row_count::RowCount;
2122
use crate::aggregate_fn::fns::sum::Sum;
2223
use crate::aggregate_fn::kernels::DynAggregateKernel;
2324
use crate::aggregate_fn::kernels::DynGroupedAggregateKernel;
@@ -59,6 +60,7 @@ impl Default for AggregateFnSession {
5960
this.register(Last);
6061
this.register(MinMax);
6162
this.register(NanCount);
63+
this.register(RowCount);
6264
this.register(Sum);
6365

6466
// Register the built-in aggregate kernels.

vortex-array/src/expr/exprs.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use vortex_error::VortexExpect;
99
use vortex_error::vortex_panic;
1010
use vortex_utils::iter::ReduceBalancedIterExt;
1111

12+
use crate::aggregate_fn::AggregateFnRef;
13+
use crate::aggregate_fn::AggregateFnVTableExt;
14+
use crate::aggregate_fn::fns::row_count::RowCount;
1215
use crate::dtype::DType;
1316
use crate::dtype::FieldName;
1417
use crate::dtype::FieldNames;
@@ -46,6 +49,7 @@ use crate::scalar_fn::fns::pack::PackOptions;
4649
use crate::scalar_fn::fns::root::Root;
4750
use crate::scalar_fn::fns::select::FieldSelection;
4851
use crate::scalar_fn::fns::select::Select;
52+
use crate::scalar_fn::fns::stats_expression::StatsExpression;
4953
use crate::scalar_fn::fns::zip::Zip;
5054

5155
// ---- Root ----
@@ -701,3 +705,21 @@ pub fn dynamic(
701705
pub fn list_contains(list: Expression, value: Expression) -> Expression {
702706
ListContains.new_expr(EmptyOptions, [list, value])
703707
}
708+
709+
// ---- StatsExpression ----
710+
711+
/// Creates a placeholder [`StatsExpression`] wrapping the given aggregate.
712+
///
713+
/// The expression must be substituted before evaluation by the layer that owns the
714+
/// evaluation scope — see [`StatsExpression`] for details.
715+
pub fn stats_expression(agg: AggregateFnRef) -> Expression {
716+
StatsExpression.new_expr(agg, [])
717+
}
718+
719+
/// Creates a [`StatsExpression`] wrapping the [`RowCount`] aggregate.
720+
///
721+
/// This is the canonical way to refer to the row count of the current evaluation scope inside
722+
/// a pruning predicate.
723+
pub fn row_count() -> Expression {
724+
stats_expression(RowCount.bind(crate::aggregate_fn::EmptyOptions))
725+
}

vortex-array/src/expr/pruning/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
pub(crate) mod pruning_expr;
55
mod relation;
66

7-
pub use pruning_expr::ROW_COUNT_FIELD;
87
pub use pruning_expr::RequiredStats;
98
pub use pruning_expr::checked_pruning_expr;
109
pub use pruning_expr::field_path_stat_field_name;
@@ -19,8 +18,7 @@ pub trait StatsCatalog {
1918
/// Given a field path and statistic, return an expression that when evaluated over the catalog
2019
/// will return that stat for the referenced field.
2120
///
22-
/// This is likely to be a column expression, or a literal. Implementations may also expose
23-
/// synthetic scope-level values via reserved field paths such as [`ROW_COUNT_FIELD`].
21+
/// This is likely to be a column expression, or a literal.
2422
///
2523
/// Returns `None` if the stat is not available for the field path.
2624
fn stats_ref(&self, _field_path: &FieldPath, _stat: Stat) -> Option<Expression> {

vortex-array/src/expr/pruning/pruning_expr.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,11 @@ use crate::dtype::FieldPath;
1414
use crate::dtype::FieldPathSet;
1515
use crate::expr::Expression;
1616
use crate::expr::StatsCatalog;
17-
use crate::expr::col;
1817
use crate::expr::get_item;
1918
use crate::expr::root;
2019
use crate::expr::stats::Stat;
2120

2221
pub type RequiredStats = Relation<FieldPath, Stat>;
23-
pub const ROW_COUNT_FIELD: &str = "__vortex_row_count";
24-
25-
fn is_row_count_field(field_path: &FieldPath) -> bool {
26-
field_path.parts().len() == 1 && field_path.parts()[0].as_name() == Some(ROW_COUNT_FIELD)
27-
}
2822

2923
// A catalog that returns a stat column whenever it is required, tracking all accessed
3024
// stats and returning them later.
@@ -49,13 +43,6 @@ struct ScopeStatsCatalog<'a> {
4943

5044
impl StatsCatalog for ScopeStatsCatalog<'_> {
5145
fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
52-
if is_row_count_field(field_path) {
53-
return self
54-
.available_stats
55-
.contains(field_path)
56-
.then(|| col(ROW_COUNT_FIELD));
57-
}
58-
5946
let stat_path = field_path.clone().push(stat.name());
6047

6148
if self.available_stats.contains(&stat_path) {
@@ -68,10 +55,6 @@ impl StatsCatalog for ScopeStatsCatalog<'_> {
6855

6956
impl StatsCatalog for TrackingStatsCatalog {
7057
fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
71-
if is_row_count_field(field_path) {
72-
return Some(col(ROW_COUNT_FIELD));
73-
}
74-
7558
let mut expr = root();
7659
let name = field_path_stat_field_name(field_path, stat);
7760
expr = get_item(name, expr);
@@ -103,8 +86,9 @@ pub fn field_path_stat_field_name(field_path: &FieldPath, stat: Stat) -> FieldNa
10386
/// cannot hold, and false if it cannot be determined from stats alone whether the positions can
10487
/// be pruned.
10588
///
106-
/// Include [`ROW_COUNT_FIELD`] in `available_stats` to make the scope row count available to
107-
/// row-count-aware pruning expressions such as `is_not_null(...)`.
89+
/// Row-count-aware pruning (for example `is_not_null(...)`) emits
90+
/// [`row_count`][crate::expr::row_count] placeholders that the evaluation layer must substitute
91+
/// before executing the returned expression.
10892
///
10993
/// If the falsification logic attempts to access an unknown stat,
11094
/// this function will return `None`.

vortex-array/src/scalar_fn/fns/is_not_null.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@ use crate::ExecutionCtx;
1111
use crate::IntoArray;
1212
use crate::arrays::ConstantArray;
1313
use crate::dtype::DType;
14-
use crate::dtype::FieldPath;
1514
use crate::dtype::Nullability;
1615
use crate::expr::Expression;
1716
use crate::expr::StatsCatalog;
1817
use crate::expr::eq;
19-
use crate::expr::pruning::ROW_COUNT_FIELD;
18+
use crate::expr::row_count;
2019
use crate::expr::stats::Stat;
2120
use crate::scalar_fn::Arity;
2221
use crate::scalar_fn::ChildName;
@@ -108,9 +107,7 @@ impl ScalarFnVTable for IsNotNull {
108107
// is_not_null is falsified when ALL values are null, i.e. null_count == row_count.
109108
let child = expr.child(0);
110109
let null_count_expr = child.stat_expression(Stat::NullCount, catalog)?;
111-
let row_count_expr =
112-
catalog.stats_ref(&FieldPath::from_name(ROW_COUNT_FIELD), Stat::NullCount)?;
113-
Some(eq(null_count_expr, row_count_expr))
110+
Some(eq(null_count_expr, row_count()))
114111
}
115112
}
116113

@@ -260,25 +257,22 @@ mod tests {
260257
use crate::dtype::FieldPathSet;
261258
use crate::expr::col;
262259
use crate::expr::eq;
263-
use crate::expr::pruning::ROW_COUNT_FIELD;
264260
use crate::expr::pruning::checked_pruning_expr;
261+
use crate::expr::row_count;
265262
use crate::expr::stats::Stat;
266263

267264
let expr = is_not_null(col("a"));
268265

269266
let (pruning_expr, st) = checked_pruning_expr(
270267
&expr,
271-
&FieldPathSet::from_iter([
272-
FieldPath::from_iter([Field::Name("a".into()), Field::Name("null_count".into())]),
273-
FieldPath::from_name(ROW_COUNT_FIELD),
274-
]),
268+
&FieldPathSet::from_iter([FieldPath::from_iter([
269+
Field::Name("a".into()),
270+
Field::Name("null_count".into()),
271+
])]),
275272
)
276273
.unwrap();
277274

278-
assert_eq!(
279-
&pruning_expr,
280-
&eq(col("a_null_count"), col(ROW_COUNT_FIELD))
281-
);
275+
assert_eq!(&pruning_expr, &eq(col("a_null_count"), row_count()));
282276
assert_eq!(
283277
st.map(),
284278
&HashMap::from_iter([(FieldPath::from_name("a"), HashSet::from([Stat::NullCount]))])

vortex-array/src/scalar_fn/fns/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ pub mod operators;
2020
pub mod pack;
2121
pub mod root;
2222
pub mod select;
23+
pub mod stats_expression;
2324
pub mod zip;

0 commit comments

Comments
 (0)