Skip to content

Commit 3000bc0

Browse files
feat[fastlanes]: allow delta to support signed bases (#7923)
Allow signed bases in delta. --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 3006be6 commit 3000bc0

6 files changed

Lines changed: 237 additions & 12 deletions

File tree

encodings/fastlanes/src/delta/array/delta_compress.rs

Lines changed: 190 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use fastlanes::FastLanes;
99
use fastlanes::Transpose;
1010
use vortex_array::ExecutionCtx;
1111
use vortex_array::arrays::PrimitiveArray;
12+
use vortex_array::arrays::primitive::PrimitiveArrayExt;
1213
use vortex_array::dtype::NativePType;
1314
use vortex_array::match_each_unsigned_integer_ptype;
1415
use vortex_buffer::Buffer;
@@ -23,6 +24,9 @@ pub fn delta_compress(
2324
ctx: &mut ExecutionCtx,
2425
) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
2526
let validity = array.validity()?;
27+
let original_ptype = array.ptype();
28+
let array = array.reinterpret_cast(original_ptype.to_unsigned());
29+
2630
let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| {
2731
// Fill-forward null values so that transposed deltas at null positions remain
2832
// small. Without this, bitpacking may skip patches for null positions, and the
@@ -37,7 +41,10 @@ pub fn delta_compress(
3741
)
3842
});
3943

40-
Ok((bases, deltas))
44+
Ok((
45+
bases.reinterpret_cast(original_ptype),
46+
deltas.reinterpret_cast(original_ptype),
47+
))
4148
}
4249

4350
fn compress_primitive<T, const LANES: usize>(array: &[T]) -> (Buffer<T>, Buffer<T>)
@@ -113,11 +120,31 @@ mod tests {
113120
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
114121

115122
#[rstest]
116-
#[case((0u32..10_000).collect())]
117-
#[case((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())]
118-
#[case(PrimitiveArray::from_option_iter(
123+
#[case::u32((0u32..10_000).collect())]
124+
#[case::u8((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())]
125+
#[case::nullable_u32(PrimitiveArray::from_option_iter(
119126
(0u32..10_000).map(|i| (i % 2 == 0).then_some(i)),
120127
))]
128+
// Signed inputs that stay non-negative: encoded deltas are identical to the u32 case
129+
// bit-for-bit, but the buffer's dtype carries the signedness through round-trip.
130+
#[case::i32_non_negative((0i32..10_000).collect())]
131+
// Signed inputs crossing zero: deltas alternate in sign, which under wrapping_sub
132+
// populates the high bits of negative deltas. Bit-packing without preprocessing
133+
// would explode here, but round-tripping the raw delta buffer is still correct.
134+
#[case::i32_crossing_zero((-5_000i32..5_000).collect())]
135+
// All-negative signed values.
136+
#[case::i32_all_negative((-10_000i32..0).collect())]
137+
// i8 across the full type range: tests T::MIN / T::MAX boundaries and the
138+
// remainder-padded chunk path (256 < FL_CHUNK_SIZE).
139+
#[case::i8_full_range((i8::MIN..=i8::MAX).collect())]
140+
// i16 crossing zero.
141+
#[case::i16_crossing_zero((-2_000i16..2_000).collect())]
142+
// i64 with large negative offset.
143+
#[case::i64_large_negative((0i64..5_000).map(|i| i - 1_000_000_000_000).collect())]
144+
// Nullable signed array with values around zero.
145+
#[case::nullable_i32_crossing(PrimitiveArray::from_option_iter(
146+
(-2_000i32..2_000).map(|i| (i % 3 != 0).then_some(i)),
147+
))]
121148
fn test_compress(#[case] array: PrimitiveArray) -> VortexResult<()> {
122149
let delta = Delta::try_from_primitive_array(&array, &mut SESSION.create_execution_ctx())?;
123150
assert_eq!(delta.len(), array.len());
@@ -151,4 +178,163 @@ mod tests {
151178
assert_arrays_eq!(packed_delta_prim, array);
152179
Ok(())
153180
}
181+
182+
/// Measures compression of delta-encoded signed columns under three bit-packing strategies:
183+
/// * `naive`: bit-packing the raw delta bytes (every negative delta sets the high bits,
184+
/// so the OR mask forces `W = T`).
185+
/// * `FFoR`: subtracting the per-column `min(delta)` before bit-packing
186+
/// (`W = ceil(log2(max - min + 1))`).
187+
/// * `zigzag`: `(n << 1) ^ (n >> 31)` before bit-packing
188+
/// (`W = 1 + ceil(log2(max(|min|, |max|)))`).
189+
///
190+
/// Asserts that FFoR beats or ties naive on every workload and beats zigzag on the
191+
/// asymmetric workloads. Run with `--nocapture` to see the full table.
192+
#[test]
193+
fn synthetic_workload_compression() -> VortexResult<()> {
194+
let mut ctx = SESSION.create_execution_ctx();
195+
const N: usize = 8 * 1024; // 8 full FastLanes chunks per workload
196+
197+
let monotone: Vec<i32> = (0..N as i32).collect();
198+
// Deterministic LCG so the test is reproducible.
199+
let mut lcg = 0u32;
200+
let mut next = || {
201+
lcg = lcg.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
202+
(lcg >> 16) as i32
203+
};
204+
let sensor: Vec<i32> = (0..N).map(|_| (next() % 201) - 100).collect();
205+
let offset: Vec<i32> = (0..N as i32).map(|i| -1_000_000_000 + i).collect();
206+
let mut lcg2 = 0u32;
207+
let mut prev = 0i32;
208+
let near_monotone: Vec<i32> = (0..N)
209+
.map(|_| {
210+
lcg2 = lcg2.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
211+
let step = if (lcg2 >> 24) < 13 { -2 } else { 1 }; // ~5% backtrack
212+
prev = prev.wrapping_add(step);
213+
prev
214+
})
215+
.collect();
216+
let workloads = [
217+
("monotone i32 (0..N)", monotone),
218+
("sensor i32 in [-100, 100]", sensor),
219+
("offset i32 base=-1e9", offset),
220+
("near-monotone i32 (5% backtrack)", near_monotone),
221+
];
222+
223+
println!();
224+
println!(
225+
"{:<36} {:>10} {:>14} {:>5} {:>5} {:>5} {:>10} {:>9} {:>10} {:>5} {:>10} {:>9}",
226+
"workload",
227+
"raw (B)",
228+
"Δ range",
229+
"Wnaive",
230+
"Wffor",
231+
"Wzig",
232+
"FFoR (B)",
233+
"FFoR x",
234+
"bases (B)",
235+
"Wb",
236+
"+bcomp (B)",
237+
"+bcomp x",
238+
);
239+
println!("{}", "-".repeat(140));
240+
241+
for (name, values) in workloads {
242+
let raw_bytes = size_of_val(values.as_slice());
243+
let array = PrimitiveArray::from_iter(values);
244+
let (bases, deltas) = delta_compress(&array, &mut ctx)?;
245+
let deltas_buf: &[i32] = deltas.as_slice();
246+
let bases_buf: &[i32] = bases.as_slice();
247+
248+
let min_d = *deltas_buf.iter().min().unwrap();
249+
let max_d = *deltas_buf.iter().max().unwrap();
250+
251+
// Naive width = OR of raw u32 bit-patterns of every delta. Any negative delta
252+
// sets the high bits and forces W = 32.
253+
let or: u32 = deltas_buf.iter().fold(0u32, |a, &d| a | (d as u32));
254+
let naive_w = if or == 0 {
255+
0
256+
} else {
257+
32 - or.leading_zeros() as usize
258+
};
259+
260+
// FFoR width = ceil(log2(span)) where span = (max - min + 1).
261+
let span = (max_d as i64 - min_d as i64) as u64 + 1;
262+
let ffor_w = if span <= 1 {
263+
0
264+
} else {
265+
64 - (span - 1).leading_zeros() as usize
266+
};
267+
268+
// ZigZag width = 1 + ceil(log2(max(|min|, |max|))) for any nonzero delta.
269+
let zz_mag = (min_d.unsigned_abs()).max(max_d.unsigned_abs());
270+
let zz_w = if zz_mag == 0 {
271+
0
272+
} else {
273+
1 + (32 - zz_mag.leading_zeros() as usize)
274+
};
275+
276+
// FFoR encoded byte size: bases (already unpacked) + ref + ceil(packed bits / 8).
277+
let bases_bytes = size_of_val(bases_buf);
278+
let ref_bytes = size_of::<i32>();
279+
let packed_bits = deltas_buf.len() * ffor_w;
280+
let ffor_packed_bytes = packed_bits.div_ceil(8);
281+
let ffor_total = bases_bytes + ref_bytes + ffor_packed_bytes;
282+
let ratio = raw_bytes as f64 / ffor_total as f64;
283+
284+
// Bases compressibility: what we save if the bases child is recursively
285+
// delta-encoded or FoR-encoded. The bases are the "first row of the transposed
286+
// chunk" per lane, so they form a sub-sequence that inherits the smoothness of
287+
// the input. We approximate with FFoR over the bases alone (no recursive Delta,
288+
// which would force padding to 1024 elements per FastLanes chunk and could lose
289+
// for short base sequences).
290+
let min_b = *bases_buf.iter().min().unwrap();
291+
let max_b = *bases_buf.iter().max().unwrap();
292+
let bspan = (max_b as i64 - min_b as i64) as u64 + 1;
293+
let bases_w = if bspan <= 1 {
294+
0
295+
} else {
296+
64 - (bspan - 1).leading_zeros() as usize
297+
};
298+
let bases_compressed = (bases_buf.len() * bases_w).div_ceil(8) + ref_bytes;
299+
let total_with_bcomp = bases_compressed + ref_bytes + ffor_packed_bytes;
300+
let ratio_with_bcomp = raw_bytes as f64 / total_with_bcomp as f64;
301+
302+
println!(
303+
"{name:<36} {raw_bytes:>10} {:>14} {naive_w:>5} {ffor_w:>5} {zz_w:>5} {ffor_total:>10} {ratio:>8.2}x {bases_bytes:>10} {bases_w:>5} {total_with_bcomp:>10} {ratio_with_bcomp:>8.2}x",
304+
format!("[{min_d}, {max_d}]"),
305+
);
306+
307+
// Sanity assertions. naive_w is 32 (or near it) for any delta sequence that
308+
// contains a negative value; FFoR/ZigZag width must be strictly smaller for these
309+
// workloads.
310+
assert!(
311+
ffor_w <= naive_w.max(1),
312+
"FFoR must never exceed naive for {name}"
313+
);
314+
if min_d < 0 {
315+
assert_eq!(
316+
naive_w, 32,
317+
"any negative delta forces naive W to 32 for {name}"
318+
);
319+
assert!(ffor_w < 32, "FFoR must compress below T for {name}");
320+
}
321+
// On the asymmetric workloads (offset, near-monotone) FFoR must beat ZigZag.
322+
if min_d > 0 || max_d < 0 {
323+
assert!(
324+
ffor_w < zz_w,
325+
"FFoR should beat ZigZag on asymmetric {name}"
326+
);
327+
}
328+
// Sorted inputs => the bases inherit smoothness => the bases bit-width should be
329+
// far smaller than `T` for sorted columns.
330+
if name.starts_with("monotone") || name.starts_with("offset") {
331+
assert!(
332+
bases_w < 16,
333+
"sorted bases should pack below 16 bits for {name}"
334+
);
335+
}
336+
}
337+
338+
Ok(())
339+
}
154340
}

encodings/fastlanes/src/delta/array/delta_decompress.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use fastlanes::Transpose;
1010
use itertools::Itertools;
1111
use vortex_array::ExecutionCtx;
1212
use vortex_array::arrays::PrimitiveArray;
13+
use vortex_array::arrays::primitive::PrimitiveArrayExt;
1314
use vortex_array::dtype::NativePType;
1415
use vortex_array::match_each_unsigned_integer_ptype;
1516
use vortex_buffer::Buffer;
@@ -33,14 +34,22 @@ pub fn delta_decompress(
3334
let validity = untranspose_validity(&deltas.validity()?, ctx)?;
3435
let validity = validity.slice(start..end)?;
3536

36-
Ok(match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
37+
let original_ptype = deltas.ptype();
38+
// Signed inputs are processed through their unsigned counterpart; `wrapping_add` on the
39+
// raw bytes inverts the `wrapping_sub` done at compress time regardless of signedness.
40+
let bases = bases.reinterpret_cast(original_ptype.to_unsigned());
41+
let deltas = deltas.reinterpret_cast(original_ptype.to_unsigned());
42+
43+
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
3744
const LANES: usize = T::LANES;
3845

3946
let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
4047
let buffer = buffer.slice(start..end);
4148

4249
PrimitiveArray::new(buffer, validity)
43-
}))
50+
});
51+
52+
Ok(decoded.reinterpret_cast(original_ptype))
4453
}
4554

4655
/// Performs the low-level delta decompression on primitive values.

encodings/fastlanes/src/delta/array/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["bases", "deltas"];
4343
/// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap();
4444
/// ```
4545
///
46+
/// Signed inputs are also supported; deltas across negative values are encoded by
47+
/// `wrapping_sub` and recovered by `wrapping_add` at decompress time:
48+
///
49+
/// ```
50+
/// use vortex_array::arrays::PrimitiveArray;
51+
/// use vortex_array::VortexSessionExecute;
52+
/// use vortex_array::session::ArraySession;
53+
/// use vortex_session::VortexSession;
54+
/// use vortex_fastlanes::Delta;
55+
///
56+
/// let session = VortexSession::empty().with::<ArraySession>();
57+
/// let primitive = PrimitiveArray::from_iter([-3_i32, -2, -1, 0, 1, 2]);
58+
/// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap();
59+
/// ```
60+
///
4661
/// # Details
4762
///
4863
/// To facilitate slicing, this array accepts an `offset` and `logical_len`. The offset must be
@@ -101,5 +116,5 @@ impl DeltaData {
101116
}
102117

103118
pub(crate) fn lane_count(ptype: PType) -> usize {
104-
match_each_unsigned_integer_ptype!(ptype, |T| { T::LANES })
119+
match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |T| { T::LANES })
105120
}

encodings/fastlanes/src/delta/compute/cast.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ impl CastReduce for Delta {
2424
if target_ptype.is_signed_int() || source_ptype.bit_width() > target_ptype.bit_width() {
2525
return Ok(None);
2626
}
27+
// Signed sources need a different cast policy than the lossless widening cast
28+
// used here. The delta bytes are stored as the result of `wrapping_sub`, so e.g.
29+
// a delta of -1i8 has the bit pattern 0xFF. Widening *as a value* (the cast op's
30+
// semantics) sign-extends that to 0xFFFFFFFF, which means `wrapping_add(base, delta)`
31+
// at the wider type produces a different result than at the source type — round-trip
32+
// breaks. Cross-signedness widening has the same hazard for the same reason. Fall
33+
// back to decompress-and-re-encode for both cases.
34+
if source_ptype.is_signed_int() {
35+
return Ok(None);
36+
}
2737

2838
// Cast both bases and deltas to the target type
2939
let casted_bases = array.bases().cast(dtype.with_nullability(NonNullable))?;

encodings/fastlanes/src/delta/vtable/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
use std::hash::Hash;
55
use std::hash::Hasher;
66

7-
use fastlanes::FastLanes;
87
use prost::Message;
98
use vortex_array::Array;
109
use vortex_array::ArrayEq;
@@ -21,7 +20,6 @@ use vortex_array::arrays::PrimitiveArray;
2120
use vortex_array::buffer::BufferHandle;
2221
use vortex_array::dtype::DType;
2322
use vortex_array::dtype::PType;
24-
use vortex_array::match_each_unsigned_integer_ptype;
2523
use vortex_array::serde::ArrayChildren;
2624
use vortex_array::smallvec::smallvec;
2725
use vortex_array::vtable::VTable;
@@ -156,7 +154,7 @@ impl VTable for Delta {
156154
);
157155
let metadata = DeltaMetadata::decode(metadata)?;
158156
let ptype = PType::try_from(dtype)?;
159-
let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
157+
let lanes = lane_count(ptype);
160158

161159
// Compute the length of the bases array
162160
let deltas_len = usize::try_from(metadata.deltas_len)
@@ -227,8 +225,8 @@ fn validate_parts(
227225
);
228226

229227
vortex_ensure!(
230-
bases.dtype().is_unsigned_int(),
231-
"DeltaArray: dtype must be an unsigned integer, got {}",
228+
bases.dtype().is_int(),
229+
"DeltaArray: dtype must be an integer, got {}",
232230
bases.dtype()
233231
);
234232

encodings/fastlanes/src/delta/vtable/operations.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ mod tests {
248248
#[case::delta_large_u64((0u64..2048).collect())]
249249
// Single element
250250
#[case::delta_single(PrimitiveArray::new(buffer![42u32], Validity::NonNullable))]
251+
// Signed inputs (added with signed-delta support).
252+
#[case::delta_i32_crossing_zero((-100i32..100).collect())]
253+
#[case::delta_i64_negative((0i64..100).map(|i| -i * 10).collect())]
254+
#[case::delta_large_i32((-1024i32..1024).collect())]
255+
#[case::delta_single_negative(PrimitiveArray::new(buffer![-42i32], Validity::NonNullable))]
251256
fn test_delta_consistency(#[case] array: PrimitiveArray) {
252257
test_array_consistency(&da(&array).into_array());
253258
}
@@ -258,6 +263,8 @@ mod tests {
258263
#[case::delta_u32_basic(PrimitiveArray::new(buffer![1u32, 1, 1, 1, 1], Validity::NonNullable))]
259264
#[case::delta_u64_basic(PrimitiveArray::new(buffer![1u64, 1, 1, 1, 1], Validity::NonNullable))]
260265
#[case::delta_u32_large(PrimitiveArray::new(buffer![1u32; 100], Validity::NonNullable))]
266+
#[case::delta_i8_basic(PrimitiveArray::new(buffer![-1i8, -1, -1, -1, -1], Validity::NonNullable))]
267+
#[case::delta_i32_basic(PrimitiveArray::new(buffer![-1i32, -1, -1, -1, -1], Validity::NonNullable))]
261268
fn test_delta_binary_numeric(#[case] array: PrimitiveArray) {
262269
test_binary_numeric_array(da(&array).into_array());
263270
}

0 commit comments

Comments
 (0)