Skip to content

Commit 5959b37

Browse files
committed
wip
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 8150d5a commit 5959b37

2 files changed

Lines changed: 57 additions & 61 deletions

File tree

vortex-array/src/arrays/filter/vtable.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::buffer::BufferHandle;
3737
use crate::dtype::DType;
3838
use crate::executor::ExecutionCtx;
3939
use crate::executor::ExecutionResult;
40+
use crate::require_child;
4041
use crate::scalar::Scalar;
4142
use crate::serde::ArrayChildren;
4243
use crate::validity::Validity;
@@ -153,20 +154,14 @@ impl VTable for Filter {
153154
_ => unreachable!("`execute_filter_fast_paths` handles AllTrue and AllFalse"),
154155
};
155156

156-
if array.child().is_canonical() {
157-
// If the child is already canonical, apply the filter directly.
158-
// TODO(joe): fix the ownership of AnyCanonical
159-
let child = Canonical::from(array.child().as_::<AnyCanonical>());
160-
return Ok(ExecutionResult::done(
161-
execute_filter(child, &mask_values).into_array(),
162-
));
163-
}
157+
let array = require_child!(array, array.child(), CHILD_SLOT => AnyCanonical);
164158

165-
// Execute one step and re-wrap rather than using require_child!(=> AnyCanonical),
166-
// which would decode past intermediate encodings that have their own FilterKernels.
167-
let child = array.child().clone().execute::<ArrayRef>(ctx)?;
159+
// We rely on the optimization pass that runs prior to this execution for filter pushdown,
160+
// so the canonical child can now be filtered directly.
161+
// TODO(joe): fix the ownership of AnyCanonical
162+
let child = Canonical::from(array.child().as_::<AnyCanonical>());
168163
Ok(ExecutionResult::done(
169-
child.filter(Mask::Values(mask_values))?,
164+
execute_filter(child, &mask_values).into_array(),
170165
))
171166
}
172167

vortex-array/src/executor.rs

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::env::VarError;
2121
use std::fmt;
2222
use std::fmt::Display;
2323
use std::sync::LazyLock;
24+
#[cfg(debug_assertions)]
2425
use std::sync::atomic::AtomicUsize;
2526

2627
use vortex_error::VortexExpect;
@@ -83,10 +84,18 @@ impl ArrayRef {
8384
/// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
8485
/// stack.
8586
///
86-
/// The scheduler repeatedly:
87-
/// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
88-
/// 2. Runs `execute_parent` on each child for child-driven optimizations.
89-
/// 3. Calls `execute` which returns an [`ExecutionStep`].
87+
/// Each iteration proceeds through three steps in order:
88+
///
89+
/// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is
90+
/// canonical, splice it back into the stacked parent (if any) and continue, or return.
91+
/// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current`
92+
/// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd).
93+
/// If there is a stacked parent frame, the rewritten child is spliced back into it so
94+
/// that optimize and further `execute_parent` can fire on the reconstructed parent
95+
/// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)`
96+
/// whose `FilterExecuteAdaptor` fires on the next iteration).
97+
/// 3. **`execute`** — call the encoding's own execute step, which either returns `Done` or
98+
/// `ExecuteSlot(i)` to push a child onto the stack for focused execution.
9099
///
91100
/// Note: the returned array may not match `M`. If execution converges to a canonical form
92101
/// that does not match `M`, the canonical array is returned since no further execution
@@ -95,67 +104,46 @@ impl ArrayRef {
95104
/// For safety, we will error when the number of execution iterations reaches a configurable
96105
/// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
97106
pub fn execute_until<M: Matcher>(self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
98-
static MAX_ITERATIONS: LazyLock<usize> =
99-
LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
100-
Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
101-
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
102-
}),
103-
Err(VarError::NotPresent) => 128,
104-
Err(VarError::NotUnicode(_)) => {
105-
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
106-
}
107-
});
108-
109107
let mut current = self.optimize()?;
110108
// Stack frames: (parent, slot_idx, done_predicate_for_slot)
111109
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
112110

113111
for _ in 0..*MAX_ITERATIONS {
114-
// Check for termination: use the stack frame's done predicate, or the root matcher.
112+
// Step 1: done / canonical — splice back into stacked parent or return.
115113
let is_done = stack
116114
.last()
117115
.map_or(M::matches as DonePredicate, |frame| frame.2);
118-
if is_done(&current) {
116+
if is_done(&current) || AnyCanonical::matches(&current) {
119117
match stack.pop() {
120118
None => {
121119
ctx.log(format_args!("-> {}", current));
122120
return Ok(current);
123121
}
124122
Some((parent, slot_idx, _)) => {
125-
current = parent.with_slot(slot_idx, current)?;
126-
current = current.optimize()?;
127-
continue;
128-
}
129-
}
130-
}
131-
132-
// If we've reached canonical form, we can't execute any further regardless
133-
// of whether the matcher matched.
134-
if AnyCanonical::matches(&current) {
135-
match stack.pop() {
136-
None => {
137-
ctx.log(format_args!("-> canonical (unmatched) {}", current));
138-
return Ok(current);
139-
}
140-
Some((parent, slot_idx, _)) => {
141-
current = parent.with_slot(slot_idx, current)?;
142-
current = current.optimize()?;
123+
current = parent.with_slot(slot_idx, current)?.optimize()?;
143124
continue;
144125
}
145126
}
146127
}
147128

148-
// Try execute_parent (child-driven optimized execution)
129+
// Step 2: execute_parent on children (current is the parent).
130+
// If there is a stacked parent frame, splice the rewritten child back into it
131+
// so that optimize and execute_parent can fire naturally on the reconstructed parent
132+
// (e.g. Slice(RunEnd) -RunEndSliceKernel-> RunEnd, spliced back into Filter gives
133+
// Filter(RunEnd), whose FilterExecuteAdaptor fires on the next iteration).
149134
if let Some(rewritten) = try_execute_parent(&current, ctx)? {
150135
ctx.log(format_args!(
151136
"execute_parent rewrote {} -> {}",
152137
current, rewritten
153138
));
154139
current = rewritten.optimize()?;
140+
if let Some((parent, slot_idx, _)) = stack.pop() {
141+
current = parent.with_slot(slot_idx, current)?.optimize()?;
142+
}
155143
continue;
156144
}
157145

158-
// Execute the array itself.
146+
// Step 3: execute the encoding's own step.
159147
let result = execute_step(current, ctx)?;
160148
let (array, step) = result.into_parts();
161149
match step {
@@ -186,23 +174,30 @@ impl ArrayRef {
186174

187175
/// Execution context for batch CPU compute.
188176
///
189-
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
190-
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
177+
/// In debug builds, accumulates a trace of execution steps: individual steps are logged at TRACE
178+
/// level for real-time following, and the full trace is dumped at DEBUG level on drop.
179+
/// In release builds the tracing fields are compiled out entirely — `log` is a zero-overhead
180+
/// no-op and the struct holds only the session.
191181
#[derive(Debug, Clone)]
192182
pub struct ExecutionCtx {
193-
id: usize,
194183
session: VortexSession,
184+
#[cfg(debug_assertions)]
185+
id: usize,
186+
#[cfg(debug_assertions)]
195187
ops: Vec<String>,
196188
}
197189

198190
impl ExecutionCtx {
199191
/// Create a new execution context with the given session.
200192
pub fn new(session: VortexSession) -> Self {
201-
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
202-
let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
203193
Self {
204-
id,
205194
session,
195+
#[cfg(debug_assertions)]
196+
id: {
197+
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
198+
EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
199+
},
200+
#[cfg(debug_assertions)]
206201
ops: Vec::new(),
207202
}
208203
}
@@ -217,15 +212,17 @@ impl ExecutionCtx {
217212
self.session.allocator()
218213
}
219214

220-
/// Log an execution step at the current depth.
215+
/// Log an execution step.
221216
///
222-
/// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
223-
/// Individual steps are also logged at TRACE level for real-time following.
217+
/// Compiled out entirely in release builds. In debug builds, steps are accumulated and
218+
/// dumped as a single trace on drop at DEBUG level, and logged individually at TRACE level.
224219
///
225220
/// Use the [`format_args!`] macro to create the `msg` argument.
226-
pub fn log(&mut self, msg: fmt::Arguments<'_>) {
221+
#[inline]
222+
pub fn log(&mut self, _msg: fmt::Arguments<'_>) {
223+
#[cfg(debug_assertions)]
227224
if tracing::enabled!(tracing::Level::DEBUG) {
228-
let formatted = format!(" - {msg}");
225+
let formatted = format!(" - {_msg}");
229226
tracing::trace!("exec[{}]: {formatted}", self.id);
230227
self.ops.push(formatted);
231228
}
@@ -234,12 +231,16 @@ impl ExecutionCtx {
234231

235232
impl Display for ExecutionCtx {
236233
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237-
write!(f, "exec[{}]", self.id)
234+
#[cfg(debug_assertions)]
235+
return write!(f, "exec[{}]", self.id);
236+
#[cfg(not(debug_assertions))]
237+
write!(f, "exec")
238238
}
239239
}
240240

241241
impl Drop for ExecutionCtx {
242242
fn drop(&mut self) {
243+
#[cfg(debug_assertions)]
243244
if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
244245
// Unlike itertools `.format()` (panics in 0.14 on second format)
245246
struct FmtOps<'a>(&'a [String]);

0 commit comments

Comments
 (0)