Skip to content

Commit 29a5f43

Browse files
authored
RLE handles decompression of indices where invalid positions are clobbered (#7255)
The RLE array now correctly handles decompression when entries of the indices array at invalid (null) positions have been clobbered with out-of-bounds index values. --------- Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 006ed43 commit 29a5f43

File tree

5 files changed

+254
-47
lines changed

5 files changed

+254
-47
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ anyhow = "1.0.97"
8989
arbitrary = "1.3.2"
9090
arc-swap = "1.8"
9191
arcref = "0.2.0"
92-
arrayref = "0.3.7"
9392
arrow-arith = "58"
9493
arrow-array = "58"
9594
arrow-buffer = "58"

encodings/fastlanes/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ version = { workspace = true }
1717
workspace = true
1818

1919
[dependencies]
20-
arrayref = { workspace = true }
2120
fastlanes = { workspace = true }
2221
itertools = { workspace = true }
2322
lending-iterator = { workspace = true }

encodings/fastlanes/src/rle/array/rle_compress.rs

Lines changed: 215 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrayref::array_mut_ref;
4+
use std::mem;
5+
56
use fastlanes::RLE as FastLanesRLE;
67
use vortex_array::IntoArray;
78
use vortex_array::ToCanonical;
@@ -51,46 +52,51 @@ where
5152
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
5253

5354
let values_uninit = values_buf.spare_capacity_mut();
54-
let indices_uninit = indices_buf.spare_capacity_mut();
55+
// We don't care about the trailing chunk that exists due to overallocation by the underlying allocator.
56+
let (indices_uninit, _) = indices_buf
57+
.spare_capacity_mut()
58+
.as_chunks_mut::<FL_CHUNK_SIZE>();
5559
let mut value_count_acc = 0; // Chunk value count prefix sum.
5660

5761
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
5862

59-
let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| {
60-
// SAFETY: NativeValue is repr(transparent)
61-
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
62-
63-
// SAFETY: `MaybeUninit<NativeValue<T>>` and `NativeValue<T>` have the same layout.
64-
let rle_vals: &mut [NativeValue<T>] =
65-
unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
63+
let mut process_chunk =
64+
|input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [mem::MaybeUninit<u16>; FL_CHUNK_SIZE]| {
65+
// SAFETY: NativeValue is repr(transparent)
66+
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { mem::transmute(input) };
67+
let rle_idxs: &mut [u16; FL_CHUNK_SIZE] = unsafe { mem::transmute(rle_idxs) };
6668

67-
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
68-
let rle_idxs: &mut [u16] =
69-
unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) };
69+
// SAFETY: `MaybeUninit<NativeValue<T>>` and `NativeValue<T>` have the same layout.
70+
let rle_vals: &mut [NativeValue<T>] =
71+
unsafe { mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
7072

71-
// Capture chunk start indices. This is necessary as indices
72-
// returned from `T::encode` are relative to the chunk.
73-
values_idx_offsets.push(value_count_acc as u64);
73+
// Capture chunk start indices. This is necessary as indices
74+
// returned from `T::encode` are relative to the chunk.
75+
values_idx_offsets.push(value_count_acc as u64);
7476

75-
let value_count = NativeValue::<T>::encode(
76-
input,
77-
array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE],
78-
array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE],
79-
);
77+
let value_count = NativeValue::<T>::encode(
78+
input,
79+
unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) },
80+
rle_idxs,
81+
);
8082

81-
value_count_acc += value_count;
82-
};
83+
value_count_acc += value_count;
84+
};
8385

84-
for (chunk_idx, chunk_slice) in chunks.iter().enumerate() {
85-
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice);
86+
for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) {
87+
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
88+
process_chunk(chunk_slice, rle_idxs);
8689
}
8790

8891
if !remainder.is_empty() {
8992
// Repeat the last value for padding to prevent
9093
// accounting for an additional value change.
9194
let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
9295
padded_chunk[..remainder.len()].copy_from_slice(remainder);
93-
process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk);
96+
// There might be more entries in indices_uninit than necessary if the allocator gave us extra memory.
97+
// Remainder has to go to the last chunk after full chunks have been processed.
98+
let last_idx_chunk = &mut indices_uninit[chunks.len()];
99+
process_chunk(&padded_chunk, last_idx_chunk);
94100
}
95101

96102
unsafe {
@@ -143,11 +149,14 @@ mod tests {
143149
use rstest::rstest;
144150
use vortex_array::IntoArray;
145151
use vortex_array::ToCanonical;
152+
use vortex_array::arrays::ConstantArray;
153+
use vortex_array::arrays::MaskedArray;
154+
use vortex_array::arrays::PrimitiveArray;
146155
use vortex_array::assert_arrays_eq;
147156
use vortex_array::dtype::half::f16;
148157
use vortex_buffer::Buffer;
149158
use vortex_buffer::buffer;
150-
use vortex_error::VortexExpect;
159+
use vortex_error::VortexResult;
151160

152161
use super::*;
153162
use crate::rle::array::RLEArrayExt;
@@ -271,6 +280,186 @@ mod tests {
271280
assert_arrays_eq!(decoded, expected);
272281
}
273282

283+
/// Replaces the indices of an RLE array with MaskedArray(ConstantArray(1u16), validity).
284+
///
285+
/// Simulates a compressor that represents indices as a masked constant.
286+
/// Valid when every chunk has at least two RLE dictionary entries (the
287+
/// fill-forward default at index 0 and the actual value at index 1), which
288+
/// holds whenever the first position of each chunk is null.
289+
fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
290+
let indices_prim = rle.indices().to_primitive();
291+
let masked_indices = MaskedArray::try_new(
292+
ConstantArray::new(1u16, indices_prim.len()).into_array(),
293+
indices_prim.validity()?,
294+
)?
295+
.into_array();
296+
RLE::try_new(
297+
rle.values().clone(),
298+
masked_indices,
299+
rle.values_idx_offsets().clone(),
300+
rle.offset(),
301+
rle.len(),
302+
)
303+
}
304+
305+
#[test]
306+
fn test_encode_all_null_chunk() -> VortexResult<()> {
307+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
308+
let original = PrimitiveArray::from_option_iter(values);
309+
let rle = RLEData::encode(&original)?;
310+
let decoded = with_masked_constant_indices(&rle)?;
311+
assert_arrays_eq!(decoded, original);
312+
Ok(())
313+
}
314+
315+
#[test]
316+
fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> {
317+
// First chunk is entirely null, second chunk has a value preceded by nulls.
318+
let mut values: Vec<Option<u32>> = vec![None; 2 * FL_CHUNK_SIZE];
319+
values[FL_CHUNK_SIZE + 100] = Some(42);
320+
let original = PrimitiveArray::from_option_iter(values);
321+
let rle = RLEData::encode(&original)?;
322+
let decoded = with_masked_constant_indices(&rle)?;
323+
assert_arrays_eq!(decoded, original);
324+
Ok(())
325+
}
326+
327+
#[test]
328+
fn test_encode_one_value_near_end() -> VortexResult<()> {
329+
// Single distinct value near the end of the chunk.
330+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
331+
values[1000] = Some(42);
332+
let original = PrimitiveArray::from_option_iter(values);
333+
let rle = RLEData::encode(&original)?;
334+
let decoded = with_masked_constant_indices(&rle)?;
335+
assert_arrays_eq!(decoded, original);
336+
Ok(())
337+
}
338+
339+
#[test]
340+
fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> {
341+
// 1085 elements (2 chunks: 1024 + 61 padded to 1024).
342+
// Chunk 0 has -1i16 at scattered positions (273..=366), rest null.
343+
// Chunk 1 (the remainder) is entirely null.
344+
const NEG1_POSITIONS: &[usize] = &[
345+
273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296,
346+
298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335,
347+
336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358,
348+
359, 362, 363, 364, 366,
349+
];
350+
let mut values: Vec<Option<i16>> = vec![None; 1085];
351+
for &pos in NEG1_POSITIONS {
352+
values[pos] = Some(-1);
353+
}
354+
let original = PrimitiveArray::from_option_iter(values);
355+
let rle = RLEData::encode(&original)?;
356+
let decoded = with_masked_constant_indices(&rle)?;
357+
assert_arrays_eq!(decoded, original);
358+
Ok(())
359+
}
360+
361+
/// Replaces indices at invalid (null) positions with random garbage values.
362+
///
363+
/// This simulates a compressor that doesn't preserve index values at null
364+
/// positions, which can happen when indices are further compressed and the
365+
/// compressor clobbers invalid entries with arbitrary data.
366+
fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
367+
let indices_prim = rle.indices().to_primitive();
368+
let mut indices_data: Vec<u16> = indices_prim.as_slice::<u16>().to_vec();
369+
370+
// Use a simple deterministic "random" sequence.
371+
let mut rng_state: u32 = 0xDEAD_BEEF;
372+
let validity = indices_prim.validity()?;
373+
for (i, idx) in indices_data.iter_mut().enumerate() {
374+
if !validity.is_valid(i).unwrap_or(true) {
375+
// xorshift32
376+
rng_state ^= rng_state << 13;
377+
rng_state ^= rng_state >> 17;
378+
rng_state ^= rng_state << 5;
379+
*idx = rng_state as u16;
380+
}
381+
}
382+
383+
let clobbered_indices =
384+
PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()?).into_array();
385+
386+
RLE::try_new(
387+
rle.values().clone(),
388+
clobbered_indices,
389+
rle.values_idx_offsets().clone(),
390+
rle.offset(),
391+
rle.len(),
392+
)
393+
}
394+
395+
#[test]
396+
fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> {
397+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
398+
let original = PrimitiveArray::from_option_iter(values);
399+
let rle = RLEData::encode(&original)?;
400+
let clobbered = with_random_invalid_indices(&rle)?;
401+
assert_arrays_eq!(clobbered, original);
402+
Ok(())
403+
}
404+
405+
#[test]
406+
fn test_random_invalid_indices_sparse_values() -> VortexResult<()> {
407+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
408+
values[0] = Some(10);
409+
values[500] = Some(20);
410+
values[1000] = Some(30);
411+
let original = PrimitiveArray::from_option_iter(values);
412+
let rle = RLEData::encode(&original)?;
413+
let clobbered = with_random_invalid_indices(&rle)?;
414+
assert_arrays_eq!(clobbered, original);
415+
Ok(())
416+
}
417+
418+
#[test]
419+
fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> {
420+
// Two chunks: first has scattered values, second is all null.
421+
let mut values: Vec<Option<i16>> = vec![None; 2 * FL_CHUNK_SIZE];
422+
values[0] = Some(10);
423+
values[500] = Some(20);
424+
values[FL_CHUNK_SIZE + 100] = Some(42);
425+
let original = PrimitiveArray::from_option_iter(values);
426+
let rle = RLEData::encode(&original)?;
427+
let clobbered = with_random_invalid_indices(&rle)?;
428+
assert_arrays_eq!(clobbered, original);
429+
Ok(())
430+
}
431+
432+
#[test]
433+
fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> {
434+
// 1085 elements: chunk 0 has values at scattered positions, chunk 1 is
435+
// a partial (61 elements padded to 1024) that is entirely null.
436+
let mut values: Vec<Option<u32>> = vec![None; 1085];
437+
for i in (100..200).step_by(7) {
438+
values[i] = Some(i as u32);
439+
}
440+
let original = PrimitiveArray::from_option_iter(values);
441+
let rle = RLEData::encode(&original)?;
442+
let clobbered = with_random_invalid_indices(&rle)?;
443+
assert_arrays_eq!(clobbered, original);
444+
Ok(())
445+
}
446+
447+
#[test]
448+
fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> {
449+
// Most positions are valid, only a few are null with garbage indices.
450+
let mut values: Vec<Option<u64>> =
451+
(0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect();
452+
// Sprinkle in some nulls.
453+
for i in (0..FL_CHUNK_SIZE).step_by(37) {
454+
values[i] = None;
455+
}
456+
let original = PrimitiveArray::from_option_iter(values);
457+
let rle = RLEData::encode(&original)?;
458+
let clobbered = with_random_invalid_indices(&rle)?;
459+
assert_arrays_eq!(clobbered, original);
460+
Ok(())
461+
}
462+
274463
// Regression test: RLE compression properly supports decoding pos/neg zeros
275464
// See <https://github.com/vortex-data/vortex/issues/6491>
276465
#[rstest]

0 commit comments

Comments (0)