Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ fn filter_run_end_primitive<R: NativePType + AddAssign + From<bool> + AsPrimitiv
#[cfg(test)]
mod tests {
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -142,4 +144,43 @@ mod tests {
);
Ok(())
}

/// Regression test: executing `Filter(Slice(RunEnd))` must yield a RunEnd array.
///
/// Before the fix, `Filter.execute()` forced its child to canonical form, which decoded
/// `Slice(RunEnd)` into a Primitive array and threw away the run structure. With the fix,
/// Filter unwraps a single layer at a time, giving RunEnd's FilterKernel a chance to fire.
#[test]
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
    // Four runs of 32 rows each (128 rows total). This is large enough for FilterKernel
    // to take the run-preserving path (true_count >= 25).
    const RUN_LEN: usize = 32;
    let run_values = [10i32, 20, 30, 40];
    let mut values: Vec<i32> = Vec::with_capacity(run_values.len() * RUN_LEN);
    for v in run_values {
        values.extend(std::iter::repeat_n(v, RUN_LEN));
    }
    let run_end = RunEnd::encode(PrimitiveArray::from_iter(values).into_array())?;

    // Drop the first 16 rows: the result is Slice(RunEnd) with 112 rows, still 4 runs
    // (the first run is now only 16 rows long).
    let sliced = run_end.into_array().slice(16..128)?;

    // Select the even positions only, i.e. 112 / 2 = 56 surviving rows.
    let keep_even = Mask::from_iter((0..sliced.len()).map(|i| i % 2 == 0));
    let filtered = sliced.filter(keep_even)?;

    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let executed = filtered.execute_until::<RunEnd>(&mut ctx)?;
    assert_eq!(
        executed.encoding_id().as_ref(),
        "vortex.runend",
        "Filter(Slice(RunEnd)) should preserve RunEnd encoding"
    );

    // Half of each remaining run survives: 8 of the shortened first run, 16 of the others.
    let expected: Vec<i32> = [(10, 8), (20, 16), (30, 16), (40, 16)]
        .into_iter()
        .flat_map(|(value, count)| std::iter::repeat_n(value, count))
        .collect();
    assert_arrays_eq!(executed, PrimitiveArray::from_iter(expected));

    Ok(())
}
}
55 changes: 25 additions & 30 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,18 @@ impl ArrayRef {
/// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
/// stack.
///
/// The scheduler repeatedly:
/// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
/// 2. Runs `execute_parent` on each child for child-driven optimizations.
/// 3. Calls `execute` which returns an [`ExecutionStep`].
/// Each iteration proceeds through three steps in order:
///
/// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is
/// canonical, splice it back into the stacked parent (if any) and continue, or return.
/// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current`
/// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd).
/// If there is a stacked parent frame, the rewritten child is spliced back into it so
/// that optimize and further `execute_parent` can fire on the reconstructed parent
/// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)`
/// whose `FilterExecuteAdaptor` fires on the next iteration).
/// 3. **`execute`** — call the encoding's own execute step, which either returns `Done` or
/// `ExecuteSlot(i)` to push a child onto the stack for focused execution.
///
/// Note: the returned array may not match `M`. If execution converges to a canonical form
/// that does not match `M`, the canonical array is returned since no further execution
Expand All @@ -103,51 +111,41 @@ impl ArrayRef {
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();

for _ in 0..max_iterations() {
// Check for termination: use the stack frame's done predicate, or the root matcher.
// Step 1: done / canonical — splice back into stacked parent or return.
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.2);
if is_done(&current) {
if is_done(&current) || AnyCanonical::matches(&current) {
match stack.pop() {
None => {
ctx.log(format_args!("-> {}", current));
return Ok(current);
}
Some((parent, slot_idx, _)) => {
current = parent.with_slot(slot_idx, current)?;
current = current.optimize()?;
current = parent.with_slot(slot_idx, current)?.optimize()?;
continue;
}
}
}

// If we've reached canonical form, we can't execute any further regardless
// of whether the matcher matched.
if AnyCanonical::matches(&current) {
match stack.pop() {
None => {
ctx.log(format_args!("-> canonical (unmatched) {}", current));
return Ok(current);
}
Some((parent, slot_idx, _)) => {
current = parent.with_slot(slot_idx, current)?;
current = current.optimize()?;
continue;
}
}
}

// Try execute_parent (child-driven optimized execution)
// Step 2: execute_parent on children (current is the parent).
// If there is a stacked parent frame, splice the rewritten child back into it
// so that optimize and execute_parent can fire naturally on the reconstructed parent
// (e.g. Slice(RunEnd) -RunEndSliceKernel-> RunEnd, spliced back into Filter gives
// Filter(RunEnd), whose FilterExecuteAdaptor fires on the next iteration).
if let Some(rewritten) = try_execute_parent(&current, ctx)? {
ctx.log(format_args!(
"execute_parent rewrote {} -> {}",
current, rewritten
));
current = rewritten.optimize()?;
if let Some((parent, slot_idx, _)) = stack.pop() {
current = parent.with_slot(slot_idx, current)?.optimize()?;
}
continue;
}

// Execute the array itself.
// Step 3: execute the encoding's own step.
let result = execute_step(current, ctx)?;
let (array, step) = result.into_parts();
match step {
Expand Down Expand Up @@ -177,9 +175,6 @@ impl ArrayRef {
}

/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
id: usize,
Expand All @@ -193,8 +188,8 @@ impl ExecutionCtx {
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
id,
session,
id,
ops: Vec::new(),
}
}
Expand Down
Loading