diff --git a/vortex-array/src/array/erased.rs b/vortex-array/src/array/erased.rs index 84367076314..3046e2ba57c 100644 --- a/vortex-array/src/array/erased.rs +++ b/vortex-array/src/array/erased.rs @@ -431,6 +431,56 @@ impl ArrayRef { self.with_slots(slots) } + /// Take a slot for executor-owned physical rewrites. This has the result that the array may + /// either be taken or cloned from the parent. + /// + /// The array can be put back with [`put_slot_unchecked`]. + /// + /// # Safety + /// The caller must put back a slot with the same logical dtype and length before exposing the + /// parent array, and must only use this for physical rewrites. + pub(crate) unsafe fn take_slot_unchecked( + mut self, + slot_idx: usize, + ) -> VortexResult<(ArrayRef, ArrayRef)> { + let child = if let Some(inner) = Arc::get_mut(&mut self.0) { + // # Safety: ensured by the caller. + unsafe { inner.slots_mut()[slot_idx].take() } + .vortex_expect("take_slot_unchecked cannot take an absent slot") + } else { + self.slots()[slot_idx] + .as_ref() + .vortex_expect("take_slot_unchecked cannot take an absent slot") + .clone() + }; + + Ok((self, child)) + } + + /// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive + /// or replacing the slot in this `ArrayRef`. + /// This is the mirror of [`take_slot_unchecked`]. + /// + /// # Safety + /// The replacement must have the same logical dtype and length as the taken slot, and this + /// must only be used for physical rewrites. + pub(crate) unsafe fn put_slot_unchecked( + mut self, + slot_idx: usize, + replacement: ArrayRef, + ) -> VortexResult { + if let Some(inner) = Arc::get_mut(&mut self.0) { + // # Safety: ensured by the caller. + unsafe { inner.slots_mut()[slot_idx] = Some(replacement) }; + return Ok(self); + } + + let mut slots = self.slots().to_vec(); + slots[slot_idx] = Some(replacement); + let inner = Arc::clone(&self.0); + inner.with_slots(self, slots) + } + /// Returns a new array with the provided slots. /// /// This is only valid for physical rewrites: slot count, presence, logical `DType`, and @@ -611,6 +661,7 @@ impl Matcher for V { fn try_match<'a>(array: &'a ArrayRef) -> Option> { let inner = array.0.as_any().downcast_ref::>()?; + // # Safety checked by `downcast_ref`. Some(unsafe { ArrayView::new_unchecked(array, &inner.data) }) } } diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 8193c128b1e..888e7bf6893 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -68,6 +68,13 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug { /// Returns the slots of the array. fn slots(&self) -> &[Option]; + /// Returns mutable slots of the array. + /// + /// # Safety: any slot (Some(child)) that replaces an existing slot must have a compatible + /// DType and length. Currently compatible means equal, but there is no reason why that must + /// be the case. + unsafe fn slots_mut(&mut self) -> &mut [Option]; + /// Returns the encoding ID of the array. fn encoding_id(&self) -> ArrayId; @@ -212,6 +219,10 @@ impl DynArray for ArrayInner { &self.slots } + unsafe fn slots_mut(&mut self) -> &mut [Option] { + &mut self.slots + } + fn encoding_id(&self) -> ArrayId { self.vtable.id() } diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 8e0c86ff44f..4e2b2dfce53 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -33,6 +33,7 @@ use crate::AnyCanonical; use crate::ArrayRef; use crate::Canonical; use crate::IntoArray; +use crate::dtype::DType; use crate::matcher::Matcher; use crate::memory::HostAllocatorRef; use crate::memory::MemorySessionExt; @@ -107,22 +108,21 @@ impl ArrayRef { /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { let mut current = self.optimize()?; - // Stack frames: (parent, slot_idx, done_predicate_for_slot) - let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new(); + let mut stack: Vec = Vec::new(); for _ in 0..max_iterations() { // Step 1: done / canonical — splice back into stacked parent or return. let is_done = stack .last() - .map_or(M::matches as DonePredicate, |frame| frame.2); + .map_or(M::matches as DonePredicate, |frame| frame.done); if is_done(¤t) || AnyCanonical::matches(¤t) { match stack.pop() { None => { ctx.log(format_args!("-> {}", current)); return Ok(current); } - Some((parent, slot_idx, _)) => { - current = parent.with_slot(slot_idx, current)?.optimize()?; + Some(frame) => { + current = frame.put_back(current)?.optimize()?; continue; } } @@ -139,8 +139,8 @@ impl ArrayRef { current, rewritten )); current = rewritten.optimize()?; - if let Some((parent, slot_idx, _)) = stack.pop() { - current = parent.with_slot(slot_idx, current)?.optimize()?; + if let Some(frame) = stack.pop() { + current = frame.put_back(current)?.optimize()?; } continue; } @@ -150,14 +150,15 @@ impl ArrayRef { let (array, step) = result.into_parts(); match step { ExecutionStep::ExecuteSlot(i, done) => { - let child = array.slots()[i] - .clone() - .vortex_expect("ExecuteSlot index in bounds"); + // SAFETY: we record the child's dtype and len, and assert they are preserved + // when the slot is put back via `put_slot_unchecked`. + let (parent, child) = unsafe { array.take_slot_unchecked(i) }?; ctx.log(format_args!( "ExecuteSlot({i}): pushing {}, focusing on {}", - array, child + parent, child )); - stack.push((array, i, done)); + let frame = StackFrame::new(parent, i, done, &child); + stack.push(frame); current = child.optimize()?; } ExecutionStep::Done => { @@ -174,6 +175,49 @@ impl ArrayRef { } } +/// A stack frame for the iterative executor, tracking the parent array whose slot is being +/// executed and the original child's dtype/len for validation on put-back. +struct StackFrame { + parent: ArrayRef, + slot_idx: usize, + done: DonePredicate, + original_dtype: DType, + original_len: usize, +} + +impl StackFrame { + fn new(parent: ArrayRef, slot_idx: usize, done: DonePredicate, child: &ArrayRef) -> Self { + Self { + parent, + slot_idx, + done, + original_dtype: child.dtype().clone(), + original_len: child.len(), + } + } + + fn put_back(self, replacement: ArrayRef) -> VortexResult { + debug_assert_eq!( + replacement.dtype(), + &self.original_dtype, + "slot {} dtype changed from {} to {} during execution", + self.slot_idx, + self.original_dtype, + replacement.dtype() + ); + debug_assert_eq!( + replacement.len(), + self.original_len, + "slot {} len changed from {} to {} during execution", + self.slot_idx, + self.original_len, + replacement.len() + ); + // SAFETY: we assert above that dtype and len are preserved. + unsafe { self.parent.put_slot_unchecked(self.slot_idx, replacement) } + } +} + /// Execution context for batch CPU compute. #[derive(Debug, Clone)] pub struct ExecutionCtx {