Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 190 additions & 4 deletions encodings/fastlanes/src/delta/array/delta_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fastlanes::FastLanes;
use fastlanes::Transpose;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_unsigned_integer_ptype;
use vortex_buffer::Buffer;
Expand All @@ -23,6 +24,9 @@ pub fn delta_compress(
ctx: &mut ExecutionCtx,
) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
let validity = array.validity()?;
let original_ptype = array.ptype();
let array = array.reinterpret_cast(original_ptype.to_unsigned());

let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| {
// Fill-forward null values so that transposed deltas at null positions remain
// small. Without this, bitpacking may skip patches for null positions, and the
Expand All @@ -37,7 +41,10 @@ pub fn delta_compress(
)
});

Ok((bases, deltas))
Ok((
bases.reinterpret_cast(original_ptype),
deltas.reinterpret_cast(original_ptype),
))
}

fn compress_primitive<T, const LANES: usize>(array: &[T]) -> (Buffer<T>, Buffer<T>)
Expand Down Expand Up @@ -113,11 +120,31 @@ mod tests {
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

#[rstest]
#[case((0u32..10_000).collect())]
#[case((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())]
#[case(PrimitiveArray::from_option_iter(
#[case::u32((0u32..10_000).collect())]
#[case::u8((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())]
#[case::nullable_u32(PrimitiveArray::from_option_iter(
(0u32..10_000).map(|i| (i % 2 == 0).then_some(i)),
))]
// Signed inputs that stay non-negative: encoded deltas are identical to the u32 case
// bit-for-bit, but the buffer's dtype carries the signedness through round-trip.
#[case::i32_non_negative((0i32..10_000).collect())]
// Signed inputs crossing zero: deltas alternate in sign, which under wrapping_sub
// populates the high bits of negative deltas. Bit-packing without preprocessing
// would explode here, but round-tripping the raw delta buffer is still correct.
#[case::i32_crossing_zero((-5_000i32..5_000).collect())]
// All-negative signed values.
#[case::i32_all_negative((-10_000i32..0).collect())]
// i8 across the full type range: tests T::MIN / T::MAX boundaries and the
// remainder-padded chunk path (256 < FL_CHUNK_SIZE).
#[case::i8_full_range((i8::MIN..=i8::MAX).collect())]
// i16 crossing zero.
#[case::i16_crossing_zero((-2_000i16..2_000).collect())]
// i64 with large negative offset.
#[case::i64_large_negative((0i64..5_000).map(|i| i - 1_000_000_000_000).collect())]
// Nullable signed array with values around zero.
#[case::nullable_i32_crossing(PrimitiveArray::from_option_iter(
(-2_000i32..2_000).map(|i| (i % 3 != 0).then_some(i)),
))]
fn test_compress(#[case] array: PrimitiveArray) -> VortexResult<()> {
let delta = Delta::try_from_primitive_array(&array, &mut SESSION.create_execution_ctx())?;
assert_eq!(delta.len(), array.len());
Expand Down Expand Up @@ -151,4 +178,163 @@ mod tests {
assert_arrays_eq!(packed_delta_prim, array);
Ok(())
}

/// Measures compression of delta-encoded signed (`i32`) columns under three bit-packing
/// strategies and prints one table row per synthetic workload:
/// * `naive`: bit-packing the raw delta bit patterns. The width is the bit length of the OR
///   of every delta reinterpreted as `u32`, so a single negative delta forces `W = 32`.
/// * `FFoR`: subtracting the per-column `min(delta)` before bit-packing. The width is the
///   bit length of `max - min`, i.e. `ceil(log2(max - min + 1))`.
/// * `zigzag`: `(n << 1) ^ (n >> 31)` before bit-packing. The width is one sign bit plus the
///   bit length of `max(|min|, |max|)`.
///
/// Asserts that:
/// * FFoR never needs more bits than naive, on every workload;
/// * whenever a negative delta is present, naive is pinned at 32 bits and FFoR is below 32;
/// * whenever every delta has the same strict sign, FFoR needs fewer bits than zigzag;
/// * for the `monotone` and `offset` workloads, the bases pack below 16 bits under FFoR.
///
/// Run with `--nocapture` to see the full table.
#[test]
fn synthetic_workload_compression() -> VortexResult<()> {
    /// Advances the deterministic linear congruential generator that drives the synthetic
    /// workloads, so the test is reproducible.
    fn lcg_step(state: u32) -> u32 {
        state.wrapping_mul(1_664_525).wrapping_add(1_013_904_223)
    }

    /// Bits needed to bit-pack values once `min` has been subtracted: the bit length of
    /// `max - min` (zero when `min == max`). Callers pass the min and max of the same
    /// slice, so `min <= max` and the widened difference is non-negative.
    fn span_bit_width(min: i32, max: i32) -> usize {
        let range = (max as i64 - min as i64) as u64;
        (u64::BITS - range.leading_zeros()) as usize
    }

    let mut ctx = SESSION.create_execution_ctx();
    const N: usize = 8 * 1024; // 8 full FastLanes chunks per workload

    // Strictly increasing values starting at zero.
    let monotone: Vec<i32> = (0..N as i32).collect();

    // Pseudo-random readings in [-100, 100]: the top 16 bits of the LCG state are
    // non-negative as `i32`, so `% 201` yields 0..=200 before the shift by -100.
    let mut sensor_state = 0u32;
    let sensor: Vec<i32> = (0..N)
        .map(|_| {
            sensor_state = lcg_step(sensor_state);
            let sample = (sensor_state >> 16) as i32;
            (sample % 201) - 100
        })
        .collect();

    // Strictly increasing values sitting on a large negative base.
    let offset: Vec<i32> = (0..N as i32).map(|i| -1_000_000_000 + i).collect();

    // Random walk that mostly steps +1 and occasionally steps -2.
    let mut walk_state = 0u32;
    let mut position = 0i32;
    let near_monotone: Vec<i32> = (0..N)
        .map(|_| {
            walk_state = lcg_step(walk_state);
            let step = if (walk_state >> 24) < 13 { -2 } else { 1 }; // ~5% backtrack
            position = position.wrapping_add(step);
            position
        })
        .collect();

    let workloads = [
        ("monotone i32 (0..N)", monotone),
        ("sensor i32 in [-100, 100]", sensor),
        ("offset i32 base=-1e9", offset),
        ("near-monotone i32 (5% backtrack)", near_monotone),
    ];

    println!();
    println!(
        "{:<36} {:>10} {:>14} {:>5} {:>5} {:>5} {:>10} {:>9} {:>10} {:>5} {:>10} {:>9}",
        "workload",
        "raw (B)",
        "Δ range",
        "Wnaive",
        "Wffor",
        "Wzig",
        "FFoR (B)",
        "FFoR x",
        "bases (B)",
        "Wb",
        "+bcomp (B)",
        "+bcomp x",
    );
    println!("{}", "-".repeat(140));

    for (name, values) in workloads {
        let raw_bytes = size_of_val(values.as_slice());
        let array = PrimitiveArray::from_iter(values);
        let (bases, deltas) = delta_compress(&array, &mut ctx)?;
        let deltas_buf: &[i32] = deltas.as_slice();
        let bases_buf: &[i32] = bases.as_slice();

        let min_d = *deltas_buf.iter().min().unwrap();
        let max_d = *deltas_buf.iter().max().unwrap();

        // Naive width = bit length of the OR of the raw u32 bit patterns of every delta.
        // Any negative delta sets the top bit and forces W = 32.
        let or_mask: u32 = deltas_buf.iter().fold(0u32, |acc, &d| acc | (d as u32));
        let naive_w = (u32::BITS - or_mask.leading_zeros()) as usize;

        // FFoR width = bits needed for `max - min` once the minimum is subtracted.
        let ffor_w = span_bit_width(min_d, max_d);

        // ZigZag width = one sign bit plus the bit length of the largest magnitude;
        // zero when every delta is zero.
        let zz_mag = min_d.unsigned_abs().max(max_d.unsigned_abs());
        let zz_w = if zz_mag == 0 {
            0
        } else {
            1 + (32 - zz_mag.leading_zeros() as usize)
        };

        // FFoR encoded byte size: bases (stored unpacked) + the FFoR reference value +
        // ceil(packed bits / 8).
        let bases_bytes = size_of_val(bases_buf);
        let ref_bytes = size_of::<i32>();
        let packed_bits = deltas_buf.len() * ffor_w;
        let ffor_packed_bytes = packed_bits.div_ceil(8);
        let ffor_total = bases_bytes + ref_bytes + ffor_packed_bytes;
        let ratio = raw_bytes as f64 / ffor_total as f64;

        // Bases compressibility: an estimate of what is saved if the bases child is itself
        // FoR-encoded. This applies FFoR to the bases alone rather than a recursive Delta.
        // `bases_compressed` carries its own reference value, and the deltas' reference is
        // added again in the total, so `ref_bytes` legitimately appears twice.
        let min_b = *bases_buf.iter().min().unwrap();
        let max_b = *bases_buf.iter().max().unwrap();
        let bases_w = span_bit_width(min_b, max_b);
        let bases_compressed = (bases_buf.len() * bases_w).div_ceil(8) + ref_bytes;
        let total_with_bcomp = bases_compressed + ref_bytes + ffor_packed_bytes;
        let ratio_with_bcomp = raw_bytes as f64 / total_with_bcomp as f64;

        println!(
            "{name:<36} {raw_bytes:>10} {:>14} {naive_w:>5} {ffor_w:>5} {zz_w:>5} {ffor_total:>10} {ratio:>8.2}x {bases_bytes:>10} {bases_w:>5} {total_with_bcomp:>10} {ratio_with_bcomp:>8.2}x",
            format!("[{min_d}, {max_d}]"),
        );

        // FFoR can never need more bits than naive. With no negative deltas,
        // `max - min <= max <= OR`. With any negative delta the naive width is already 32,
        // and `max - min` of two `i32` values always fits in 32 bits.
        assert!(
            ffor_w <= naive_w,
            "FFoR must never exceed naive for {name}"
        );
        if min_d < 0 {
            assert_eq!(
                naive_w, 32,
                "any negative delta forces naive W to 32 for {name}"
            );
            assert!(ffor_w < 32, "FFoR must compress below T for {name}");
        }
        // One-sided deltas (all strictly positive or all strictly negative): FFoR must
        // beat ZigZag. Workloads whose deltas mix signs or include zero skip this check.
        if min_d > 0 || max_d < 0 {
            assert!(
                ffor_w < zz_w,
                "FFoR should beat ZigZag on asymmetric {name}"
            );
        }
        // Sorted inputs: the bases are expected to span a narrow range, so their FFoR
        // width should be far smaller than the 32 bits of `i32`.
        if name.starts_with("monotone") || name.starts_with("offset") {
            assert!(
                bases_w < 16,
                "sorted bases should pack below 16 bits for {name}"
            );
        }
    }

    Ok(())
}
}
13 changes: 11 additions & 2 deletions encodings/fastlanes/src/delta/array/delta_decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use fastlanes::Transpose;
use itertools::Itertools;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::NativePType;
use vortex_array::match_each_unsigned_integer_ptype;
use vortex_buffer::Buffer;
Expand All @@ -33,14 +34,22 @@ pub fn delta_decompress(
let validity = untranspose_validity(&deltas.validity()?, ctx)?;
let validity = validity.slice(start..end)?;

Ok(match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
let original_ptype = deltas.ptype();
// Signed inputs are processed through their unsigned counterpart; `wrapping_add` on the
// raw bytes inverts the `wrapping_sub` done at compress time regardless of signedness.
let bases = bases.reinterpret_cast(original_ptype.to_unsigned());
let deltas = deltas.reinterpret_cast(original_ptype.to_unsigned());

let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
const LANES: usize = T::LANES;

let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
let buffer = buffer.slice(start..end);

PrimitiveArray::new(buffer, validity)
}))
});

Ok(decoded.reinterpret_cast(original_ptype))
}

/// Performs the low-level delta decompression on primitive values.
Expand Down
17 changes: 16 additions & 1 deletion encodings/fastlanes/src/delta/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["bases", "deltas"];
/// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap();
/// ```
///
/// Signed inputs are also supported; deltas across negative values are encoded by
/// `wrapping_sub` and recovered by `wrapping_add` at decompress time:
///
/// ```
/// use vortex_array::arrays::PrimitiveArray;
/// use vortex_array::VortexSessionExecute;
/// use vortex_array::session::ArraySession;
/// use vortex_session::VortexSession;
/// use vortex_fastlanes::Delta;
///
/// let session = VortexSession::empty().with::<ArraySession>();
/// let primitive = PrimitiveArray::from_iter([-3_i32, -2, -1, 0, 1, 2]);
/// let array = Delta::try_from_primitive_array(&primitive, &mut session.create_execution_ctx()).unwrap();
/// ```
///
/// # Details
///
/// To facilitate slicing, this array accepts an `offset` and `logical_len`. The offset must be
Expand Down Expand Up @@ -101,5 +116,5 @@ impl DeltaData {
}

pub(crate) fn lane_count(ptype: PType) -> usize {
match_each_unsigned_integer_ptype!(ptype, |T| { T::LANES })
match_each_unsigned_integer_ptype!(ptype.to_unsigned(), |T| { T::LANES })
}
10 changes: 10 additions & 0 deletions encodings/fastlanes/src/delta/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ impl CastReduce for Delta {
if target_ptype.is_signed_int() || source_ptype.bit_width() > target_ptype.bit_width() {
return Ok(None);
}
// Signed sources need a different cast policy than the lossless widening cast
// used here. The delta bytes are stored as the result of `wrapping_sub`, so e.g.
// a delta of -1i8 has the bit pattern 0xFF. Widening *as a value* (the cast op's
// semantics) sign-extends that to 0xFFFFFFFF, which means `wrapping_add(base, delta)`
// at the wider type produces a different result than at the source type — round-trip
// breaks. Cross-signedness widening has the same hazard for the same reason. Fall
// back to decompress-and-re-encode for both cases.
if source_ptype.is_signed_int() {
return Ok(None);
}

// Cast both bases and deltas to the target type
let casted_bases = array.bases().cast(dtype.with_nullability(NonNullable))?;
Expand Down
8 changes: 3 additions & 5 deletions encodings/fastlanes/src/delta/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::hash::Hash;
use std::hash::Hasher;

use fastlanes::FastLanes;
use prost::Message;
use vortex_array::Array;
use vortex_array::ArrayEq;
Expand All @@ -21,7 +20,6 @@ use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::PType;
use vortex_array::match_each_unsigned_integer_ptype;
use vortex_array::serde::ArrayChildren;
use vortex_array::smallvec::smallvec;
use vortex_array::vtable::VTable;
Expand Down Expand Up @@ -156,7 +154,7 @@ impl VTable for Delta {
);
let metadata = DeltaMetadata::decode(metadata)?;
let ptype = PType::try_from(dtype)?;
let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { <T as FastLanes>::LANES });
let lanes = lane_count(ptype);

// Compute the length of the bases array
let deltas_len = usize::try_from(metadata.deltas_len)
Expand Down Expand Up @@ -227,8 +225,8 @@ fn validate_parts(
);

vortex_ensure!(
bases.dtype().is_unsigned_int(),
"DeltaArray: dtype must be an unsigned integer, got {}",
bases.dtype().is_int(),
"DeltaArray: dtype must be an integer, got {}",
bases.dtype()
);

Expand Down
7 changes: 7 additions & 0 deletions encodings/fastlanes/src/delta/vtable/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ mod tests {
#[case::delta_large_u64((0u64..2048).collect())]
// Single element
#[case::delta_single(PrimitiveArray::new(buffer![42u32], Validity::NonNullable))]
// Signed inputs (added with signed-delta support).
#[case::delta_i32_crossing_zero((-100i32..100).collect())]
#[case::delta_i64_negative((0i64..100).map(|i| -i * 10).collect())]
#[case::delta_large_i32((-1024i32..1024).collect())]
#[case::delta_single_negative(PrimitiveArray::new(buffer![-42i32], Validity::NonNullable))]
fn test_delta_consistency(#[case] array: PrimitiveArray) {
test_array_consistency(&da(&array).into_array());
}
Expand All @@ -258,6 +263,8 @@ mod tests {
#[case::delta_u32_basic(PrimitiveArray::new(buffer![1u32, 1, 1, 1, 1], Validity::NonNullable))]
#[case::delta_u64_basic(PrimitiveArray::new(buffer![1u64, 1, 1, 1, 1], Validity::NonNullable))]
#[case::delta_u32_large(PrimitiveArray::new(buffer![1u32; 100], Validity::NonNullable))]
#[case::delta_i8_basic(PrimitiveArray::new(buffer![-1i8, -1, -1, -1, -1], Validity::NonNullable))]
#[case::delta_i32_basic(PrimitiveArray::new(buffer![-1i32, -1, -1, -1, -1], Validity::NonNullable))]
fn test_delta_binary_numeric(#[case] array: PrimitiveArray) {
test_binary_numeric_array(da(&array).into_array());
}
Expand Down
Loading