Skip to content

Commit d87bac3

Browse files
committed
RLE handles decompression of indices where invalid positions are clobbered
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 6c7eb33 commit d87bac3

6 files changed

Lines changed: 126 additions & 32 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ anyhow = "1.0.97"
8989
arbitrary = "1.3.2"
9090
arc-swap = "1.8"
9191
arcref = "0.2.0"
92-
arrayref = "0.3.7"
9392
arrow-arith = "58"
9493
arrow-array = "58"
9594
arrow-buffer = "58"

encodings/fastlanes/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ version = { workspace = true }
1717
workspace = true
1818

1919
[dependencies]
20-
arrayref = { workspace = true }
2120
fastlanes = { workspace = true }
2221
itertools = { workspace = true }
2322
lending-iterator = { workspace = true }

encodings/fastlanes/src/rle/array/rle_compress.rs

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrayref::array_mut_ref;
54
use fastlanes::RLE as FastLanesRLE;
65
use vortex_array::IntoArray;
76
use vortex_array::ToCanonical;
@@ -51,46 +50,58 @@ where
5150
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
5251

5352
let values_uninit = values_buf.spare_capacity_mut();
54-
let indices_uninit = indices_buf.spare_capacity_mut();
53+
let (indices_uninit, _) = indices_buf
54+
.spare_capacity_mut()
55+
.as_chunks_mut::<FL_CHUNK_SIZE>();
5556
let mut value_count_acc = 0; // Chunk value count prefix sum.
5657

5758
let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();
5859

59-
let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| {
60+
let mut process_chunk = |chunk_start_idx: usize,
61+
input: &[T; FL_CHUNK_SIZE],
62+
rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
6063
// SAFETY: NativeValue is repr(transparent)
6164
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };
6265

6366
// SAFETY: `MaybeUninit<NativeValue<T>>` and `NativeValue<T>` have the same layout.
6467
let rle_vals: &mut [NativeValue<T>] =
6568
unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };
6669

67-
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
68-
let rle_idxs: &mut [u16] =
69-
unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) };
70-
7170
// Capture chunk start indices. This is necessary as indices
7271
// returned from `T::encode` are relative to the chunk.
7372
values_idx_offsets.push(value_count_acc as u64);
7473

7574
let value_count = NativeValue::<T>::encode(
7675
input,
77-
array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE],
78-
array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE],
76+
unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) },
77+
rle_idxs,
7978
);
8079

8180
value_count_acc += value_count;
8281
};
8382

84-
for (chunk_idx, chunk_slice) in chunks.iter().enumerate() {
85-
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice);
83+
for (chunk_idx, (chunk_slice, rle_idxs)) in
84+
chunks.iter().zip(indices_uninit.iter_mut()).enumerate()
85+
{
86+
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
87+
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice, unsafe {
88+
std::mem::transmute(rle_idxs)
89+
});
8690
}
8791

8892
if !remainder.is_empty() {
8993
// Repeat the last value for padding to prevent
9094
// accounting for an additional value change.
9195
let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
9296
padded_chunk[..remainder.len()].copy_from_slice(remainder);
93-
process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk);
97+
let last_idx_chunk = indices_uninit
98+
.last_mut()
99+
.vortex_expect("Must have the trailing chunk");
100+
process_chunk(
101+
(len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE,
102+
&padded_chunk,
103+
unsafe { std::mem::transmute(last_idx_chunk) },
104+
);
94105
}
95106

96107
unsafe {
@@ -143,11 +154,14 @@ mod tests {
143154
use rstest::rstest;
144155
use vortex_array::IntoArray;
145156
use vortex_array::ToCanonical;
157+
use vortex_array::arrays::ConstantArray;
158+
use vortex_array::arrays::MaskedArray;
159+
use vortex_array::arrays::PrimitiveArray;
146160
use vortex_array::assert_arrays_eq;
147161
use vortex_array::dtype::half::f16;
148162
use vortex_buffer::Buffer;
149163
use vortex_buffer::buffer;
150-
use vortex_error::VortexExpect;
164+
use vortex_error::VortexResult;
151165

152166
use super::*;
153167
use crate::rle::array::RLEArrayExt;
@@ -271,6 +285,84 @@ mod tests {
271285
assert_arrays_eq!(decoded, expected);
272286
}
273287

288+
/// Replaces the indices of an RLE array with MaskedArray(ConstantArray(1u16), validity).
289+
///
290+
/// Simulates a compressor that represents indices as a masked constant.
291+
/// Valid when every chunk has at least two RLE dictionary entries (the
292+
/// fill-forward default at index 0 and the actual value at index 1), which
293+
/// holds whenever the first position of each chunk is null.
294+
fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
295+
let indices_prim = rle.indices().to_primitive();
296+
let masked_indices = MaskedArray::try_new(
297+
ConstantArray::new(1u16, indices_prim.len()).into_array(),
298+
indices_prim.validity()?,
299+
)?
300+
.into_array();
301+
RLE::try_new(
302+
rle.values().clone(),
303+
masked_indices,
304+
rle.values_idx_offsets().clone(),
305+
rle.offset(),
306+
rle.len(),
307+
)
308+
}
309+
310+
#[test]
311+
fn test_encode_all_null_chunk() -> VortexResult<()> {
312+
let values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
313+
let original = PrimitiveArray::from_option_iter(values);
314+
let rle = RLEData::encode(&original)?;
315+
let decoded = with_masked_constant_indices(&rle)?;
316+
assert_arrays_eq!(decoded, original);
317+
Ok(())
318+
}
319+
320+
#[test]
321+
fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> {
322+
// First chunk is entirely null, second chunk has a value preceded by nulls.
323+
let mut values: Vec<Option<u32>> = vec![None; 2 * FL_CHUNK_SIZE];
324+
values[FL_CHUNK_SIZE + 100] = Some(42);
325+
let original = PrimitiveArray::from_option_iter(values);
326+
let rle = RLEData::encode(&original)?;
327+
let decoded = with_masked_constant_indices(&rle)?;
328+
assert_arrays_eq!(decoded, original);
329+
Ok(())
330+
}
331+
332+
#[test]
333+
fn test_encode_one_value_near_end() -> VortexResult<()> {
334+
// Single distinct value near the end of the chunk.
335+
let mut values: Vec<Option<u32>> = vec![None; FL_CHUNK_SIZE];
336+
values[1000] = Some(42);
337+
let original = PrimitiveArray::from_option_iter(values);
338+
let rle = RLEData::encode(&original)?;
339+
let decoded = with_masked_constant_indices(&rle)?;
340+
assert_arrays_eq!(decoded, original);
341+
Ok(())
342+
}
343+
344+
#[test]
345+
fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> {
346+
// 1085 elements (2 chunks: 1024 + 61 padded to 1024).
347+
// Chunk 0 has -1i16 at scattered positions (273..=366), rest null.
348+
// Chunk 1 (the remainder) is entirely null.
349+
const NEG1_POSITIONS: &[usize] = &[
350+
273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296,
351+
298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335,
352+
336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358,
353+
359, 362, 363, 364, 366,
354+
];
355+
let mut values: Vec<Option<i16>> = vec![None; 1085];
356+
for &pos in NEG1_POSITIONS {
357+
values[pos] = Some(-1);
358+
}
359+
let original = PrimitiveArray::from_option_iter(values);
360+
let rle = RLEData::encode(&original)?;
361+
let decoded = with_masked_constant_indices(&rle)?;
362+
assert_arrays_eq!(decoded, original);
363+
Ok(())
364+
}
365+
274366
// Regression test: RLE compression properly supports decoding pos/neg zeros
275367
// See <https://github.com/vortex-data/vortex/issues/6491>
276368
#[rstest]

encodings/fastlanes/src/rle/array/rle_decompress.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use arrayref::array_mut_ref;
5-
use arrayref::array_ref;
64
use fastlanes::RLE;
75
use num_traits::AsPrimitive;
86
use vortex_array::ExecutionCtx;
@@ -55,42 +53,49 @@ where
5553
let values = values.as_slice::<V>();
5654

5755
let indices = array.indices().clone().execute::<PrimitiveArray>(ctx)?;
58-
let indices = indices.as_slice::<I>();
5956
assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE));
57+
let (indices_sl, _) = indices.as_slice::<I>().as_chunks::<FL_CHUNK_SIZE>();
6058

6159
let chunk_start_idx = array.offset() / FL_CHUNK_SIZE;
6260
let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE);
6361
let num_chunks = chunk_end_idx - chunk_start_idx;
6462

6563
let mut buffer = BufferMut::<V>::with_capacity(num_chunks * FL_CHUNK_SIZE);
66-
let buffer_uninit = buffer.spare_capacity_mut();
64+
let (out_buf, _) = buffer.spare_capacity_mut().as_chunks_mut::<FL_CHUNK_SIZE>();
6765

6866
let values_idx_offsets = array
6967
.values_idx_offsets()
7068
.clone()
7169
.execute::<PrimitiveArray>(ctx)?;
7270
let values_idx_offsets = values_idx_offsets.as_slice::<O>();
7371

74-
for chunk_idx in 0..num_chunks {
72+
for (chunk_idx, (chunk_indices, chunk_out)) in
73+
indices_sl.iter().zip(out_buf.iter_mut()).enumerate()
74+
{
7575
// Offsets in `values_idx_offsets` are absolute and need to be shifted
7676
// by the offset of the first chunk of the current slice, in
7777
// order to make them relative.
7878
let value_idx_offset =
7979
(values_idx_offsets[chunk_idx].as_() - values_idx_offsets[0].as_()) as usize;
8080

81-
let chunk_values = &values[value_idx_offset..];
82-
let chunk_indices = &indices[chunk_idx * FL_CHUNK_SIZE..];
83-
84-
// SAFETY: `MaybeUninit<T>` and `T` have the same layout.
85-
let buffer_values: &mut [V] = unsafe {
86-
std::mem::transmute(&mut buffer_uninit[chunk_idx * FL_CHUNK_SIZE..][..FL_CHUNK_SIZE])
81+
let next_value_idx_offset = if chunk_idx + 1 < num_chunks {
82+
(values_idx_offsets[chunk_idx + 1].as_() - values_idx_offsets[0].as_()) as usize
83+
} else {
84+
values.len()
8785
};
86+
let num_chunk_values = next_value_idx_offset - value_idx_offset;
8887

89-
V::decode(
90-
chunk_values,
91-
array_ref![chunk_indices, 0, FL_CHUNK_SIZE],
92-
array_mut_ref![buffer_values, 0, FL_CHUNK_SIZE],
93-
);
88+
// SAFETY: `MaybeUninit<T>` and `T` have the same layout.
89+
let buffer_values: &mut [V; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(chunk_out) };
90+
let chunk_values = &values[value_idx_offset..];
91+
if num_chunk_values == 1 {
92+
// Single-value chunk: fill directly to avoid out-of-bounds index
93+
// access. The indices may contain values other than 0 when they
94+
// have been further compressed (e.g., as a masked constant).
95+
buffer_values.fill(chunk_values[0]);
96+
} else {
97+
V::decode(chunk_values, chunk_indices, buffer_values);
98+
}
9499
}
95100

96101
unsafe {
3.28 KB
Binary file not shown.

0 commit comments

Comments
 (0)