diff --git a/encodings/fastlanes/src/delta/array/delta_compress.rs b/encodings/fastlanes/src/delta/array/delta_compress.rs index d51cef72b49..e35778ad29e 100644 --- a/encodings/fastlanes/src/delta/array/delta_compress.rs +++ b/encodings/fastlanes/src/delta/array/delta_compress.rs @@ -9,6 +9,7 @@ use fastlanes::FastLanes; use fastlanes::Transpose; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::dtype::NativePType; use vortex_array::match_each_unsigned_integer_ptype; use vortex_buffer::Buffer; @@ -23,6 +24,9 @@ pub fn delta_compress( ctx: &mut ExecutionCtx, ) -> VortexResult<(PrimitiveArray, PrimitiveArray)> { let validity = array.validity()?; + let original_ptype = array.ptype(); + let array = array.reinterpret_cast(original_ptype.to_unsigned()); + let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| { // Fill-forward null values so that transposed deltas at null positions remain // small. Without this, bitpacking may skip patches for null positions, and the @@ -37,7 +41,10 @@ pub fn delta_compress( ) }); - Ok((bases, deltas)) + Ok(( + bases.reinterpret_cast(original_ptype), + deltas.reinterpret_cast(original_ptype), + )) } fn compress_primitive(array: &[T]) -> (Buffer, Buffer) @@ -113,11 +120,31 @@ mod tests { LazyLock::new(|| VortexSession::empty().with::()); #[rstest] - #[case((0u32..10_000).collect())] - #[case((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())] - #[case(PrimitiveArray::from_option_iter( + #[case::u32((0u32..10_000).collect())] + #[case::u8((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())] + #[case::nullable_u32(PrimitiveArray::from_option_iter( (0u32..10_000).map(|i| (i % 2 == 0).then_some(i)), ))] + // Signed inputs that stay non-negative: encoded deltas are identical to the u32 case + // bit-for-bit, but the buffer's dtype carries the signedness through round-trip. + #[case::i32_non_negative((0i32..10_000).collect())] + // Signed inputs crossing zero: deltas alternate in sign, which under wrapping_sub + // populates the high bits of negative deltas. Bit-packing without preprocessing + // would explode here, but round-tripping the raw delta buffer is still correct. + #[case::i32_crossing_zero((-5_000i32..5_000).collect())] + // All-negative signed values. + #[case::i32_all_negative((-10_000i32..0).collect())] + // i8 across the full type range: tests T::MIN / T::MAX boundaries and the + // remainder-padded chunk path (256 < FL_CHUNK_SIZE). + #[case::i8_full_range((i8::MIN..=i8::MAX).collect())] + // i16 crossing zero. + #[case::i16_crossing_zero((-2_000i16..2_000).collect())] + // i64 with large negative offset. + #[case::i64_large_negative((0i64..5_000).map(|i| i - 1_000_000_000_000).collect())] + // Nullable signed array with values around zero. + #[case::nullable_i32_crossing(PrimitiveArray::from_option_iter( + (-2_000i32..2_000).map(|i| (i % 3 != 0).then_some(i)), + ))] fn test_compress(#[case] array: PrimitiveArray) -> VortexResult<()> { let delta = Delta::try_from_primitive_array(&array, &mut SESSION.create_execution_ctx())?; assert_eq!(delta.len(), array.len()); @@ -151,4 +178,163 @@ mod tests { assert_arrays_eq!(packed_delta_prim, array); Ok(()) } + + /// Measures compression of delta-encoded signed columns under three bit-packing strategies: + /// * `naive`: bit-packing the raw delta bytes (every negative delta sets the high bits, + /// so the OR mask forces `W = T`). + /// * `FFoR`: subtracting the per-column `min(delta)` before bit-packing + /// (`W = ceil(log2(max - min + 1))`). + /// * `zigzag`: `(n << 1) ^ (n >> 31)` before bit-packing + /// (`W = 1 + ceil(log2(max(|min|, |max|)))`). + /// + /// Asserts that FFoR beats or ties naive on every workload and beats zigzag on the + /// asymmetric workloads. Run with `--nocapture` to see the full table. + #[test] + fn synthetic_workload_compression() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + const N: usize = 8 * 1024; // 8 full FastLanes chunks per workload + + let monotone: Vec = (0..N as i32).collect(); + // Deterministic LCG so the test is reproducible. + let mut lcg = 0u32; + let mut next = || { + lcg = lcg.wrapping_mul(1_664_525).wrapping_add(1_013_904_223); + (lcg >> 16) as i32 + }; + let sensor: Vec = (0..N).map(|_| (next() % 201) - 100).collect(); + let offset: Vec = (0..N as i32).map(|i| -1_000_000_000 + i).collect(); + let mut lcg2 = 0u32; + let mut prev = 0i32; + let near_monotone: Vec = (0..N) + .map(|_| { + lcg2 = lcg2.wrapping_mul(1_664_525).wrapping_add(1_013_904_223); + let step = if (lcg2 >> 24) < 13 { -2 } else { 1 }; // ~5% backtrack + prev = prev.wrapping_add(step); + prev + }) + .collect(); + let workloads = [ + ("monotone i32 (0..N)", monotone), + ("sensor i32 in [-100, 100]", sensor), + ("offset i32 base=-1e9", offset), + ("near-monotone i32 (5% backtrack)", near_monotone), + ]; + + println!(); + println!( + "{:<36} {:>10} {:>14} {:>5} {:>5} {:>5} {:>10} {:>9} {:>10} {:>5} {:>10} {:>9}", + "workload", + "raw (B)", + "Δ range", + "Wnaive", + "Wffor", + "Wzig", + "FFoR (B)", + "FFoR x", + "bases (B)", + "Wb", + "+bcomp (B)", + "+bcomp x", + ); + println!("{}", "-".repeat(140)); + + for (name, values) in workloads { + let raw_bytes = size_of_val(values.as_slice()); + let array = PrimitiveArray::from_iter(values); + let (bases, deltas) = delta_compress(&array, &mut ctx)?; + let deltas_buf: &[i32] = deltas.as_slice(); + let bases_buf: &[i32] = bases.as_slice(); + + let min_d = *deltas_buf.iter().min().unwrap(); + let max_d = *deltas_buf.iter().max().unwrap(); + + // Naive width = OR of raw u32 bit-patterns of every delta. Any negative delta + // sets the high bits and forces W = 32. + let or: u32 = deltas_buf.iter().fold(0u32, |a, &d| a | (d as u32)); + let naive_w = if or == 0 { + 0 + } else { + 32 - or.leading_zeros() as usize + }; + + // FFoR width = ceil(log2(span)) where span = (max - min + 1). + let span = (max_d as i64 - min_d as i64) as u64 + 1; + let ffor_w = if span <= 1 { + 0 + } else { + 64 - (span - 1).leading_zeros() as usize + }; + + // ZigZag width = 1 + ceil(log2(max(|min|, |max|))) for any nonzero delta. + let zz_mag = (min_d.unsigned_abs()).max(max_d.unsigned_abs()); + let zz_w = if zz_mag == 0 { + 0 + } else { + 1 + (32 - zz_mag.leading_zeros() as usize) + }; + + // FFoR encoded byte size: bases (already unpacked) + ref + ceil(packed bits / 8). + let bases_bytes = size_of_val(bases_buf); + let ref_bytes = size_of::(); + let packed_bits = deltas_buf.len() * ffor_w; + let ffor_packed_bytes = packed_bits.div_ceil(8); + let ffor_total = bases_bytes + ref_bytes + ffor_packed_bytes; + let ratio = raw_bytes as f64 / ffor_total as f64; + + // Bases compressibility: what we save if the bases child is recursively + // delta-encoded or FoR-encoded. The bases are the "first row of the transposed + // chunk" per lane, so they form a sub-sequence that inherits the smoothness of + // the input. We approximate with FFoR over the bases alone (no recursive Delta, + // which would force padding to 1024 elements per FastLanes chunk and could lose + // for short base sequences). + let min_b = *bases_buf.iter().min().unwrap(); + let max_b = *bases_buf.iter().max().unwrap(); + let bspan = (max_b as i64 - min_b as i64) as u64 + 1; + let bases_w = if bspan <= 1 { + 0 + } else { + 64 - (bspan - 1).leading_zeros() as usize + }; + let bases_compressed = (bases_buf.len() * bases_w).div_ceil(8) + ref_bytes; + let total_with_bcomp = bases_compressed + ref_bytes + ffor_packed_bytes; + let ratio_with_bcomp = raw_bytes as f64 / total_with_bcomp as f64; + + println!( + "{name:<36} {raw_bytes:>10} {:>14} {naive_w:>5} {ffor_w:>5} {zz_w:>5} {ffor_total:>10} {ratio:>8.2}x {bases_bytes:>10} {bases_w:>5} {total_with_bcomp:>10} {ratio_with_bcomp:>8.2}x", + format!("[{min_d}, {max_d}]"), + ); + + // Sanity assertions. naive_w is 32 (or near it) for any delta sequence that + // contains a negative value; FFoR/ZigZag width must be strictly smaller for these + // workloads. + assert!( + ffor_w <= naive_w.max(1), + "FFoR must never exceed naive for {name}" + ); + if min_d < 0 { + assert_eq!( + naive_w, 32, + "any negative delta forces naive W to 32 for {name}" + ); + assert!(ffor_w < 32, "FFoR must compress below T for {name}"); + } + // On the asymmetric workloads (offset, near-monotone) FFoR must beat ZigZag. + if min_d > 0 || max_d < 0 { + assert!( + ffor_w < zz_w, + "FFoR should beat ZigZag on asymmetric {name}" + ); + } + // Sorted inputs => the bases inherit smoothness => the bases bit-width should be + // far smaller than `T` for sorted columns. + if name.starts_with("monotone") || name.starts_with("offset") { + assert!( + bases_w < 16, + "sorted bases should pack below 16 bits for {name}" + ); + } + } + + Ok(()) + } } diff --git a/encodings/fastlanes/src/delta/array/delta_decompress.rs b/encodings/fastlanes/src/delta/array/delta_decompress.rs index d6404dfb905..fe2567e63c7 100644 --- a/encodings/fastlanes/src/delta/array/delta_decompress.rs +++ b/encodings/fastlanes/src/delta/array/delta_decompress.rs @@ -10,6 +10,7 @@ use fastlanes::Transpose; use itertools::Itertools; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::dtype::NativePType; use vortex_array::match_each_unsigned_integer_ptype; use vortex_buffer::Buffer; @@ -33,14 +34,22 @@ pub fn delta_decompress( let validity = untranspose_validity(&deltas.validity()?, ctx)?; let validity = validity.slice(start..end)?; - Ok(match_each_unsigned_integer_ptype!(deltas.ptype(), |T| { + let original_ptype = deltas.ptype(); + // Signed inputs are processed through their unsigned counterpart; `wrapping_add` on the + // raw bytes inverts the `wrapping_sub` done at compress time regardless of signedness. + let bases = bases.reinterpret_cast(original_ptype.to_unsigned()); + let deltas = deltas.reinterpret_cast(original_ptype.to_unsigned()); + + let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| { const LANES: usize = T::LANES; let buffer = decompress_primitive::(bases.as_slice(), deltas.as_slice()); let buffer = buffer.slice(start..end); PrimitiveArray::new(buffer, validity) - })) + }); + + Ok(decoded.reinterpret_cast(original_ptype)) } /// Performs the low-level delta decompression on primitive values. diff --git a/encodings/fastlanes/src/delta/array/mod.rs b/encodings/fastlanes/src/delta/array/mod.rs index 4e96edc59ca..33ece0deddd 100644 --- a/encodings/fastlanes/src/delta/array/mod.rs +++ b/encodings/fastlanes/src/delta/array/mod.rs @@ -43,6 +43,21 @@ pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["bases", "deltas"]; /// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap(); /// ``` /// +/// Signed inputs are also supported; deltas across negative values are encoded by +/// `wrapping_sub` and recovered by `wrapping_add` at decompress time: +/// +/// ``` +/// use vortex_array::arrays::PrimitiveArray; +/// use vortex_array::VortexSessionExecute; +/// use vortex_array::session::ArraySession; +/// use vortex_session::VortexSession; +/// use vortex_fastlanes::Delta; +/// +/// let session = VortexSession::empty().with::(); +/// let primitive = PrimitiveArray::from_iter([-3_i32, -2, -1, 0, 1, 2]); +/// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap(); +/// ``` +/// /// # Details /// /// To facilitate slicing, this array accepts an `offset` and `logical_len`. The offset must be @@ -101,5 +116,5 @@ impl DeltaData { } pub(crate) fn lane_count(ptype: PType) -> usize { - match_each_unsigned_integer_ptype!(ptype, |T| { T::LANES }) + match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |T| { T::LANES }) } diff --git a/encodings/fastlanes/src/delta/compute/cast.rs b/encodings/fastlanes/src/delta/compute/cast.rs index 324e2fc9c22..43a247df9f0 100644 --- a/encodings/fastlanes/src/delta/compute/cast.rs +++ b/encodings/fastlanes/src/delta/compute/cast.rs @@ -24,6 +24,16 @@ impl CastReduce for Delta { if target_ptype.is_signed_int() || source_ptype.bit_width() > target_ptype.bit_width() { return Ok(None); } + // Signed sources need a different cast policy than the lossless widening cast + // used here. The delta bytes are stored as the result of `wrapping_sub`, so e.g. + // a delta of -1i8 has the bit pattern 0xFF. Widening *as a value* (the cast op's + // semantics) sign-extends that to 0xFFFFFFFF, which means `wrapping_add(base, delta)` + // at the wider type produces a different result than at the source type — round-trip + // breaks. Cross-signedness widening has the same hazard for the same reason. Fall + // back to decompress-and-re-encode for both cases. + if source_ptype.is_signed_int() { + return Ok(None); + } // Cast both bases and deltas to the target type let casted_bases = array.bases().cast(dtype.with_nullability(NonNullable))?; diff --git a/encodings/fastlanes/src/delta/vtable/mod.rs b/encodings/fastlanes/src/delta/vtable/mod.rs index 96b6ec27e0c..b5e68791ceb 100644 --- a/encodings/fastlanes/src/delta/vtable/mod.rs +++ b/encodings/fastlanes/src/delta/vtable/mod.rs @@ -4,7 +4,6 @@ use std::hash::Hash; use std::hash::Hasher; -use fastlanes::FastLanes; use prost::Message; use vortex_array::Array; use vortex_array::ArrayEq; @@ -21,7 +20,6 @@ use vortex_array::arrays::PrimitiveArray; use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::PType; -use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::serde::ArrayChildren; use vortex_array::smallvec::smallvec; use vortex_array::vtable::VTable; @@ -156,7 +154,7 @@ impl VTable for Delta { ); let metadata = DeltaMetadata::decode(metadata)?; let ptype = PType::try_from(dtype)?; - let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { ::LANES }); + let lanes = lane_count(ptype); // Compute the length of the bases array let deltas_len = usize::try_from(metadata.deltas_len) @@ -227,8 +225,8 @@ fn validate_parts( ); vortex_ensure!( - bases.dtype().is_unsigned_int(), - "DeltaArray: dtype must be an unsigned integer, got {}", + bases.dtype().is_int(), + "DeltaArray: dtype must be an integer, got {}", bases.dtype() ); diff --git a/encodings/fastlanes/src/delta/vtable/operations.rs b/encodings/fastlanes/src/delta/vtable/operations.rs index 943f11379ac..785cdf6aca8 100644 --- a/encodings/fastlanes/src/delta/vtable/operations.rs +++ b/encodings/fastlanes/src/delta/vtable/operations.rs @@ -248,6 +248,11 @@ mod tests { #[case::delta_large_u64((0u64..2048).collect())] // Single element #[case::delta_single(PrimitiveArray::new(buffer![42u32], Validity::NonNullable))] + // Signed inputs (added with signed-delta support). + #[case::delta_i32_crossing_zero((-100i32..100).collect())] + #[case::delta_i64_negative((0i64..100).map(|i| -i * 10).collect())] + #[case::delta_large_i32((-1024i32..1024).collect())] + #[case::delta_single_negative(PrimitiveArray::new(buffer![-42i32], Validity::NonNullable))] fn test_delta_consistency(#[case] array: PrimitiveArray) { test_array_consistency(&da(&array).into_array()); } @@ -258,6 +263,8 @@ mod tests { #[case::delta_u32_basic(PrimitiveArray::new(buffer![1u32, 1, 1, 1, 1], Validity::NonNullable))] #[case::delta_u64_basic(PrimitiveArray::new(buffer![1u64, 1, 1, 1, 1], Validity::NonNullable))] #[case::delta_u32_large(PrimitiveArray::new(buffer![1u32; 100], Validity::NonNullable))] + #[case::delta_i8_basic(PrimitiveArray::new(buffer![-1i8, -1, -1, -1, -1], Validity::NonNullable))] + #[case::delta_i32_basic(PrimitiveArray::new(buffer![-1i32, -1, -1, -1, -1], Validity::NonNullable))] fn test_delta_binary_numeric(#[case] array: PrimitiveArray) { test_binary_numeric_array(da(&array).into_array()); }