Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ pub fn initialize(session: &mut VortexSession) {

/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
///
/// The fill-forward state resets to `T::default()` at every [`FL_CHUNK_SIZE`] boundary
/// so that values from one chunk never leak into the next. This is important because
/// both RLE and Delta encodings treat each chunk independently: a fill-forwarded value
/// that crosses a chunk boundary can become an invalid chunk-local index (for RLE) or
/// an incorrect delta base (for Delta).
///
/// Returns the original buffer if there are no nulls (i.e. the validity is
/// `NonNullable` or `AllValid`), avoiding any allocation or copy.
pub(crate) fn fill_forward_nulls<T: Copy + Default>(
Expand All @@ -71,9 +77,13 @@ pub(crate) fn fill_forward_nulls<T: Copy + Default>(
let mut last_valid = T::default();
match values.try_into_mut() {
Ok(mut to_fill_mut) => {
for (v, is_valid) in to_fill_mut.iter_mut().zip(bit_buffer.iter()) {
for (i, (v, is_valid)) in
to_fill_mut.iter_mut().zip(bit_buffer.iter()).enumerate()
{
if is_valid {
last_valid = *v;
} else if i.is_multiple_of(FL_CHUNK_SIZE) {
last_valid = T::default();
} else {
*v = last_valid;
}
Expand All @@ -82,14 +92,20 @@ pub(crate) fn fill_forward_nulls<T: Copy + Default>(
}
Err(to_fill) => {
let mut to_fill_mut = BufferMut::<T>::with_capacity(to_fill.len());
for (v, (out, is_valid)) in to_fill.iter().zip(
to_fill_mut
.spare_capacity_mut()
.iter_mut()
.zip(bit_buffer.iter()),
) {
for (i, (v, (out, is_valid))) in to_fill
.iter()
.zip(
to_fill_mut
.spare_capacity_mut()
.iter_mut()
.zip(bit_buffer.iter()),
)
.enumerate()
{
if is_valid {
last_valid = *v;
} else if i.is_multiple_of(FL_CHUNK_SIZE) {
last_valid = T::default();
}
out.write(last_valid);
}
Expand All @@ -106,6 +122,7 @@ mod test {
use std::sync::LazyLock;

use vortex_array::session::ArraySessionExt;
use vortex_buffer::BitBufferMut;
use vortex_session::VortexSession;

use super::*;
Expand All @@ -118,4 +135,33 @@ mod test {
session.arrays().register(RLE);
session
});

#[test]
fn fill_forward_nulls_resets_at_chunk_boundary() {
    // Two full chunks. The only valid element is the very last slot of chunk 0,
    // and it holds a non-zero value so that any carry-over into chunk 1 is
    // distinguishable from the default fill value.
    let total_len = 2 * FL_CHUNK_SIZE;
    let only_valid_pos = FL_CHUNK_SIZE - 1;

    let mut validity_bits = BitBufferMut::new_unset(total_len);
    validity_bits.set(only_valid_pos);
    let validity = Validity::from(validity_bits.freeze());

    let mut values = BufferMut::zeroed(total_len);
    values[only_valid_pos] = 42;

    let result = fill_forward_nulls(values.freeze(), &validity);

    // The valid element itself must be left untouched.
    assert_eq!(result[only_valid_pos], 42);

    // Chunk 1 contains no valid element at all, so each of its slots must hold
    // the default value (0) rather than the 42 seen at the end of chunk 0.
    for i in FL_CHUNK_SIZE..total_len {
        assert_eq!(
            result[i], 0,
            "position {i} should be 0, not carried from chunk 0"
        );
    }
}
}
46 changes: 46 additions & 0 deletions encodings/fastlanes/src/rle/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@ mod tests {
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexResult;
use vortex_session::registry::ReadContext;

use crate::FL_CHUNK_SIZE;
use crate::RLEArray;
use crate::test::SESSION;

Expand Down Expand Up @@ -518,4 +520,48 @@ mod tests {

assert_arrays_eq!(original_data, decoded_data);
}

/// Regression test: RLE-encoding the indices of an existing RLE array must
/// not let fill-forward carry a chunk-local index across a chunk boundary.
///
/// Setup: the input covers two chunks. Chunk 0 contains two distinct non-null
/// values (so its chunk-local indices are 0 and 1); chunk 1 contains only
/// nulls. When fill_forward_nulls carried the last valid index (1) from
/// chunk 0 into chunk 1 during re-encoding, decoding panicked because chunk 1
/// had a single unique value and index 1 was out of bounds.
#[test]
fn test_recompress_indices_no_cross_chunk_leak() -> VortexResult<()> {
    let total_len = FL_CHUNK_SIZE + 100;
    // Positions 0 and 500 (both in chunk 0) carry distinct values; every other
    // position, including all of chunk 1 (positions 1024..), is null.
    let input: Vec<Option<i16>> = (0..total_len)
        .map(|pos| match pos {
            0 => Some(10),
            500 => Some(20),
            _ => None,
        })
        .collect();

    let original = PrimitiveArray::from_option_iter(input);
    let rle = RLEArray::encode(&original)?;

    // Mimic cascading compression as done by the BtrBlocks compressor:
    // narrow the indices (u16→u8), then RLE-encode them again.
    let narrowed_indices = rle.indices().to_primitive().narrow()?;
    let recompressed_indices = RLEArray::encode(&narrowed_indices)?;

    // Rebuild the outer RLE array around the re-encoded indices.
    // SAFETY: we only replace the indices child; all other invariants hold.
    let reconstructed = unsafe {
        RLEArray::new_unchecked(
            rle.values().clone(),
            recompressed_indices.into_array(),
            rle.values_idx_offsets().clone(),
            rle.dtype().clone(),
            rle.offset(),
            rle.len(),
        )
    };

    // Decoding used to panic here before the chunk-boundary reset was added
    // to fill_forward_nulls.
    let roundtripped = reconstructed.to_primitive();
    assert_arrays_eq!(roundtripped, original);
    Ok(())
}
}
Loading