Skip to content

Commit afd4f72

Browse files
authored
ALP decoding plugin (#7370)
Follow up to #7314 this time for ALP --------- Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent ae906c7 commit afd4f72

10 files changed

Lines changed: 284 additions & 22 deletions

File tree

encodings/alp/src/alp/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ mod compress;
1818
pub(crate) mod compute;
1919
mod decompress;
2020
mod ops;
21+
mod plugin;
2122
mod rules;
2223

24+
pub(crate) use plugin::ALPPatchedPlugin;
25+
2326
#[cfg(test)]
2427
mod tests {
2528
use prost::Message;

encodings/alp/src/alp/plugin.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! A custom [`ArrayPlugin`] that lets you load in and deserialize an `ALP` array with interior
5+
//! patches as a `PatchedArray` that wraps a patchless `ALP` array.
6+
//!
7+
//! This enables zero-cost backward compatibility with previously written datasets.
8+
9+
use vortex_array::ArrayId;
10+
use vortex_array::ArrayPlugin;
11+
use vortex_array::ArrayRef;
12+
use vortex_array::IntoArray;
13+
use vortex_array::VortexSessionExecute;
14+
use vortex_array::arrays::Patched;
15+
use vortex_array::buffer::BufferHandle;
16+
use vortex_array::dtype::DType;
17+
use vortex_array::serde::ArrayChildren;
18+
use vortex_error::VortexResult;
19+
use vortex_error::vortex_err;
20+
use vortex_session::VortexSession;
21+
22+
use crate::ALP;
23+
use crate::ALPArrayExt;
24+
use crate::ALPArrayOwnedExt;
25+
26+
/// Custom deserialization plugin that converts an ALP array with interior
27+
/// patches into a PatchedArray holding an ALP array.
28+
#[derive(Debug, Clone)]
29+
pub(crate) struct ALPPatchedPlugin;
30+
31+
impl ArrayPlugin for ALPPatchedPlugin {
32+
fn id(&self) -> ArrayId {
33+
// We reuse the existing `ALP` ID so that we can take over its
34+
// deserialization pathway.
35+
ALP::ID
36+
}
37+
38+
fn serialize(
39+
&self,
40+
array: &ArrayRef,
41+
session: &VortexSession,
42+
) -> VortexResult<Option<Vec<u8>>> {
43+
// Delegate to ALP's metadata serde
44+
ALP.serialize(array, session)
45+
}
46+
47+
fn deserialize(
48+
&self,
49+
dtype: &DType,
50+
len: usize,
51+
metadata: &[u8],
52+
buffers: &[BufferHandle],
53+
children: &dyn ArrayChildren,
54+
session: &VortexSession,
55+
) -> VortexResult<ArrayRef> {
56+
let alp_array = ALP
57+
.deserialize(dtype, len, metadata, buffers, children, session)?
58+
.try_downcast::<ALP>()
59+
.map_err(|_| vortex_err!("ALP plugin should only deserialize vortex.alp"))?;
60+
61+
// Check if there are interior patches to externalize.
62+
let Some(patches) = alp_array.patches() else {
63+
return Ok(alp_array.into_array());
64+
};
65+
66+
// Extract components and create a new ALP array without patches.
67+
let (encoded, exponents, _) = alp_array.into_parts();
68+
69+
let alp_without_patches = ALP::try_new(encoded, exponents, None)?.into_array();
70+
71+
let patched = Patched::from_array_and_patches(
72+
alp_without_patches,
73+
&patches,
74+
&mut session.create_execution_ctx(),
75+
)?;
76+
77+
Ok(patched.into_array())
78+
}
79+
}
80+
81+
#[cfg(test)]
82+
mod tests {
83+
use std::f64::consts::PI;
84+
use std::sync::LazyLock;
85+
86+
use vortex_array::ArrayPlugin;
87+
use vortex_array::IntoArray;
88+
use vortex_array::arrays::PatchedArray;
89+
use vortex_array::arrays::PrimitiveArray;
90+
use vortex_array::arrays::patched::PatchedArraySlotsExt;
91+
use vortex_array::buffer::BufferHandle;
92+
use vortex_array::session::ArraySession;
93+
use vortex_array::session::ArraySessionExt;
94+
use vortex_error::VortexResult;
95+
use vortex_error::vortex_err;
96+
use vortex_session::VortexSession;
97+
98+
use super::ALPPatchedPlugin;
99+
use crate::ALP;
100+
use crate::ALPArray;
101+
use crate::ALPArrayExt;
102+
use crate::alp_encode;
103+
104+
static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
105+
let session = VortexSession::empty().with::<ArraySession>();
106+
session.arrays().register(ALPPatchedPlugin);
107+
session
108+
});
109+
110+
#[test]
111+
fn test_decode_alp_patches() -> VortexResult<()> {
112+
// Create values where some don't encode cleanly with ALP, causing patches.
113+
// PI doesn't encode cleanly.
114+
let values: Vec<f64> = (0..100)
115+
.map(|i| if i % 4 == 3 { PI } else { i as f64 })
116+
.collect();
117+
118+
let parray = PrimitiveArray::from_iter(values);
119+
let alp_encoded = alp_encode(&parray, None)?;
120+
121+
assert!(
122+
alp_encoded.patches().is_some(),
123+
"Expected ALP array to have patches"
124+
);
125+
126+
let array = alp_encoded.as_array();
127+
128+
let metadata = array.metadata(&SESSION)?.unwrap_or_default();
129+
let children = array.children();
130+
let buffers = array
131+
.buffers()
132+
.into_iter()
133+
.map(BufferHandle::new_host)
134+
.collect::<Vec<_>>();
135+
136+
let deserialized = ALPPatchedPlugin.deserialize(
137+
array.dtype(),
138+
array.len(),
139+
&metadata,
140+
&buffers,
141+
&children,
142+
&SESSION,
143+
)?;
144+
145+
let patched: PatchedArray = deserialized
146+
.try_downcast()
147+
.map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;
148+
149+
let inner_alp: ALPArray = patched
150+
.inner()
151+
.clone()
152+
.try_downcast()
153+
.map_err(|a| vortex_err!("Expected inner ALP, got {}", a.encoding_id()))?;
154+
155+
assert!(
156+
inner_alp.patches().is_none(),
157+
"Inner ALP should NOT have patches"
158+
);
159+
160+
Ok(())
161+
}
162+
163+
#[test]
164+
fn alp_without_patches_stays_alp() -> VortexResult<()> {
165+
// Values that encode cleanly without patches.
166+
let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
167+
let parray = PrimitiveArray::from_iter(values);
168+
let alp_encoded = alp_encode(&parray, None)?;
169+
170+
assert!(
171+
alp_encoded.patches().is_none(),
172+
"Expected ALP array without patches"
173+
);
174+
175+
let array = alp_encoded.as_array();
176+
177+
let metadata = array.metadata(&SESSION)?.unwrap_or_default();
178+
let children = array.children();
179+
let buffers = array
180+
.buffers()
181+
.into_iter()
182+
.map(BufferHandle::new_host)
183+
.collect::<Vec<_>>();
184+
185+
let deserialized = ALPPatchedPlugin.deserialize(
186+
array.dtype(),
187+
array.len(),
188+
&metadata,
189+
&buffers,
190+
&children,
191+
&SESSION,
192+
)?;
193+
194+
let result = deserialized
195+
.try_downcast::<ALP>()
196+
.map_err(|a| vortex_err!("Expected deserialized ALP, got {}", a.encoding_id()))?;
197+
198+
assert!(result.patches().is_none(), "Result should not have patches");
199+
200+
Ok(())
201+
}
202+
203+
#[test]
204+
#[should_panic(expected = "index out of bounds")]
205+
fn primitive_array_returns_error() {
206+
let array = PrimitiveArray::from_iter([1.0f64, 2.0, 3.0]).into_array();
207+
208+
let metadata = array.metadata(&SESSION).unwrap().unwrap_or_default();
209+
let children = array.children();
210+
let buffers = array
211+
.buffers()
212+
.into_iter()
213+
.map(BufferHandle::new_host)
214+
.collect::<Vec<_>>();
215+
216+
// This panics because PrimitiveArray has no children and ALP requires encoded child.
217+
let _result = ALPPatchedPlugin.deserialize(
218+
array.dtype(),
219+
array.len(),
220+
&metadata,
221+
&buffers,
222+
&children,
223+
&SESSION,
224+
);
225+
}
226+
}

encodings/alp/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub use alp_rd::*;
2121
use vortex_array::aggregate_fn::AggregateFnVTable;
2222
use vortex_array::aggregate_fn::fns::nan_count::NanCount;
2323
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
24+
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
2425
use vortex_array::session::ArraySessionExt;
2526
use vortex_session::VortexSession;
2627

@@ -29,7 +30,13 @@ mod alp_rd;
2930

3031
/// Initialize ALP encoding in the given session.
3132
pub fn initialize(session: &VortexSession) {
32-
session.arrays().register(ALP);
33+
// If we're using the experimental Patched encoding, register a shim
34+
// for ALP with interior patches to decode as Patched array.
35+
if *USE_EXPERIMENTAL_PATCHES {
36+
session.arrays().register(ALPPatchedPlugin);
37+
} else {
38+
session.arrays().register(ALP);
39+
}
3340
session.arrays().register(ALPRD);
3441

3542
// Register the ALP-specific NaN count aggregate kernel.

encodings/fastlanes/public-api.lock

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,6 @@ impl vortex_array::hash::ArrayHash for vortex_fastlanes::RLEData
570570

571571
pub fn vortex_fastlanes::RLEData::array_hash<H: core::hash::Hasher>(&self, state: &mut H, _precision: vortex_array::hash::Precision)
572572

573-
pub static vortex_fastlanes::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>
574-
575573
pub trait vortex_fastlanes::BitPackedArrayExt: vortex_fastlanes::BitPackedArraySlotsExt
576574

577575
pub fn vortex_fastlanes::BitPackedArrayExt::bit_width(&self) -> u8

encodings/fastlanes/src/lib.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33

44
#![allow(clippy::cast_possible_truncation)]
55

6-
use std::env;
7-
use std::sync::LazyLock;
8-
96
pub use bitpacking::*;
107
pub use delta::*;
118
pub use r#for::*;
@@ -31,19 +28,10 @@ use vortex_array::aggregate_fn::AggregateFnVTable;
3128
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
3229
use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
3330
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
31+
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
3432
use vortex_array::session::ArraySessionExt;
3533
use vortex_session::VortexSession;
3634

37-
/// Flag indicating if experimental patched array support is enabled.
38-
///
39-
/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`.
40-
///
41-
/// When this is true, any BitPacked array with interior patches will be read as a `Patched`
42-
/// array, and the builtin compressor will use Patched array with BitPacked instead of
43-
/// BitPacked array with interior patches.
44-
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
45-
LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok());
46-
4735
/// Initialize fastlanes encodings in the given session.
4836
pub fn initialize(session: &VortexSession) {
4937
// If we're using the experimental Patched encoding, register a shim

vortex-array/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3538,6 +3538,8 @@ pub fn vortex_array::arrays::patched::PatchedSlotsView<'a>::fmt(&self, f: &mut c
35383538

35393539
impl<'a> core::marker::Copy for vortex_array::arrays::patched::PatchedSlotsView<'a>
35403540

3541+
pub static vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>
3542+
35413543
pub trait vortex_array::arrays::patched::PatchedArrayExt: vortex_array::arrays::patched::PatchedArraySlotsExt
35423544

35433545
pub fn vortex_array::arrays::patched::PatchedArrayExt::lane_range(&self, chunk: usize, lane: usize) -> vortex_error::VortexResult<core::ops::range::Range<usize>>

vortex-array/src/arrays/patched/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ mod array;
7171
mod compute;
7272
mod vtable;
7373

74+
use std::env;
75+
use std::sync::LazyLock;
76+
7477
pub use array::*;
7578
use vortex_buffer::ByteBuffer;
7679
pub use vtable::*;
@@ -96,3 +99,14 @@ const fn patch_lanes<V: Sized>() -> usize {
9699
// from shared to global memory.
97100
if size_of::<V>() < 8 { 32 } else { 16 }
98101
}
102+
103+
/// Flag indicating if experimental patched array support is enabled.
104+
///
105+
/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`.
106+
///
107+
/// When this is true, any arrays with interior `Patches` will be read as a `Patched`
108+
/// array, and eliminate the interior patches.
109+
///
110+
/// The builtin compressor will also generate Patched arrays.
111+
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
112+
LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok());

vortex-btrblocks/src/schemes/float.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ use vortex_alp::alp_encode;
1313
use vortex_array::ArrayRef;
1414
use vortex_array::Canonical;
1515
use vortex_array::IntoArray;
16+
use vortex_array::LEGACY_SESSION;
1617
use vortex_array::ToCanonical;
18+
use vortex_array::VortexSessionExecute;
19+
use vortex_array::arrays::Patched;
20+
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
1721
use vortex_array::arrays::primitive::PrimitiveArrayExt;
1822
use vortex_array::dtype::PType;
1923
use vortex_compressor::estimate::CompressionEstimate;
@@ -107,11 +111,30 @@ impl Scheme for ALPScheme {
107111
let compressed_alp_ints =
108112
compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;
109113

110-
// Patches are not compressed. They should be infrequent, and if they are not then we want
111-
// to keep them linear for easy indexing.
112-
let patches = alp_encoded.patches().map(compress_patches).transpose()?;
114+
let alp_stats = alp_encoded.as_array().statistics().to_owned();
115+
let exponents = alp_encoded.exponents();
116+
117+
if *USE_EXPERIMENTAL_PATCHES {
118+
let patches = alp_encoded.patches();
119+
120+
// Create ALP array without interior patches.
121+
let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();
122+
123+
match patches {
124+
None => Ok(alp_array),
125+
Some(p) => Ok(Patched::from_array_and_patches(
126+
alp_array,
127+
&p,
128+
&mut LEGACY_SESSION.create_execution_ctx(),
129+
)?
130+
.with_stats_set(alp_stats)
131+
.into_array()),
132+
}
133+
} else {
134+
let patches = alp_encoded.patches().map(compress_patches).transpose()?;
113135

114-
Ok(ALP::new(compressed_alp_ints, alp_encoded.exponents(), patches).into_array())
136+
Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
137+
}
115138
}
116139
}
117140

vortex-btrblocks/src/schemes/integer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use vortex_array::ToCanonical;
1111
use vortex_array::VortexSessionExecute;
1212
use vortex_array::arrays::ConstantArray;
1313
use vortex_array::arrays::Patched;
14+
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
1415
use vortex_array::arrays::primitive::PrimitiveArrayExt;
1516
use vortex_array::scalar::Scalar;
1617
use vortex_compressor::builtins::FloatDictScheme;
@@ -32,7 +33,6 @@ use vortex_fastlanes::FoR;
3233
use vortex_fastlanes::FoRArrayExt;
3334
use vortex_fastlanes::RLE;
3435
use vortex_fastlanes::RLEArrayExt;
35-
use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES;
3636
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
3737
use vortex_fastlanes::bitpack_compress::bitpack_encode;
3838
use vortex_fastlanes::bitpack_compress::find_best_bit_width;

0 commit comments

Comments
 (0)