Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ fn filter_run_end_primitive<R: NativePType + AddAssign + From<bool> + AsPrimitiv
#[cfg(test)]
mod tests {
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -142,4 +144,43 @@ mod tests {
);
Ok(())
}

/// Regression test: executing `Filter(Slice(RunEnd))` must still yield a RunEnd array.
///
/// `Filter.execute()` used to force its child to canonical form, which decoded
/// `Slice(RunEnd)` into a Primitive array and destroyed the run structure.
/// With the fix, Filter unwraps one layer at a time so that RunEnd's
/// FilterKernel gets the chance to fire.
#[test]
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
    // Four runs of 32 rows each (128 rows in total). This is large enough for
    // the FilterKernel to take the run-preserving path (true_count >= 25).
    let mut rows: Vec<i32> = Vec::with_capacity(128);
    for run_value in [10, 20, 30, 40] {
        rows.extend(std::iter::repeat_n(run_value, 32));
    }
    let run_end = RunEnd::encode(PrimitiveArray::from_iter(rows).into_array())?;

    // Drop the first 16 rows: Slice(RunEnd) with 112 rows, still covering 4 runs.
    let sliced = run_end.into_array().slice(16..128)?;

    // Select the even positions only: 112 / 2 = 56 surviving rows.
    let row_count = sliced.len();
    let keep_even = Mask::from_iter((0..row_count).map(|row| row % 2 == 0));
    let filtered = sliced.filter(keep_even)?;

    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let executed = filtered.execute_until::<RunEnd>(&mut ctx)?;
    assert_eq!(
        executed.encoding_id().as_ref(),
        "vortex.runend",
        "Filter(Slice(RunEnd)) should preserve RunEnd encoding"
    );

    // After slicing, the first run has 16 rows left and the others keep 32;
    // taking every other row halves each run: 8, 16, 16, 16.
    let expected: Vec<i32> = [(10, 8usize), (20, 16), (30, 16), (40, 16)]
        .iter()
        .flat_map(|&(value, count)| std::iter::repeat_n(value, count))
        .collect();
    assert_arrays_eq!(executed, PrimitiveArray::from_iter(expected));

    Ok(())
}
}
19 changes: 12 additions & 7 deletions vortex-array/src/arrays/filter/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::buffer::BufferHandle;
use crate::dtype::DType;
use crate::executor::ExecutionCtx;
use crate::executor::ExecutionResult;
use crate::require_child;
use crate::scalar::Scalar;
use crate::serde::ArrayChildren;
use crate::validity::Validity;
Expand Down Expand Up @@ -154,14 +153,20 @@ impl VTable for Filter {
_ => unreachable!("`execute_filter_fast_paths` handles AllTrue and AllFalse"),
};

let array = require_child!(array, array.child(), CHILD_SLOT => AnyCanonical);
if array.child().is_canonical() {
// If the child is already canonical, apply the filter directly.
// TODO(joe): fix the ownership of AnyCanonical
let child = Canonical::from(array.child().as_::<AnyCanonical>());
return Ok(ExecutionResult::done(
execute_filter(child, &mask_values).into_array(),
));
}

// We rely on the optimization pass that runs prior to this execution for filter pushdown,
// so now we can just execute the filter without worrying.
// TODO(joe): fix the ownership of AnyCanonical
let child = Canonical::from(array.child().as_::<AnyCanonical>());
// Execute one step and re-wrap rather than using require_child!(=> AnyCanonical),
// which would decode past intermediate encodings that have their own FilterKernels.
let child = array.child().clone().execute::<ArrayRef>(ctx)?;
Ok(ExecutionResult::done(
execute_filter(child, &mask_values).into_array(),
child.filter(Mask::Values(mask_values))?,
))
}

Expand Down
Loading