Skip to content

Commit bf59fc6

Browse files
committed
more
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 0ba7deb commit bf59fc6

2 files changed

Lines changed: 132 additions & 13 deletions

File tree

encodings/fastlanes/src/rle/array/rle_compress.rs

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,7 @@ where
5555

5656
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
5757

58-
let mut process_chunk = |chunk_start_idx: usize,
59-
input: &[T; FL_CHUNK_SIZE],
60-
rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
58+
let mut process_chunk = |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
6159
// SAFETY: NativeValue is repr(transparent)
6260
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
6361

@@ -78,12 +76,12 @@ where
7876
value_count_acc += value_count;
7977
};
8078

81-
for (chunk_idx, (chunk_slice, rle_idxs)) in
82-
chunks.iter().zip(indices_uninit.iter_mut()).enumerate()
83-
{
79+
for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) {
8480
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
85-
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice, unsafe {
86-
std::mem::transmute(rle_idxs)
81+
process_chunk(chunk_slice, unsafe {
82+
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
83+
rle_idxs,
84+
)
8785
});
8886
}
8987

@@ -95,11 +93,11 @@ where
9593
let last_idx_chunk = indices_uninit
9694
.last_mut()
9795
.vortex_expect("Must have the trailing chunk");
98-
process_chunk(
99-
(len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE,
100-
&padded_chunk,
101-
unsafe { std::mem::transmute(last_idx_chunk) },
102-
);
96+
process_chunk(&padded_chunk, unsafe {
97+
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
98+
last_idx_chunk,
99+
)
100+
});
103101
}
104102

105103
unsafe {
@@ -357,6 +355,111 @@ mod tests {
357355
Ok(())
358356
}
359357

358+
/// Replaces indices at invalid (null) positions with random garbage values.
359+
///
360+
/// This simulates a compressor that doesn't preserve index values at null
361+
/// positions, which can happen when indices are further compressed and the
362+
/// compressor clobbers invalid entries with arbitrary data.
363+
fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
364+
let indices_prim = rle.indices().to_primitive();
365+
let mut indices_data: Vec<u16> = indices_prim.as_slice::<u16>().to_vec();
366+
367+
// Use a simple deterministic "random" sequence.
368+
let mut rng_state: u32 = 0xDEAD_BEEF;
369+
let validity = indices_prim.validity();
370+
for (i, idx) in indices_data.iter_mut().enumerate() {
371+
if !validity.is_valid(i).unwrap_or(true) {
372+
// xorshift32
373+
rng_state ^= rng_state << 13;
374+
rng_state ^= rng_state >> 17;
375+
rng_state ^= rng_state << 5;
376+
*idx = rng_state as u16;
377+
}
378+
}
379+
380+
let clobbered_indices =
381+
PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()).into_array();
382+
383+
Ok(unsafe {
384+
RLEArra::new_unchecked(
385+
rle.values().clone(),
386+
clobbered_indices,
387+
rle.values_idx_offsets().clone(),
388+
rle.dtype().clone(),
389+
rle.offset(),
390+
rle.len(),
391+
)
392+
})
393+
}
394+
395+
#[test]
396+
fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> {
397+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
398+
let original = PrimitiveArray::from_option_iter(values);
399+
let rle = RLEData::encode(&original)?;
400+
let clobbered = with_random_invalid_indices(&rle)?;
401+
assert_arrays_eq!(clobbered, original);
402+
Ok(())
403+
}
404+
405+
#[test]
406+
fn test_random_invalid_indices_sparse_values() -> VortexResult<()> {
407+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
408+
values[0] = Some(10);
409+
values[500] = Some(20);
410+
values[1000] = Some(30);
411+
let original = PrimitiveArray::from_option_iter(values);
412+
let rle = RLEData::encode(&original)?;
413+
let clobbered = with_random_invalid_indices(&rle)?;
414+
assert_arrays_eq!(clobbered, original);
415+
Ok(())
416+
}
417+
418+
#[test]
419+
fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> {
420+
// Two chunks: first has scattered values, second is all null.
421+
let mut values: Vec<Option<i16>> = vec![None; 2 * FL_CHUNK_SIZE];
422+
values[0] = Some(10);
423+
values[500] = Some(20);
424+
values[FL_CHUNK_SIZE + 100] = Some(42);
425+
let original = PrimitiveArray::from_option_iter(values);
426+
let rle = RLEData::encode(&original)?;
427+
let clobbered = with_random_invalid_indices(&rle)?;
428+
assert_arrays_eq!(clobbered, original);
429+
Ok(())
430+
}
431+
432+
#[test]
433+
fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> {
434+
// 1085 elements: chunk 0 has values at scattered positions, chunk 1 is
435+
// a partial (61 elements padded to 1024) that is entirely null.
436+
let mut values: Vec<Option<u32>> = vec![None; 1085];
437+
for i in (100..200).step_by(7) {
438+
values[i] = Some(i as u32);
439+
}
440+
let original = PrimitiveArray::from_option_iter(values);
441+
let rle = RLEData::encode(&original)?;
442+
let clobbered = with_random_invalid_indices(&rle)?;
443+
assert_arrays_eq!(clobbered, original);
444+
Ok(())
445+
}
446+
447+
#[test]
448+
fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> {
449+
// Most positions are valid, only a few are null with garbage indices.
450+
let mut values: Vec<Option<u64>> =
451+
(0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect();
452+
// Sprinkle in some nulls.
453+
for i in (0..FL_CHUNK_SIZE).step_by(37) {
454+
values[i] = None;
455+
}
456+
let original = PrimitiveArray::from_option_iter(values);
457+
let rle = RLEData::encode(&original)?;
458+
let clobbered = with_random_invalid_indices(&rle)?;
459+
assert_arrays_eq!(clobbered, original);
460+
Ok(())
461+
}
462+
360463
// Regression test: RLE compression properly supports decoding pos/neg zeros
361464
// See <https://github.com/vortex-data/vortex/issues/6491>
362465
#[rstest]

encodings/fastlanes/src/rle/array/rle_decompress.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ use crate::RLEArray;
2323
reason = "complexity is from nested match_each_* macros"
2424
)]
2525
pub fn rle_decompress(array: &RLEArray, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
26+
if array.all_invalid()? {
27+
return Ok(Canonical::empty(array.dtype()).into_primitive());
28+
}
29+
2630
match_each_native_ptype!(array.values().dtype().as_ptype(), |V| {
2731
match_each_unsigned_integer_ptype!(array.values_idx_offsets().dtype().as_ptype(), |O| {
2832
// RLE indices are always u16 (or u8 if downcasted).
@@ -53,6 +57,7 @@ where
5357

5458
let indices = array.indices().clone().execute::<PrimitiveArray>(ctx)?;
5559
assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE));
60+
let has_invalid = !indices.all_valid()?;
5661
let (indices_sl, _) = indices.as_slice::<I>().as_chunks::<FL_CHUNK_SIZE>();
5762

5863
let chunk_start_idx = array.offset() / FL_CHUNK_SIZE;
@@ -92,6 +97,17 @@ where
9297
// access. The indices may contain values other than 0 when they
9398
// have been further compressed (e.g., as a masked constant).
9499
buffer_values.fill(chunk_values[0]);
100+
} else if has_invalid {
101+
// When the indices array has invalid (null) positions, those
102+
// positions may contain arbitrary garbage values after further
103+
// compression. Clamp all indices into [0, num_chunk_values) to
104+
// prevent out-of-bounds access in the fastlanes decoder.
105+
let mut sanitized = *chunk_indices;
106+
for idx in sanitized.iter_mut() {
107+
let v: usize = (*idx).into();
108+
*idx = NumCast::from(v % num_chunk_values).unwrap_or_default();
109+
}
110+
V::decode(chunk_values, &sanitized, buffer_values);
95111
} else {
96112
V::decode(chunk_values, chunk_indices, buffer_values);
97113
}

0 commit comments

Comments
 (0)