Skip to content

Commit 07bfcdc

Browse files
committed
Use chunked execute_parent for stat expressions
Signed-off-by: "Nicholas Gates" <nick@nickgates.com>
1 parent 7349cd6 commit 07bfcdc

4 files changed

Lines changed: 71 additions & 23 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,77 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_error::VortexExpect;
5+
use vortex_error::VortexResult;
6+
7+
use crate::ArrayRef;
8+
use crate::ExecutionCtx;
9+
use crate::IntoArray;
10+
use crate::array::ArrayView;
411
use crate::arrays::Chunked;
12+
use crate::arrays::ChunkedArray;
13+
use crate::arrays::ScalarFn;
14+
use crate::arrays::ScalarFnArray;
15+
use crate::arrays::chunked::ChunkedArrayExt;
516
use crate::arrays::dict::TakeExecuteAdaptor;
617
use crate::arrays::filter::FilterExecuteAdaptor;
18+
use crate::arrays::scalar_fn::ExactScalarFn;
19+
use crate::arrays::scalar_fn::ScalarFnArrayExt;
20+
use crate::arrays::scalar_fn::ScalarFnArrayView;
721
use crate::arrays::slice::SliceExecuteAdaptor;
22+
use crate::kernel::ExecuteParentKernel;
823
use crate::kernel::ParentKernelSet;
924
use crate::scalar_fn::fns::mask::MaskExecuteAdaptor;
25+
use crate::scalar_fn::fns::stat::StatFn;
1026
use crate::scalar_fn::fns::zip::ZipExecuteAdaptor;
1127

1228
/// Set of execute-parent kernels registered for the `Chunked` encoding.
///
/// `ChunkedStatExecuteParentKernel` is listed first, followed by the filter, mask, slice,
/// take and zip adaptors instantiated for `Chunked`.
// NOTE(review): presumably the kernels are consulted in the order listed here — confirm
// against `ParentKernelSet`.
pub(crate) static PARENT_KERNELS: ParentKernelSet<Chunked> = ParentKernelSet::new(&[
29+
ParentKernelSet::lift(&ChunkedStatExecuteParentKernel),
1330
ParentKernelSet::lift(&FilterExecuteAdaptor(Chunked)),
1431
ParentKernelSet::lift(&MaskExecuteAdaptor(Chunked)),
1532
ParentKernelSet::lift(&SliceExecuteAdaptor(Chunked)),
1633
ParentKernelSet::lift(&TakeExecuteAdaptor(Chunked)),
1734
ParentKernelSet::lift(&ZipExecuteAdaptor(Chunked)),
1835
]);
36+
37+
/// Execute-parent kernel for a `StatFn` scalar function whose first child is a chunked array.
///
/// Its `execute_parent` implementation re-applies the parent's scalar function to every chunk
/// and returns the per-chunk results as a new chunked array.
#[derive(Debug)]
38+
struct ChunkedStatExecuteParentKernel;
39+
40+
/// Pushes a `StatFn` parent down into the chunks of its chunked child.
impl ExecuteParentKernel<Chunked> for ChunkedStatExecuteParentKernel {
41+
// Parent matcher for this kernel.
// NOTE(review): presumably matches only scalar-fn arrays whose function is exactly
// `StatFn` — confirm against `ExactScalarFn`.
type Parent = ExactScalarFn<StatFn>;
42+
43+
/// Rewrites `stat(chunked)` into a chunked array of `stat(chunk)` nodes.
///
/// * `array` - the chunked child being visited.
/// * `parent` - view of the `StatFn` scalar-fn array that owns `array`.
/// * `child_idx` - position of `array` among the parent's children.
/// * `_ctx` - execution context; not used by this kernel.
///
/// Returns `Ok(None)` when `array` is not the parent's child at index 0. Otherwise returns
/// `Ok(Some(..))` holding a `ChunkedArray` with one `ScalarFnArray` per input chunk, typed
/// with the parent's dtype. This keeps one result per chunk instead of a single combined
/// value for the whole array.
///
/// # Errors
///
/// Propagates the first error from `ScalarFnArray::try_new` or `ChunkedArray::try_new`.
fn execute_parent(
44+
&self,
45+
array: ArrayView<'_, Chunked>,
46+
parent: ScalarFnArrayView<'_, StatFn>,
47+
child_idx: usize,
48+
_ctx: &mut ExecutionCtx,
49+
) -> VortexResult<Option<ArrayRef>> {
50+
// Only the child at index 0 is handled; decline for any other position.
if child_idx != 0 {
51+
return Ok(None);
52+
}
53+
54+
tracing::trace!(
55+
"stat({}) descending into ChunkedArray with {} chunks",
56+
parent.options.aggregate_fn(),
57+
array.nchunks()
58+
);
59+
60+
// Clone the parent's scalar function so the same function can be attached to every chunk.
// The downcast is treated as infallible: a `None` here is an invariant violation.
let scalar_fn = parent
61+
.as_opt::<ScalarFn>()
62+
.vortex_expect("ExactScalarFn matcher confirmed ScalarFnArray")
63+
.scalar_fn()
64+
.clone();
65+
// Build one single-child `ScalarFnArray` per chunk, with the chunk's own length as the
// array length. Collecting into `VortexResult` stops at the first construction error.
let chunks = array
66+
.iter_chunks()
67+
.map(|chunk| {
68+
ScalarFnArray::try_new(scalar_fn.clone(), vec![chunk.clone()], chunk.len())
69+
.map(IntoArray::into_array)
70+
})
71+
.collect::<VortexResult<Vec<_>>>()?;
72+
73+
// Re-wrap the per-chunk nodes in a chunked array typed with the parent's dtype.
Ok(Some(
74+
ChunkedArray::try_new(chunks, parent.dtype().clone())?.into_array(),
75+
))
76+
}
77+
}

vortex-array/src/arrays/chunked/compute/rules.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::optimizer::rules::ArrayParentReduceRule;
2121
use crate::optimizer::rules::ParentRuleSet;
2222
use crate::scalar_fn::fns::cast::CastReduceAdaptor;
2323
use crate::scalar_fn::fns::fill_null::FillNullReduceAdaptor;
24+
use crate::scalar_fn::fns::stat::StatFn;
2425

2526
pub(crate) const PARENT_RULES: ParentRuleSet<Chunked> = ParentRuleSet::new(&[
2627
ParentRuleSet::lift(&CastReduceAdaptor(Chunked)),
@@ -44,6 +45,9 @@ impl ArrayParentReduceRule<Chunked> for ChunkedUnaryScalarFnPushDownRule {
4445
if parent.nchildren() != 1 {
4546
return Ok(None);
4647
}
48+
if parent.scalar_fn().is::<StatFn>() {
49+
return Ok(None);
50+
}
4751

4852
let new_chunks: Vec<_> = array
4953
.iter_chunks()
@@ -76,6 +80,10 @@ impl ArrayParentReduceRule<Chunked> for ChunkedConstantScalarFnPushDownRule {
7680
parent: ArrayView<'_, ScalarFn>,
7781
child_idx: usize,
7882
) -> VortexResult<Option<ArrayRef>> {
83+
if parent.scalar_fn().is::<StatFn>() {
84+
return Ok(None);
85+
}
86+
7987
for (idx, child) in parent.iter_children().enumerate() {
8088
if idx == child_idx {
8189
continue;

vortex-array/src/scalar_fn/fns/stat.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ use crate::ArrayRef;
1313
use crate::ExecutionCtx;
1414
use crate::IntoArray;
1515
use crate::aggregate_fn::AggregateFnRef;
16-
use crate::arrays::Chunked;
17-
use crate::arrays::ChunkedArray;
1816
use crate::arrays::ConstantArray;
19-
use crate::arrays::chunked::ChunkedArrayExt;
2017
use crate::dtype::DType;
2118
use crate::expr::Expression;
2219
use crate::expr::stats::StatsProvider;
@@ -115,26 +112,6 @@ impl ScalarFnVTable for StatFn {
115112
) -> VortexResult<ArrayRef> {
116113
let input = args.get(0)?;
117114
let dtype = stat_dtype(options.aggregate_fn(), input.dtype())?;
118-
119-
// Recurse into each chunk so the output keeps per-chunk granularity (one constant
120-
// per chunk) instead of collapsing to a single combined stat across the whole array.
121-
// Per-chunk granularity is what makes the result useful for pruning: predicates can
122-
// drop whole chunks at a time. Without this, we'd lose every chunk boundary as a
123-
// pruning opportunity. Other encodings (zone maps, etc.) will need similar
124-
// structure-preserving handling once they land.
125-
if let Some(chunked) = input.as_opt::<Chunked>() {
126-
tracing::trace!(
127-
"stat({}) descending into ChunkedArray with {} chunks",
128-
options.aggregate_fn(),
129-
chunked.nchunks()
130-
);
131-
let chunks = chunked
132-
.iter_chunks()
133-
.map(|chunk| stat_array(chunk, options.aggregate_fn(), dtype.clone(), chunk.len()))
134-
.collect::<VortexResult<Vec<_>>>()?;
135-
return Ok(ChunkedArray::try_new(chunks, dtype)?.into_array());
136-
}
137-
138115
stat_array(&input, options.aggregate_fn(), dtype, args.row_count())
139116
}
140117
}

vortex-array/src/stats/expr.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod tests {
2424
use vortex_error::VortexResult;
2525

2626
use super::stat;
27+
use crate::ArrayRef;
2728
use crate::Canonical;
2829
use crate::IntoArray;
2930
use crate::LEGACY_SESSION;
@@ -102,6 +103,9 @@ mod tests {
102103
.into_array();
103104

104105
let result = chunked.apply(&stat(root(), AggregateFn::new(Sum, EmptyOptions).erased()))?;
106+
assert!(result.as_opt::<Chunked>().is_none());
107+
108+
let result = result.execute::<ArrayRef>(&mut LEGACY_SESSION.create_execution_ctx())?;
105109

106110
let chunked_result = result
107111
.as_opt::<Chunked>()

0 commit comments

Comments
 (0)