From 9a3ecc6623b6cd26361ee25302f3a721a7ced994 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 24 Mar 2026 15:07:56 -0400 Subject: [PATCH] fill_forward_nulls resets at 1024 chunk boundary Signed-off-by: Robert Kruszewski --- encodings/fastlanes/src/lib.rs | 60 +++++++++++++++++++++--- encodings/fastlanes/src/rle/array/mod.rs | 46 ++++++++++++++++++ 2 files changed, 99 insertions(+), 7 deletions(-) diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 8f7edbc267e..4ae8785f68a 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -57,6 +57,12 @@ pub fn initialize(session: &mut VortexSession) { /// Fill-forward null values in a buffer, replacing each null with the last valid value seen. /// +/// The fill-forward state resets to `T::default()` at every [`FL_CHUNK_SIZE`] boundary +/// so that values from one chunk never leak into the next. This is important because +/// both RLE and Delta encodings treat each chunk independently: a fill-forwarded value +/// that crosses a chunk boundary can become an invalid chunk-local index (for RLE) or +/// an incorrect delta base (for Delta). +/// /// Returns the original buffer if there are no nulls (i.e. the validity is /// `NonNullable` or `AllValid`), avoiding any allocation or copy. pub(crate) fn fill_forward_nulls( @@ -71,9 +77,13 @@ pub(crate) fn fill_forward_nulls( let mut last_valid = T::default(); match values.try_into_mut() { Ok(mut to_fill_mut) => { - for (v, is_valid) in to_fill_mut.iter_mut().zip(bit_buffer.iter()) { + for (i, (v, is_valid)) in + to_fill_mut.iter_mut().zip(bit_buffer.iter()).enumerate() + { if is_valid { last_valid = *v; + } else if i.is_multiple_of(FL_CHUNK_SIZE) { + last_valid = T::default(); } else { *v = last_valid; } @@ -82,14 +92,20 @@ pub(crate) fn fill_forward_nulls( } Err(to_fill) => { let mut to_fill_mut = BufferMut::::with_capacity(to_fill.len()); - for (v, (out, is_valid)) in to_fill.iter().zip( - to_fill_mut - .spare_capacity_mut() - .iter_mut() - .zip(bit_buffer.iter()), - ) { + for (i, (v, (out, is_valid))) in to_fill + .iter() + .zip( + to_fill_mut + .spare_capacity_mut() + .iter_mut() + .zip(bit_buffer.iter()), + ) + .enumerate() + { if is_valid { last_valid = *v; + } else if i.is_multiple_of(FL_CHUNK_SIZE) { + last_valid = T::default(); } out.write(last_valid); } @@ -106,6 +122,7 @@ mod test { use std::sync::LazyLock; use vortex_array::session::ArraySessionExt; + use vortex_buffer::BitBufferMut; use vortex_session::VortexSession; use super::*; @@ -118,4 +135,33 @@ mod test { session.arrays().register(RLE); session }); + + #[test] + fn fill_forward_nulls_resets_at_chunk_boundary() { + // Build a buffer spanning two chunks where the last valid value in chunk 0 + // is non-zero. Null positions at the start of chunk 1 must get T::default() + // (0), not the carry-over from chunk 0. + let mut values = BufferMut::zeroed(2 * FL_CHUNK_SIZE); + // Place a non-zero valid value near the end of chunk 0. + values[FL_CHUNK_SIZE - 1] = 42; + + let mut validity_bits = BitBufferMut::new_unset(2 * FL_CHUNK_SIZE); + validity_bits.set(FL_CHUNK_SIZE - 1); // only this position is valid + + let validity = Validity::from(validity_bits.freeze()); + let result = fill_forward_nulls(values.freeze(), &validity); + + // Within chunk 0, nulls before the valid element get 0 (default), and the + // valid element itself is 42. + assert_eq!(result[FL_CHUNK_SIZE - 1], 42); + + // Chunk 1 has no valid elements. Every position must be T::default() (0), + // NOT 42 carried over from chunk 0. + for i in FL_CHUNK_SIZE..2 * FL_CHUNK_SIZE { + assert_eq!( + result[i], 0, + "position {i} should be 0, not carried from chunk 0" + ); + } + } } diff --git a/encodings/fastlanes/src/rle/array/mod.rs b/encodings/fastlanes/src/rle/array/mod.rs index 77c584ce331..6faa1951999 100644 --- a/encodings/fastlanes/src/rle/array/mod.rs +++ b/encodings/fastlanes/src/rle/array/mod.rs @@ -230,8 +230,10 @@ mod tests { use vortex_array::validity::Validity; use vortex_buffer::Buffer; use vortex_buffer::ByteBufferMut; + use vortex_error::VortexResult; use vortex_session::registry::ReadContext; + use crate::FL_CHUNK_SIZE; use crate::RLEArray; use crate::test::SESSION; @@ -518,4 +520,48 @@ mod tests { assert_arrays_eq!(original_data, decoded_data); } + + /// Regression test: re-encoding RLE indices with RLE must not corrupt + /// chunk-local index values via cross-chunk fill-forward. + /// + /// The scenario: an array spanning 2 chunks where chunk 0 has 2 distinct + /// non-null values (producing chunk-local indices 0 and 1) and chunk 1 is + /// entirely null. When fill_forward_nulls propagated the last valid index + /// (1) from chunk 0 into chunk 1 during re-encoding, decoding panicked + /// because chunk 1 only had 1 unique value and index 1 was out of bounds. + #[test] + fn test_recompress_indices_no_cross_chunk_leak() -> VortexResult<()> { + let len = FL_CHUNK_SIZE + 100; + let mut values: Vec> = vec![None; len]; + // Two distinct values in chunk 0 → indices 0 and 1. + values[0] = Some(10); + values[500] = Some(20); + // Chunk 1 (positions 1024..) is all-null. + + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEArray::encode(&original)?; + + // Simulate cascading compression: narrow u16→u8 then re-encode with RLE, + // matching the path taken by the BtrBlocks compressor. + let indices_prim = rle.indices().to_primitive().narrow()?; + let re_encoded = RLEArray::encode(&indices_prim)?; + + // Reconstruct the outer RLE with re-encoded indices. + // SAFETY: we only replace the indices child; all other invariants hold. + let reconstructed = unsafe { + RLEArray::new_unchecked( + rle.values().clone(), + re_encoded.into_array(), + rle.values_idx_offsets().clone(), + rle.dtype().clone(), + rle.offset(), + rle.len(), + ) + }; + + // Decompress — panicked before the fill_forward_nulls chunk-boundary fix. + let decoded = reconstructed.to_primitive(); + assert_arrays_eq!(decoded, original); + Ok(()) + } }