Skip to content

Commit 0ba7deb

Browse files
committed
RLE handles decompression of indices where invalid positions are clobbered
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent f43e12b commit 0ba7deb

6 files changed

Lines changed: 132 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ anyhow = "1.0.97"
8989
arbitrary = "1.3.2"
9090
arc-swap = "1.8"
9191
arcref = "0.2.0"
92-
arrayref = "0.3.7"
9392
arrow-arith = "58"
9493
arrow-array = "58"
9594
arrow-buffer = "58"

encodings/fastlanes/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ version = { workspace = true }
1717
workspace = true
1818

1919
[dependencies]
20-
arrayref = { workspace = true }
2120
fastlanes = { workspace = true }
2221
itertools = { workspace = true }
2322
lending-iterator = { workspace = true }

encodings/fastlanes/src/rle/array/rle_compress.rs

Lines changed: 111 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrayref::array_mut_ref;
54
use fastlanes::RLE;
65
use vortex_array::IntoArray;
76
use vortex_array::ToCanonical;
@@ -12,6 +11,7 @@ use vortex_array::match_each_native_ptype;
1211
use vortex_array::validity::Validity;
1312
use vortex_buffer::BitBufferMut;
1413
use vortex_buffer::BufferMut;
14+
use vortex_error::VortexExpect;
1515
use vortex_error::VortexResult;
1616

1717
use crate::FL_CHUNK_SIZE;
@@ -48,46 +48,58 @@ where
4848
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
4949

5050
let values_uninit = values_buf.spare_capacity_mut();
51-
let indices_uninit = indices_buf.spare_capacity_mut();
51+
let (indices_uninit, _) = indices_buf
52+
.spare_capacity_mut()
53+
.as_chunks_mut::<FL_CHUNK_SIZE>();
5254
let mut value_count_acc = 0; // Chunk value count prefix sum.
5355

5456
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
5557

56-
let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| {
58+
let mut process_chunk = |chunk_start_idx: usize,
59+
input: &[T; FL_CHUNK_SIZE],
60+
rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
5761
// SAFETY: NativeValue is repr(transparent)
5862
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
5963

6064
// SAFETY: `MaybeUninit<NativeValue<T>>` and `NativeValue<T>` have the same layout.
6165
let rle_vals: &mut [NativeValue<T>] =
6266
unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
6367

64-
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
65-
let rle_idxs: &mut [u16] =
66-
unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) };
67-
6868
// Capture chunk start indices. This is necessary as indices
6969
// returned from `T::encode` are relative to the chunk.
7070
values_idx_offsets.push(value_count_acc as u64);
7171

7272
let value_count = NativeValue::<T>::encode(
7373
input,
74-
array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE],
75-
array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE],
74+
unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) },
75+
rle_idxs,
7676
);
7777

7878
value_count_acc += value_count;
7979
};
8080

81-
for (chunk_idx, chunk_slice) in chunks.iter().enumerate() {
82-
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice);
81+
for (chunk_idx, (chunk_slice, rle_idxs)) in
82+
chunks.iter().zip(indices_uninit.iter_mut()).enumerate()
83+
{
84+
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
85+
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice, unsafe {
86+
std::mem::transmute(rle_idxs)
87+
});
8388
}
8489

8590
if !remainder.is_empty() {
8691
// Repeat the last value for padding to prevent
8792
// accounting for an additional value change.
8893
let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
8994
padded_chunk[..remainder.len()].copy_from_slice(remainder);
90-
process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk);
95+
let last_idx_chunk = indices_uninit
96+
.last_mut()
97+
.vortex_expect("Must have the trailing chunk");
98+
process_chunk(
99+
(len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE,
100+
&padded_chunk,
101+
unsafe { std::mem::transmute(last_idx_chunk) },
102+
);
91103
}
92104

93105
unsafe {
@@ -137,10 +149,14 @@ mod tests {
137149
use rstest::rstest;
138150
use vortex_array::IntoArray;
139151
use vortex_array::ToCanonical;
152+
use vortex_array::arrays::ConstantArray;
153+
use vortex_array::arrays::MaskedArray;
154+
use vortex_array::arrays::PrimitiveArray;
140155
use vortex_array::assert_arrays_eq;
141156
use vortex_array::dtype::half::f16;
142157
use vortex_buffer::Buffer;
143158
use vortex_buffer::buffer;
159+
use vortex_error::VortexResult;
144160

145161
use super::*;
146162

@@ -258,6 +274,89 @@ mod tests {
258274
assert_arrays_eq!(decoded, expected);
259275
}
260276

277+
/// Replaces the indices of an RLE array with MaskedArray(ConstantArray(1u16), validity).
278+
///
279+
/// Simulates a compressor that represents indices as a masked constant.
280+
/// Valid when every chunk has at least two RLE dictionary entries (the
281+
/// fill-forward default at index 0 and the actual value at index 1), which
282+
/// holds whenever the first position of each chunk is null.
283+
fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
284+
let indices_prim = rle.indices().to_primitive();
285+
let masked_indices = MaskedArray::try_new(
286+
ConstantArray::new(1u16, indices_prim.len()).into_array(),
287+
indices_prim.validity(),
288+
)?
289+
.into_array();
290+
// SAFETY: we only replace the indices child; dtype and length are unchanged
291+
// and index 1 is valid in every chunk because each has ≥ 2 dictionary entries.
292+
unsafe {
293+
RLEArray::try_from_data(RLEData::new_unchecked(
294+
rle.values().clone(),
295+
masked_indices,
296+
rle.values_idx_offsets().clone(),
297+
rle.dtype().clone(),
298+
rle.offset(),
299+
rle.len(),
300+
))
301+
}
302+
}
303+
304+
#[test]
305+
fn test_encode_all_null_chunk() -> VortexResult<()> {
306+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
307+
let original = PrimitiveArray::from_option_iter(values);
308+
let rle = RLEData::encode(&original)?;
309+
let decoded = with_masked_constant_indices(&rle)?;
310+
assert_arrays_eq!(decoded, original);
311+
Ok(())
312+
}
313+
314+
#[test]
315+
fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> {
316+
// First chunk is entirely null, second chunk has a value preceded by nulls.
317+
let mut values: Vec<Option<u32>> = vec![None; 2 * FL_CHUNK_SIZE];
318+
values[FL_CHUNK_SIZE + 100] = Some(42);
319+
let original = PrimitiveArray::from_option_iter(values);
320+
let rle = RLEData::encode(&original)?;
321+
let decoded = with_masked_constant_indices(&rle)?;
322+
assert_arrays_eq!(decoded, original);
323+
Ok(())
324+
}
325+
326+
#[test]
327+
fn test_encode_one_value_near_end() -> VortexResult<()> {
328+
// Single distinct value near the end of the chunk.
329+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
330+
values[1000] = Some(42);
331+
let original = PrimitiveArray::from_option_iter(values);
332+
let rle = RLEData::encode(&original)?;
333+
let decoded = with_masked_constant_indices(&rle)?;
334+
assert_arrays_eq!(decoded, original);
335+
Ok(())
336+
}
337+
338+
#[test]
339+
fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> {
340+
// 1085 elements (2 chunks: 1024 + 61 padded to 1024).
341+
// Chunk 0 has -1i16 at scattered positions (273..=366), rest null.
342+
// Chunk 1 (the remainder) is entirely null.
343+
const NEG1_POSITIONS: &[usize] = &[
344+
273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296,
345+
298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335,
346+
336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358,
347+
359, 362, 363, 364, 366,
348+
];
349+
let mut values: Vec<Option<i16>> = vec![None; 1085];
350+
for &pos in NEG1_POSITIONS {
351+
values[pos] = Some(-1);
352+
}
353+
let original = PrimitiveArray::from_option_iter(values);
354+
let rle = RLEData::encode(&original)?;
355+
let decoded = with_masked_constant_indices(&rle)?;
356+
assert_arrays_eq!(decoded, original);
357+
Ok(())
358+
}
359+
261360
// Regression test: RLE compression properly supports decoding pos/neg zeros
262361
// See <https://github.com/vortex-data/vortex/issues/6491>
263362
#[rstest]

encodings/fastlanes/src/rle/array/rle_decompress.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrayref::array_mut_ref;
5-
use arrayref::array_ref;
64
use fastlanes::RLE;
75
use num_traits::AsPrimitive;
86
use vortex_array::ExecutionCtx;
@@ -54,42 +52,49 @@ where
5452
let values = values.as_slice::<V>();
5553

5654
let indices = array.indices().clone().execute::<PrimitiveArray>(ctx)?;
57-
let indices = indices.as_slice::<I>();
5855
assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE));
56+
let (indices_sl, _) = indices.as_slice::<I>().as_chunks::<FL_CHUNK_SIZE>();
5957

6058
let chunk_start_idx = array.offset() / FL_CHUNK_SIZE;
6159
let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE);
6260
let num_chunks = chunk_end_idx - chunk_start_idx;
6361

6462
let mut buffer = BufferMut::<V>::with_capacity(num_chunks * FL_CHUNK_SIZE);
65-
let buffer_uninit = buffer.spare_capacity_mut();
63+
let (out_buf, _) = buffer.spare_capacity_mut().as_chunks_mut::<FL_CHUNK_SIZE>();
6664

6765
let values_idx_offsets = array
6866
.values_idx_offsets()
6967
.clone()
7068
.execute::<PrimitiveArray>(ctx)?;
7169
let values_idx_offsets = values_idx_offsets.as_slice::<O>();
7270

73-
for chunk_idx in 0..num_chunks {
71+
for (chunk_idx, (chunk_indices, chunk_out)) in
72+
indices_sl.iter().zip(out_buf.iter_mut()).enumerate()
73+
{
7474
// Offsets in `values_idx_offsets` are absolute and need to be shifted
7575
// by the offset of the first chunk, respective the current slice, in
7676
// order to make them relative.
7777
let value_idx_offset =
7878
(values_idx_offsets[chunk_idx].as_() - values_idx_offsets[0].as_()) as usize;
7979

80-
let chunk_values = &values[value_idx_offset..];
81-
let chunk_indices = &indices[chunk_idx * FL_CHUNK_SIZE..];
82-
83-
// SAFETY: `MaybeUninit<T>` and `T` have the same layout.
84-
let buffer_values: &mut [V] = unsafe {
85-
std::mem::transmute(&mut buffer_uninit[chunk_idx * FL_CHUNK_SIZE..][..FL_CHUNK_SIZE])
80+
let next_value_idx_offset = if chunk_idx + 1 < num_chunks {
81+
(values_idx_offsets[chunk_idx + 1].as_() - values_idx_offsets[0].as_()) as usize
82+
} else {
83+
values.len()
8684
};
85+
let num_chunk_values = next_value_idx_offset - value_idx_offset;
8786

88-
V::decode(
89-
chunk_values,
90-
array_ref![chunk_indices, 0, FL_CHUNK_SIZE],
91-
array_mut_ref![buffer_values, 0, FL_CHUNK_SIZE],
92-
);
87+
// SAFETY: `MaybeUninit<T>` and `T` have the same layout.
88+
let buffer_values: &mut [V; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(chunk_out) };
89+
let chunk_values = &values[value_idx_offset..];
90+
if num_chunk_values == 1 {
91+
// Single-value chunk: fill directly to avoid out-of-bounds index
92+
// access. The indices may contain values other than 0 when they
93+
// have been further compressed (e.g., as a masked constant).
94+
buffer_values.fill(chunk_values[0]);
95+
} else {
96+
V::decode(chunk_values, chunk_indices, buffer_values);
97+
}
9398
}
9499

95100
unsafe {
3.28 KB
Binary file not shown.

0 commit comments

Comments
 (0)