Skip to content

Commit 072c728

Browse files
committed
Support returning row count in pruning aggregate expressions
This lets us effectively prune expressions like IsNotNull. Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent f152281 commit 072c728

13 files changed

Lines changed: 284 additions & 58 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[toolchain]
22
channel = "1.90.0"
33
components = ["rust-src", "rustfmt", "clippy", "rust-analyzer"]
4-
profile = "minimal"
4+
profile = "minimal"

vortex-array/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11324,6 +11324,8 @@ impl<K: core::hash::Hash + core::cmp::Eq, V: core::hash::Hash + core::cmp::Eq> c
1132411324

1132511325
pub fn vortex_array::expr::pruning::Relation<K, V>::default() -> Self
1132611326

11327+
pub const vortex_array::expr::pruning::ROW_COUNT_FIELD: &str
11328+
1132711329
pub trait vortex_array::expr::pruning::StatsCatalog
1132811330

1132911331
pub fn vortex_array::expr::pruning::StatsCatalog::stats_ref(&self, _field_path: &vortex_array::dtype::FieldPath, _stat: vortex_array::expr::stats::Stat) -> core::option::Option<vortex_array::expr::Expression>

vortex-array/src/expr/pruning/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
pub(crate) mod pruning_expr;
55
mod relation;
66

7+
pub use pruning_expr::ROW_COUNT_FIELD;
78
pub use pruning_expr::RequiredStats;
89
pub use pruning_expr::checked_pruning_expr;
910
pub use pruning_expr::field_path_stat_field_name;
@@ -18,7 +19,8 @@ pub trait StatsCatalog {
1819
/// Given a field path and statistic, return an expression that when evaluated over the catalog
1920
/// will return that stat for the referenced field.
2021
///
21-
/// This is likely to be a column expression, or a literal.
22+
/// This is likely to be a column expression, or a literal. Implementations may also expose
23+
/// synthetic scope-level values via reserved field paths such as [`ROW_COUNT_FIELD`].
2224
///
2325
/// Returns `None` if the stat is not available for the field path.
2426
fn stats_ref(&self, _field_path: &FieldPath, _stat: Stat) -> Option<Expression> {

vortex-array/src/expr/pruning/pruning_expr.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,17 @@ use crate::dtype::FieldPath;
1414
use crate::dtype::FieldPathSet;
1515
use crate::expr::Expression;
1616
use crate::expr::StatsCatalog;
17+
use crate::expr::col;
1718
use crate::expr::get_item;
1819
use crate::expr::root;
1920
use crate::expr::stats::Stat;
2021

2122
pub type RequiredStats = Relation<FieldPath, Stat>;
23+
pub const ROW_COUNT_FIELD: &str = "__vortex_row_count";
24+
25+
fn is_row_count_field(field_path: &FieldPath) -> bool {
26+
field_path.parts().len() == 1 && field_path.parts()[0].as_name() == Some(ROW_COUNT_FIELD)
27+
}
2228

2329
// A catalog that returns a stat column whenever it is required, tracking all accessed
2430
// stats and returning them later.
@@ -43,6 +49,13 @@ struct ScopeStatsCatalog<'a> {
4349

4450
impl StatsCatalog for ScopeStatsCatalog<'_> {
4551
fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
52+
if is_row_count_field(field_path) {
53+
return self
54+
.available_stats
55+
.contains(field_path)
56+
.then(|| col(ROW_COUNT_FIELD));
57+
}
58+
4659
let stat_path = field_path.clone().push(stat.name());
4760

4861
if self.available_stats.contains(&stat_path) {
@@ -55,6 +68,10 @@ impl StatsCatalog for ScopeStatsCatalog<'_> {
5568

5669
impl StatsCatalog for TrackingStatsCatalog {
5770
fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
71+
if is_row_count_field(field_path) {
72+
return Some(col(ROW_COUNT_FIELD));
73+
}
74+
5875
let mut expr = root();
5976
let name = field_path_stat_field_name(field_path, stat);
6077
expr = get_item(name, expr);
@@ -86,6 +103,9 @@ pub fn field_path_stat_field_name(field_path: &FieldPath, stat: Stat) -> FieldNa
86103
/// cannot hold, and false if it cannot be determined from stats alone whether the positions can
87104
/// be pruned.
88105
///
106+
/// Include [`ROW_COUNT_FIELD`] in `available_stats` to make the scope row count available to
107+
/// row-count-aware pruning expressions such as `is_not_null(...)`.
108+
///
89109
/// If the falsification logic attempts to access an unknown stat,
90110
/// this function will return `None`.
91111
pub fn checked_pruning_expr(

vortex-array/src/scalar_fn/fns/is_not_null.rs

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ use crate::ExecutionCtx;
1111
use crate::IntoArray;
1212
use crate::arrays::ConstantArray;
1313
use crate::dtype::DType;
14+
use crate::dtype::FieldPath;
1415
use crate::dtype::Nullability;
1516
use crate::expr::Expression;
1617
use crate::expr::StatsCatalog;
17-
use crate::expr::and;
1818
use crate::expr::eq;
19-
use crate::expr::gt;
20-
use crate::expr::lit;
19+
use crate::expr::pruning::ROW_COUNT_FIELD;
2120
use crate::expr::stats::Stat;
2221
use crate::scalar_fn::Arity;
2322
use crate::scalar_fn::ChildName;
@@ -106,20 +105,12 @@ impl ScalarFnVTable for IsNotNull {
106105
expr: &Expression,
107106
catalog: &dyn StatsCatalog,
108107
) -> Option<Expression> {
109-
// is_not_null is falsified when ALL values are null, i.e. null_count == len.
110-
// Since there is no len stat in the zone map, we approximate using IsConstant:
111-
// if the zone is constant and has any nulls, then all values must be null.
112-
//
113-
// TODO(#7187): Add a len stat to enable the more general falsification:
114-
// null_count == len => is_not_null is all false.
115-
let null_count_expr = expr.child(0).stat_expression(Stat::NullCount, catalog)?;
116-
let is_constant_expr = expr.child(0).stat_expression(Stat::IsConstant, catalog)?;
117-
// If the zone is constant (is_constant == true) and has nulls (null_count > 0),
118-
// then all values must be null, so is_not_null is all false.
119-
Some(and(
120-
eq(is_constant_expr, lit(true)),
121-
gt(null_count_expr, lit(0u64)),
122-
))
108+
// is_not_null is falsified when ALL values are null, i.e. null_count == row_count.
109+
let child = expr.child(0);
110+
let null_count_expr = child.stat_expression(Stat::NullCount, catalog)?;
111+
let row_count_expr =
112+
catalog.stats_ref(&FieldPath::from_name(ROW_COUNT_FIELD), Stat::NullCount)?;
113+
Some(eq(null_count_expr, row_count_expr))
123114
}
124115
}
125116

@@ -267,11 +258,9 @@ mod tests {
267258
use crate::dtype::Field;
268259
use crate::dtype::FieldPath;
269260
use crate::dtype::FieldPathSet;
270-
use crate::expr::and;
271261
use crate::expr::col;
272262
use crate::expr::eq;
273-
use crate::expr::gt;
274-
use crate::expr::lit;
263+
use crate::expr::pruning::ROW_COUNT_FIELD;
275264
use crate::expr::pruning::checked_pruning_expr;
276265
use crate::expr::stats::Stat;
277266

@@ -281,24 +270,18 @@ mod tests {
281270
&expr,
282271
&FieldPathSet::from_iter([
283272
FieldPath::from_iter([Field::Name("a".into()), Field::Name("null_count".into())]),
284-
FieldPath::from_iter([Field::Name("a".into()), Field::Name("is_constant".into())]),
273+
FieldPath::from_name(ROW_COUNT_FIELD),
285274
]),
286275
)
287276
.unwrap();
288277

289278
assert_eq!(
290279
&pruning_expr,
291-
&and(
292-
eq(col("a_is_constant"), lit(true)),
293-
gt(col("a_null_count"), lit(0u64)),
294-
)
280+
&eq(col("a_null_count"), col(ROW_COUNT_FIELD))
295281
);
296282
assert_eq!(
297283
st.map(),
298-
&HashMap::from_iter([(
299-
FieldPath::from_name("a"),
300-
HashSet::from([Stat::NullCount, Stat::IsConstant])
301-
)])
284+
&HashMap::from_iter([(FieldPath::from_name("a"), HashSet::from([Stat::NullCount]))])
302285
);
303286
}
304287
}

vortex-file/src/file.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use vortex_array::dtype::FieldMask;
1919
use vortex_array::dtype::FieldPath;
2020
use vortex_array::dtype::FieldPathSet;
2121
use vortex_array::expr::Expression;
22+
use vortex_array::expr::pruning::ROW_COUNT_FIELD;
2223
use vortex_array::expr::pruning::checked_pruning_expr;
2324
use vortex_error::VortexResult;
2425
use vortex_layout::LayoutReader;
@@ -139,7 +140,8 @@ impl VortexFile {
139140
Field::Name(stat.name().into()),
140141
])
141142
})
142-
}),
143+
})
144+
.chain(std::iter::once(FieldPath::from_name(ROW_COUNT_FIELD))),
143145
);
144146

145147
let Some((predicate, required_stats)) = checked_pruning_expr(filter, &set) else {
@@ -157,6 +159,7 @@ impl VortexFile {
157159
&required_file_stats,
158160
stats.stats_sets(),
159161
fields,
162+
self.footer.row_count(),
160163
)?
161164
else {
162165
return Ok(false);

vortex-file/src/pruning.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ use vortex_array::arrays::ConstantArray;
99
use vortex_array::arrays::StructArray;
1010
use vortex_array::dtype::Field;
1111
use vortex_array::dtype::FieldName;
12-
use vortex_array::dtype::FieldNames;
1312
use vortex_array::dtype::FieldPath;
1413
use vortex_array::dtype::StructFields;
14+
use vortex_array::expr::pruning::ROW_COUNT_FIELD;
1515
use vortex_array::expr::pruning::field_path_stat_field_name;
1616
use vortex_array::expr::stats::Stat;
1717
use vortex_array::expr::stats::StatsProvider;
1818
use vortex_array::stats::StatsSet;
19-
use vortex_array::validity::Validity;
2019
use vortex_error::VortexExpect;
2120
use vortex_error::VortexResult;
2221
use vortex_error::vortex_bail;
@@ -28,13 +27,14 @@ pub fn extract_relevant_file_stats_as_struct_row(
2827
access: &HashMap<FieldPath, HashSet<Stat>>,
2928
stats_sets: &Arc<[StatsSet]>,
3029
struct_dtype: &StructFields,
30+
row_count: u64,
3131
) -> VortexResult<Option<ArrayRef>> {
32-
if access.is_empty() {
33-
return StructArray::try_new(FieldNames::default(), vec![], 1, Validity::NonNullable)
34-
.map(|s| Some(s.into_array()));
35-
}
32+
let mut columns: Vec<(FieldName, ArrayRef)> = Vec::with_capacity(access.len() * 2 + 1);
33+
columns.push((
34+
FieldName::from(ROW_COUNT_FIELD),
35+
ConstantArray::new(row_count, 1).into_array(),
36+
));
3637

37-
let mut columns: Vec<(FieldName, ArrayRef)> = Vec::with_capacity(access.len() * 2);
3838
for (field_path, stats) in access.into_iter() {
3939
if field_path.parts().len() != 1 {
4040
return Ok(None);
@@ -76,3 +76,38 @@ pub fn extract_relevant_file_stats_as_struct_row(
7676
StructArray::from_fields(columns.as_slice())?.into_array(),
7777
))
7878
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use vortex_array::LEGACY_SESSION;
83+
use vortex_array::VortexSessionExecute;
84+
use vortex_array::arrays::StructArray;
85+
use vortex_array::arrays::struct_::StructArrayExt;
86+
87+
use super::*;
88+
89+
#[test]
90+
fn includes_row_count_field_when_no_stats_are_needed() {
91+
let row = extract_relevant_file_stats_as_struct_row(
92+
&HashMap::default(),
93+
&Arc::default(),
94+
&StructFields::default(),
95+
128,
96+
)
97+
.unwrap()
98+
.unwrap()
99+
.execute::<StructArray>(&mut LEGACY_SESSION.create_execution_ctx())
100+
.unwrap();
101+
102+
assert_eq!(row.names().as_ref(), &[FieldName::from(ROW_COUNT_FIELD)]);
103+
assert_eq!(
104+
row.unmasked_field_by_name(ROW_COUNT_FIELD)
105+
.unwrap()
106+
.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
107+
.unwrap()
108+
.as_primitive()
109+
.typed_value::<u64>(),
110+
Some(128),
111+
);
112+
}
113+
}

vortex-file/src/v2/file_stats_reader.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use vortex_array::dtype::StructFields;
2323
use vortex_array::expr::Expression;
2424
use vortex_array::expr::StatsCatalog;
2525
use vortex_array::expr::lit;
26+
use vortex_array::expr::pruning::ROW_COUNT_FIELD;
2627
use vortex_array::expr::stats::Stat;
2728
use vortex_array::scalar::Scalar;
2829
use vortex_array::scalar_fn::fns::literal::Literal;
@@ -115,6 +116,11 @@ impl FileStatsLayoutReader {
115116
/// Implements [`StatsCatalog`] to provide file-level stats to expressions during pruning evaluation.
116117
impl StatsCatalog for FileStatsLayoutReader {
117118
fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
119+
if field_path.parts().len() == 1 && field_path.parts()[0].as_name() == Some(ROW_COUNT_FIELD)
120+
{
121+
return Some(lit(self.child.row_count()));
122+
}
123+
118124
// FileStats currently only holds top-level field statistics.
119125
if field_path.parts().len() != 1 {
120126
return None;
@@ -126,7 +132,8 @@ impl StatsCatalog for FileStatsLayoutReader {
126132

127133
let stat_value = field_stats.get(stat)?.as_exact()?;
128134
let field_dtype = self.struct_fields.field_by_index(field_idx)?;
129-
let stat_scalar = Scalar::try_new(field_dtype, Some(stat_value)).ok()?;
135+
let stat_dtype = stat.dtype(&field_dtype)?;
136+
let stat_scalar = Scalar::try_new(stat_dtype, Some(stat_value)).ok()?;
130137

131138
Some(lit(stat_scalar))
132139
}
@@ -209,12 +216,14 @@ mod tests {
209216

210217
use vortex_array::ArrayContext;
211218
use vortex_array::IntoArray as _;
219+
use vortex_array::arrays::PrimitiveArray;
212220
use vortex_array::arrays::StructArray;
213221
use vortex_array::dtype::DType;
214222
use vortex_array::dtype::Nullability;
215223
use vortex_array::dtype::PType;
216224
use vortex_array::expr::get_item;
217225
use vortex_array::expr::gt;
226+
use vortex_array::expr::is_not_null;
218227
use vortex_array::expr::lit;
219228
use vortex_array::expr::root;
220229
use vortex_array::expr::stats::Precision;
@@ -259,6 +268,18 @@ mod tests {
259268
)
260269
}
261270

271+
fn test_file_null_count_stats(null_count: u64) -> FileStatistics {
272+
let mut stats = StatsSet::default();
273+
stats.set(
274+
Stat::NullCount,
275+
Precision::exact(ScalarValue::from(null_count)),
276+
);
277+
FileStatistics::new(
278+
Arc::from([stats]),
279+
Arc::from([DType::Primitive(PType::I32, Nullability::Nullable)]),
280+
)
281+
}
282+
262283
#[test]
263284
fn pruning_when_filter_out_of_range() -> VortexResult<()> {
264285
block_on(|handle| async {
@@ -337,4 +358,47 @@ mod tests {
337358
Ok(())
338359
})
339360
}
361+
362+
#[test]
363+
fn pruning_is_not_null_when_file_is_all_null() -> VortexResult<()> {
364+
block_on(|handle| async {
365+
let session = SESSION.clone().with_handle(handle);
366+
let ctx = ArrayContext::empty();
367+
let segments = Arc::new(TestSegments::default());
368+
let (ptr, eof) = SequenceId::root().split();
369+
let struct_array = StructArray::from_fields(
370+
[(
371+
"col",
372+
PrimitiveArray::from_option_iter([None::<i32>, None, None, None, None])
373+
.into_array(),
374+
)]
375+
.as_slice(),
376+
)?;
377+
let strategy = TableStrategy::new(
378+
Arc::new(FlatLayoutStrategy::default()),
379+
Arc::new(FlatLayoutStrategy::default()),
380+
);
381+
let layout = strategy
382+
.write_stream(
383+
ctx,
384+
Arc::<TestSegments>::clone(&segments),
385+
struct_array.into_array().to_array_stream().sequenced(ptr),
386+
eof,
387+
&session,
388+
)
389+
.await?;
390+
391+
let child = layout.new_reader("".into(), segments, &SESSION)?;
392+
393+
let reader =
394+
FileStatsLayoutReader::new(child, test_file_null_count_stats(5), SESSION.clone());
395+
396+
let expr = is_not_null(get_item("col", root()));
397+
let mask = Mask::new_true(5);
398+
let result = reader.pruning_evaluation(&(0..5), &expr, mask)?.await?;
399+
assert_eq!(result, Mask::new_false(5));
400+
401+
Ok(())
402+
})
403+
}
340404
}

vortex-layout/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ vortex-flatbuffers = { workspace = true, features = ["layout"] }
4747
vortex-io = { workspace = true }
4848
vortex-mask = { workspace = true }
4949
vortex-metrics = { workspace = true }
50+
vortex-runend = { workspace = true }
5051
vortex-scan = { workspace = true }
5152
vortex-sequence = { workspace = true }
5253
vortex-session = { workspace = true }

0 commit comments

Comments
 (0)