Skip to content

Commit 1c8667c

Browse files
fix[delta]: compress with nulls undef values use last valid value (or 0) (#7124)
When we have undef values this can affect decompression with a undef value (use the previous valid value instead). Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent cbc8ad1 commit 1c8667c

2 files changed

Lines changed: 82 additions & 1 deletion

File tree

encodings/fastlanes/src/delta/array/delta_compress.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ use vortex_buffer::BufferMut;
1717
use vortex_error::VortexResult;
1818

1919
use crate::bit_transpose::transpose_validity;
20+
use crate::fill_forward_nulls;
2021

2122
pub fn delta_compress(
2223
array: &PrimitiveArray,
2324
ctx: &mut ExecutionCtx,
2425
) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
2526
let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| {
26-
let (bases, deltas) = compress_primitive::<T, { T::LANES }>(array.as_slice::<T>());
27+
// Fill-forward null values so that transposed deltas at null positions remain
28+
// small. Without this, bitpacking may skip patches for null positions, and the
29+
// corrupted delta values propagate through the cumulative sum during decompression.
30+
let filled = fill_forward_nulls(array.to_buffer::<T>(), array.validity());
31+
let (bases, deltas) = compress_primitive::<T, { T::LANES }>(&filled);
2732
// TODO(robert): This can be avoided if we add TransposedBoolArray that performs index translation when necessary.
2833
let validity = transpose_validity(array.validity(), ctx)?;
2934
(
@@ -124,4 +129,30 @@ mod tests {
124129
assert_arrays_eq!(decompressed, array);
125130
Ok(())
126131
}
132+
133+
/// Regression test: delta + bitpacked encoding must correctly round-trip nullable arrays
134+
/// where null positions contain arbitrary values. Without fill-forward, the delta cumulative
135+
/// sum propagates corrupted values from null positions.
136+
#[test]
137+
fn delta_bitpacked_trailing_nulls() {
138+
use vortex_array::IntoArray;
139+
use vortex_array::ToCanonical;
140+
141+
use crate::bitpack_compress::bitpack_encode;
142+
use crate::delta_compress;
143+
144+
let array = PrimitiveArray::from_option_iter(
145+
(0u8..200).map(|i| (!(50..100).contains(&i)).then_some(i)),
146+
);
147+
let (bases, deltas) = delta_compress(&array, &mut SESSION.create_execution_ctx()).unwrap();
148+
let bitpacked_deltas = bitpack_encode(&deltas, 1, None).unwrap();
149+
let packed_delta = DeltaArray::try_new(
150+
bases.into_array(),
151+
bitpacked_deltas.into_array(),
152+
0,
153+
array.len(),
154+
)
155+
.unwrap();
156+
assert_arrays_eq!(packed_delta.to_primitive(), array);
157+
}
127158
}

encodings/fastlanes/src/lib.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ pub use bitpacking::*;
77
pub use delta::*;
88
pub use r#for::*;
99
pub use rle::*;
10+
use vortex_array::ToCanonical;
11+
use vortex_array::validity::Validity;
12+
use vortex_buffer::Buffer;
13+
use vortex_buffer::BufferMut;
1014

1115
pub mod bit_transpose;
1216
mod bitpacking;
@@ -51,6 +55,52 @@ pub fn initialize(session: &mut VortexSession) {
5155
);
5256
}
5357

58+
/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
59+
///
60+
/// Returns the original buffer if there are no nulls (i.e. the validity is
61+
/// `NonNullable` or `AllValid`), avoiding any allocation or copy.
62+
pub(crate) fn fill_forward_nulls<T: Copy + Default>(
63+
values: Buffer<T>,
64+
validity: &Validity,
65+
) -> Buffer<T> {
66+
match validity {
67+
Validity::NonNullable | Validity::AllValid => values,
68+
Validity::AllInvalid => Buffer::zeroed(values.len()),
69+
Validity::Array(validity_array) => {
70+
let bit_buffer = validity_array.to_bool().to_bit_buffer();
71+
let mut last_valid = T::default();
72+
match values.try_into_mut() {
73+
Ok(mut to_fill_mut) => {
74+
for (v, is_valid) in to_fill_mut.iter_mut().zip(bit_buffer.iter()) {
75+
if is_valid {
76+
last_valid = *v;
77+
} else {
78+
*v = last_valid;
79+
}
80+
}
81+
to_fill_mut.freeze()
82+
}
83+
Err(to_fill) => {
84+
let mut to_fill_mut = BufferMut::<T>::with_capacity(to_fill.len());
85+
for (v, (out, is_valid)) in to_fill.iter().zip(
86+
to_fill_mut
87+
.spare_capacity_mut()
88+
.iter_mut()
89+
.zip(bit_buffer.iter()),
90+
) {
91+
if is_valid {
92+
last_valid = *v;
93+
}
94+
out.write(last_valid);
95+
}
96+
unsafe { to_fill_mut.set_len(to_fill.len()) };
97+
to_fill_mut.freeze()
98+
}
99+
}
100+
}
101+
}
102+
}
103+
54104
#[cfg(test)]
55105
mod test {
56106
use std::sync::LazyLock;

0 commit comments

Comments
 (0)