Skip to content

Commit 59b0083

Browse files
alp_rd: speedup (#7064)
<!-- Thank you for submitting a pull request! We appreciate your time and effort. Please make sure to provide enough information so that we can review your pull request. The Summary and Testing sections below contain guidance on what to include. --> ## Summary <!-- If this PR is related to a tracked effort, please link to the relevant issue here (e.g., `Closes: #123`). Otherwise, feel free to ignore / delete this. In this section, please: 1. Explain the rationale for this change. 2. Summarize the changes included in this PR. A general rule of thumb is that larger PRs should have larger summaries. If there are a lot of changes, please help us review the code by explaining what was changed and why. If there is an issue or discussion attached, there is no need to duplicate all the details, but clarity is always preferred over brevity. --> Closes: #000 <!-- ## API Changes Uncomment this section if there are any user-facing changes. Consider whether the change affects users in one of the following ways: 1. Breaks public APIs in some way. 2. Changes the underlying behavior of one of the engine integrations. 3. Should some documentation be updated to reflect this change? If a public API is changed in a breaking manner, make sure to add the appropriate label. You can run `./scripts/public-api.sh` locally to see if there are any public API changes (and this also runs in our CI). --> ## Testing <!-- Please describe how this change was tested. Here are some common categories for testing in Vortex: 1. Verifying existing behavior is maintained. 2. Verifying new behavior and functionality works correctly. 3. Serialization compatibility (backwards and forwards) should be maintained or explicitly broken. --> --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 6fdd0b3 commit 59b0083

File tree

4 files changed

+96
-50
lines changed

4 files changed

+96
-50
lines changed

encodings/alp/benches/alp_compress.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,20 +103,47 @@ fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64
103103
.bench_values(|(v, mut ctx)| decompress_into_array(v, &mut ctx));
104104
}
105105

106-
#[divan::bench(types = [f32, f64], args = [10_000, 100_000])]
107-
fn compress_rd<T: ALPRDFloat>(bencher: Bencher, n: usize) {
108-
let primitive = PrimitiveArray::new(buffer![T::from(1.23).unwrap(); n], Validity::NonNullable);
109-
let encoder = RDEncoder::new(&[T::from(1.23).unwrap()]);
106+
const RD_BENCH_ARGS: &[(usize, f64)] = &[
107+
// length, fraction_patch
108+
(10_000, 0.0),
109+
(10_000, 0.01),
110+
(10_000, 0.1),
111+
(100_000, 0.0),
112+
(100_000, 0.01),
113+
(100_000, 0.1),
114+
];
115+
116+
fn make_rd_array<T: ALPRDFloat + NativePType>(n: usize, fraction_patch: f64) -> PrimitiveArray {
117+
let base_val = T::from(1.23).unwrap();
118+
let mut rng = StdRng::seed_from_u64(42);
119+
let mut values = buffer![base_val; n].into_mut();
120+
if fraction_patch > 0.0 {
121+
let outlier = T::from(1000.0).unwrap();
122+
for index in 0..values.len() {
123+
if rng.random_bool(fraction_patch) {
124+
values[index] = outlier;
125+
}
126+
}
127+
}
128+
PrimitiveArray::new(values.freeze(), Validity::NonNullable)
129+
}
130+
131+
#[divan::bench(types = [f32, f64], args = RD_BENCH_ARGS)]
132+
fn compress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
133+
let (n, fraction_patch) = args;
134+
let primitive = make_rd_array::<T>(n, fraction_patch);
135+
let encoder = RDEncoder::new(primitive.as_slice::<T>());
110136

111137
bencher
112138
.with_inputs(|| (&primitive, &encoder))
113139
.bench_refs(|(primitive, encoder)| encoder.encode(primitive))
114140
}
115141

116-
#[divan::bench(types = [f32, f64], args = [10_000, 100_000])]
117-
fn decompress_rd<T: ALPRDFloat>(bencher: Bencher, n: usize) {
118-
let primitive = PrimitiveArray::new(buffer![T::from(1.23).unwrap(); n], Validity::NonNullable);
119-
let encoder = RDEncoder::new(&[T::from(1.23).unwrap()]);
142+
#[divan::bench(types = [f32, f64], args = RD_BENCH_ARGS)]
143+
fn decompress_rd<T: ALPRDFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
144+
let (n, fraction_patch) = args;
145+
let primitive = make_rd_array::<T>(n, fraction_patch);
146+
let encoder = RDEncoder::new(primitive.as_slice::<T>());
120147
let encoded = encoder.encode(&primitive);
121148

122149
bencher

encodings/alp/public-api.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ pub fn f64::to_u16(bits: Self::UINT) -> u16
582582

583583
pub fn vortex_alp::alp_encode(parray: &vortex_array::arrays::primitive::array::PrimitiveArray, exponents: core::option::Option<vortex_alp::Exponents>) -> vortex_error::VortexResult<vortex_alp::ALPArray>
584584

585-
pub fn vortex_alp::alp_rd_decode<T: vortex_alp::ALPRDFloat>(left_parts: vortex_buffer::buffer::Buffer<u16>, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer::Buffer<<T as vortex_alp::ALPRDFloat>::UINT>, left_parts_patches: core::option::Option<vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>
585+
pub fn vortex_alp::alp_rd_decode<T: vortex_alp::ALPRDFloat>(left_parts: vortex_buffer::buffer_mut::BufferMut<u16>, left_parts_dict: &[u16], right_bit_width: u8, right_parts: vortex_buffer::buffer_mut::BufferMut<<T as vortex_alp::ALPRDFloat>::UINT>, left_parts_patches: core::option::Option<vortex_array::patches::Patches>, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_buffer::buffer::Buffer<T>>
586586

587587
pub fn vortex_alp::decompress_into_array(array: vortex_alp::ALPArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::array::PrimitiveArray>
588588

encodings/alp/src/alp_rd/array.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,10 +307,10 @@ impl VTable for ALPRD {
307307
let decoded_array = if ptype == PType::F32 {
308308
PrimitiveArray::new(
309309
alp_rd_decode::<f32>(
310-
left_parts.into_buffer::<u16>(),
310+
left_parts.into_buffer_mut::<u16>(),
311311
&left_parts_dict,
312312
right_bit_width,
313-
right_parts.into_buffer::<u32>(),
313+
right_parts.into_buffer_mut::<u32>(),
314314
left_parts_patches,
315315
ctx,
316316
)?,
@@ -319,10 +319,10 @@ impl VTable for ALPRD {
319319
} else {
320320
PrimitiveArray::new(
321321
alp_rd_decode::<f64>(
322-
left_parts.into_buffer::<u16>(),
322+
left_parts.into_buffer_mut::<u16>(),
323323
&left_parts_dict,
324324
right_bit_width,
325-
right_parts.into_buffer::<u64>(),
325+
right_parts.into_buffer_mut::<u64>(),
326326
left_parts_patches,
327327
ctx,
328328
)?,

encodings/alp/src/alp_rd/mod.rs

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -284,44 +284,68 @@ impl RDEncoder {
284284
}
285285
}
286286

287-
/// Decode a vector of ALP-RD encoded values back into their original floating point format.
287+
/// Decode ALP-RD encoded values back into their original floating point format.
288288
///
289289
/// # Panics
290290
///
291-
/// The function panics if the provided `left_parts` and `right_parts` differ in length.
291+
/// Panics if `left_parts` and `right_parts` differ in length.
292292
pub fn alp_rd_decode<T: ALPRDFloat>(
293-
left_parts: Buffer<u16>,
293+
mut left_parts: BufferMut<u16>,
294294
left_parts_dict: &[u16],
295295
right_bit_width: u8,
296-
right_parts: Buffer<T::UINT>,
296+
right_parts: BufferMut<T::UINT>,
297297
left_parts_patches: Option<Patches>,
298298
ctx: &mut ExecutionCtx,
299299
) -> VortexResult<Buffer<T>> {
300300
if left_parts.len() != right_parts.len() {
301301
vortex_panic!("alp_rd_decode: left_parts.len != right_parts.len");
302302
}
303303

304-
// Decode the left-parts dictionary
305-
let mut values = BufferMut::<u16>::from_iter(
306-
left_parts
307-
.iter()
308-
.map(|code| left_parts_dict[*code as usize]),
309-
);
304+
let shift = right_bit_width as usize;
310305

311-
// Apply any patches
312306
if let Some(patches) = left_parts_patches {
307+
// Patched path: some left-part codes map to exception values that live outside
308+
// the dictionary. We must dictionary-decode first, then overwrite the exceptions,
309+
// before we can combine with right-parts.
310+
311+
// Dictionary-decode every code in-place (code → actual left bit-pattern).
312+
for code in left_parts.iter_mut() {
313+
*code = left_parts_dict[*code as usize];
314+
}
315+
316+
// Overwrite exception positions with their true left bit-patterns.
313317
let indices = patches.indices().clone().execute::<PrimitiveArray>(ctx)?;
314318
let patch_values = patches.values().clone().execute::<PrimitiveArray>(ctx)?;
315-
alp_rd_apply_patches(&mut values, &indices, &patch_values, patches.offset());
316-
}
319+
alp_rd_apply_patches(&mut left_parts, &indices, &patch_values, patches.offset());
320+
321+
// Reconstruct floats by shifting each decoded left value into the MSBs
322+
// and OR-ing with the corresponding right value.
323+
alp_rd_combine_inplace::<T>(
324+
right_parts,
325+
|right, &left| {
326+
*right = (<T as ALPRDFloat>::from_u16(left) << shift) | *right;
327+
},
328+
left_parts.as_ref(),
329+
)
330+
} else {
331+
// Non-patched fast path: every code maps through the dictionary, so we can
332+
// pre-shift the entire dictionary once and reduce the per-element hot loop to
333+
// a single table lookup + OR.
334+
let mut shifted_dict = [T::UINT::default(); MAX_DICT_SIZE as usize];
335+
for (i, &entry) in left_parts_dict.iter().enumerate() {
336+
shifted_dict[i] = <T as ALPRDFloat>::from_u16(entry) << shift;
337+
}
317338

318-
// Shift the left-parts and add in the right-parts.
319-
Ok(alp_rd_decode_core(
320-
left_parts_dict,
321-
right_bit_width,
322-
right_parts,
323-
values,
324-
))
339+
// Each element: look up the pre-shifted left value by code, OR with right-parts.
340+
alp_rd_combine_inplace::<T>(
341+
right_parts,
342+
|right, &code| {
343+
// SAFETY: codes are bounded by dict size (< left_parts_dict.len() <= MAX_DICT_SIZE).
344+
*right = unsafe { *shifted_dict.get_unchecked(code as usize) } | *right;
345+
},
346+
left_parts.as_ref(),
347+
)
348+
}
325349
}
326350

327351
/// Apply patches to the decoded left-parts values.
@@ -342,23 +366,18 @@ fn alp_rd_apply_patches(
342366
})
343367
}
344368

345-
/// Core decode logic shared between `alp_rd_decode` and `execute_alp_rd_decode`.
346-
fn alp_rd_decode_core<T: ALPRDFloat>(
347-
_left_parts_dict: &[u16],
348-
right_bit_width: u8,
349-
right_parts: Buffer<T::UINT>,
350-
values: BufferMut<u16>,
351-
) -> Buffer<T> {
352-
// Shift the left-parts and add in the right-parts.
353-
let mut index = 0;
354-
right_parts
355-
.map_each_in_place(|right| {
356-
let left = values[index];
357-
index += 1;
358-
let left = <T as ALPRDFloat>::from_u16(left);
359-
T::from_bits((left << (right_bit_width as usize)) | right)
360-
})
361-
.freeze()
369+
/// Zip `right_parts` with `left_data`, apply `combine_fn` per element, then reinterpret the
370+
/// buffer from `T::UINT` to `T` (same bit-width: u32↔f32, u64↔f64).
371+
fn alp_rd_combine_inplace<T: ALPRDFloat>(
372+
mut right_parts: BufferMut<T::UINT>,
373+
combine_fn: impl Fn(&mut T::UINT, &u16),
374+
left_data: &[u16],
375+
) -> VortexResult<Buffer<T>> {
376+
for (right, left) in right_parts.as_mut_slice().iter_mut().zip(left_data.iter()) {
377+
combine_fn(right, left);
378+
}
379+
// SAFETY: all bit patterns of T::UINT are valid T (u32↔f32 or u64↔f64).
380+
Ok(unsafe { right_parts.transmute::<T>() }.freeze())
362381
}
363382
/// Find the best "cut point" for a set of floating point values such that we can
364383
/// cast them all to the relevant value instead.

0 commit comments

Comments
 (0)