Skip to content

Commit 339a05f

Browse files
committed
more
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent d87bac3 commit 339a05f

2 files changed

Lines changed: 132 additions & 13 deletions

File tree

encodings/fastlanes/src/rle/array/rle_compress.rs

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ where
5757

5858
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
5959

60-
let mut process_chunk = |chunk_start_idx: usize,
61-
input: &[T; FL_CHUNK_SIZE],
62-
rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
60+
let mut process_chunk = |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
6361
// SAFETY: NativeValue is repr(transparent)
6462
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
6563

@@ -80,12 +78,12 @@ where
8078
value_count_acc += value_count;
8179
};
8280

83-
for (chunk_idx, (chunk_slice, rle_idxs)) in
84-
chunks.iter().zip(indices_uninit.iter_mut()).enumerate()
85-
{
81+
for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) {
8682
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
87-
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice, unsafe {
88-
std::mem::transmute(rle_idxs)
83+
process_chunk(chunk_slice, unsafe {
84+
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
85+
rle_idxs,
86+
)
8987
});
9088
}
9189

@@ -97,11 +95,11 @@ where
9795
let last_idx_chunk = indices_uninit
9896
.last_mut()
9997
.vortex_expect("Must have the trailing chunk");
100-
process_chunk(
101-
(len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE,
102-
&padded_chunk,
103-
unsafe { std::mem::transmute(last_idx_chunk) },
104-
);
98+
process_chunk(&padded_chunk, unsafe {
99+
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
100+
last_idx_chunk,
101+
)
102+
});
105103
}
106104

107105
unsafe {
@@ -363,6 +361,111 @@ mod tests {
363361
Ok(())
364362
}
365363

364+
/// Replaces indices at invalid (null) positions with random garbage values.
365+
///
366+
/// This simulates a compressor that doesn't preserve index values at null
367+
/// positions, which can happen when indices are further compressed and the
368+
/// compressor clobbers invalid entries with arbitrary data.
369+
fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
370+
let indices_prim = rle.indices().to_primitive();
371+
let mut indices_data: Vec<u16> = indices_prim.as_slice::<u16>().to_vec();
372+
373+
// Use a simple deterministic "random" sequence.
374+
let mut rng_state: u32 = 0xDEAD_BEEF;
375+
let validity = indices_prim.validity();
376+
for (i, idx) in indices_data.iter_mut().enumerate() {
377+
if !validity.is_valid(i).unwrap_or(true) {
378+
// xorshift32
379+
rng_state ^= rng_state << 13;
380+
rng_state ^= rng_state >> 17;
381+
rng_state ^= rng_state << 5;
382+
*idx = rng_state as u16;
383+
}
384+
}
385+
386+
let clobbered_indices =
387+
PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()).into_array();
388+
389+
Ok(unsafe {
390+
RLEArray::new_unchecked(
391+
rle.values().clone(),
392+
clobbered_indices,
393+
rle.values_idx_offsets().clone(),
394+
rle.dtype().clone(),
395+
rle.offset(),
396+
rle.len(),
397+
)
398+
})
399+
}
400+
401+
#[test]
402+
fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> {
403+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
404+
let original = PrimitiveArray::from_option_iter(values);
405+
let rle = RLEData::encode(&original)?;
406+
let clobbered = with_random_invalid_indices(&rle)?;
407+
assert_arrays_eq!(clobbered, original);
408+
Ok(())
409+
}
410+
411+
#[test]
412+
fn test_random_invalid_indices_sparse_values() -> VortexResult<()> {
413+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
414+
values[0] = Some(10);
415+
values[500] = Some(20);
416+
values[1000] = Some(30);
417+
let original = PrimitiveArray::from_option_iter(values);
418+
let rle = RLEData::encode(&original)?;
419+
let clobbered = with_random_invalid_indices(&rle)?;
420+
assert_arrays_eq!(clobbered, original);
421+
Ok(())
422+
}
423+
424+
#[test]
425+
fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> {
426+
// Two chunks: first has scattered values, second is all null.
427+
let mut values: Vec<Option<i16>> = vec![None; 2 * FL_CHUNK_SIZE];
428+
values[0] = Some(10);
429+
values[500] = Some(20);
430+
values[FL_CHUNK_SIZE + 100] = Some(42);
431+
let original = PrimitiveArray::from_option_iter(values);
432+
let rle = RLEData::encode(&original)?;
433+
let clobbered = with_random_invalid_indices(&rle)?;
434+
assert_arrays_eq!(clobbered, original);
435+
Ok(())
436+
}
437+
438+
#[test]
439+
fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> {
440+
// 1085 elements: chunk 0 has values at scattered positions, chunk 1 is
441+
// a partial (61 elements padded to 1024) that is entirely null.
442+
let mut values: Vec<Option<u32>> = vec![None; 1085];
443+
for i in (100..200).step_by(7) {
444+
values[i] = Some(i as u32);
445+
}
446+
let original = PrimitiveArray::from_option_iter(values);
447+
let rle = RLEData::encode(&original)?;
448+
let clobbered = with_random_invalid_indices(&rle)?;
449+
assert_arrays_eq!(clobbered, original);
450+
Ok(())
451+
}
452+
453+
#[test]
454+
fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> {
455+
// Most positions are valid, only a few are null with garbage indices.
456+
let mut values: Vec<Option<u64>> =
457+
(0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect();
458+
// Sprinkle in some nulls.
459+
for i in (0..FL_CHUNK_SIZE).step_by(37) {
460+
values[i] = None;
461+
}
462+
let original = PrimitiveArray::from_option_iter(values);
463+
let rle = RLEData::encode(&original)?;
464+
let clobbered = with_random_invalid_indices(&rle)?;
465+
assert_arrays_eq!(clobbered, original);
466+
Ok(())
467+
}
468+
366469
// Regression test: RLE compression properly supports decoding pos/neg zeros
367470
// See <https://github.com/vortex-data/vortex/issues/6491>
368471
#[rstest]

encodings/fastlanes/src/rle/array/rle_decompress.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ use crate::rle::RLEArrayExt;
2424
reason = "complexity is from nested match_each_* macros"
2525
)]
2626
pub fn rle_decompress(array: &RLEArray, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
27+
if array.all_invalid()? {
28+
return Ok(Canonical::empty(array.dtype()).into_primitive());
29+
}
30+
2731
match_each_native_ptype!(array.values().dtype().as_ptype(), |V| {
2832
match_each_unsigned_integer_ptype!(array.values_idx_offsets().dtype().as_ptype(), |O| {
2933
// RLE indices are always u16 (or u8 if downcasted).
@@ -54,6 +58,7 @@ where
5458

5559
let indices = array.indices().clone().execute::<PrimitiveArray>(ctx)?;
5660
assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE));
61+
let has_invalid = !indices.all_valid()?;
5762
let (indices_sl, _) = indices.as_slice::<I>().as_chunks::<FL_CHUNK_SIZE>();
5863

5964
let chunk_start_idx = array.offset() / FL_CHUNK_SIZE;
@@ -93,6 +98,17 @@ where
9398
// access. The indices may contain values other than 0 when they
9499
// have been further compressed (e.g., as a masked constant).
95100
buffer_values.fill(chunk_values[0]);
101+
} else if has_invalid {
102+
// When the indices array has invalid (null) positions, those
103+
// positions may contain arbitrary garbage values after further
104+
// compression. Wrap all indices (modulo) into [0, num_chunk_values) to
105+
// prevent out-of-bounds access in the fastlanes decoder.
106+
let mut sanitized = *chunk_indices;
107+
for idx in sanitized.iter_mut() {
108+
let v: usize = (*idx).into();
109+
*idx = NumCast::from(v % num_chunk_values).unwrap_or_default();
110+
}
111+
V::decode(chunk_values, &sanitized, buffer_values);
96112
} else {
97113
V::decode(chunk_values, chunk_indices, buffer_values);
98114
}

0 commit comments

Comments
 (0)