Skip to content

Commit 4c937f7

Browse files
committed
ALP decoding plugin
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent c660607 commit 4c937f7

3 files changed

Lines changed: 237 additions & 1 deletion

File tree

encodings/alp/src/alp/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ mod compress;
1818
pub(crate) mod compute;
1919
mod decompress;
2020
mod ops;
21+
mod plugin;
2122
mod rules;
2223

24+
pub(crate) use plugin::ALPPatchedPlugin;
25+
2326
#[cfg(test)]
2427
mod tests {
2528
use prost::Message;

encodings/alp/src/alp/plugin.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! A custom [`ArrayPlugin`] that lets you load in and deserialize an `ALP` array with interior
5+
//! patches as a `PatchedArray` that wraps a patchless `ALP` array.
6+
//!
7+
//! This enables zero-cost backward compatibility with previously written datasets.
8+
9+
use vortex_array::ArrayId;
10+
use vortex_array::ArrayPlugin;
11+
use vortex_array::ArrayRef;
12+
use vortex_array::IntoArray;
13+
use vortex_array::VortexSessionExecute;
14+
use vortex_array::arrays::Patched;
15+
use vortex_array::buffer::BufferHandle;
16+
use vortex_array::dtype::DType;
17+
use vortex_array::serde::ArrayChildren;
18+
use vortex_error::VortexResult;
19+
use vortex_error::vortex_err;
20+
use vortex_session::VortexSession;
21+
22+
use crate::ALP;
23+
use crate::ALPArrayExt;
24+
use crate::ALPArrayOwnedExt;
25+
26+
/// Custom deserialization plugin that converts an ALP array with interior
27+
/// patches into a PatchedArray holding an ALP array.
28+
#[derive(Debug, Clone)]
29+
pub(crate) struct ALPPatchedPlugin;
30+
31+
impl ArrayPlugin for ALPPatchedPlugin {
32+
fn id(&self) -> ArrayId {
33+
// We reuse the existing `ALP` ID so that we can take over its
34+
// deserialization pathway.
35+
ALP::ID
36+
}
37+
38+
fn serialize(
39+
&self,
40+
array: &ArrayRef,
41+
session: &VortexSession,
42+
) -> VortexResult<Option<Vec<u8>>> {
43+
// Delegate to ALP's metadata serde
44+
ALP.serialize(array, session)
45+
}
46+
47+
fn deserialize(
48+
&self,
49+
dtype: &DType,
50+
len: usize,
51+
metadata: &[u8],
52+
buffers: &[BufferHandle],
53+
children: &dyn ArrayChildren,
54+
session: &VortexSession,
55+
) -> VortexResult<ArrayRef> {
56+
let alp_array = ALP
57+
.deserialize(dtype, len, metadata, buffers, children, session)?
58+
.try_downcast::<ALP>()
59+
.map_err(|_| vortex_err!("ALP plugin should only deserialize vortex.alp"))?;
60+
61+
// Check if there are interior patches to externalize.
62+
let Some(patches) = alp_array.patches() else {
63+
return Ok(alp_array.into_array());
64+
};
65+
66+
// Extract components and create a new ALP array without patches.
67+
let (encoded, exponents, _) = alp_array.into_parts();
68+
69+
let alp_without_patches = ALP::try_new(encoded, exponents, None)?.into_array();
70+
71+
let patched = Patched::from_array_and_patches(
72+
alp_without_patches,
73+
&patches,
74+
&mut session.create_execution_ctx(),
75+
)?;
76+
77+
Ok(patched.into_array())
78+
}
79+
}
80+
81+
#[cfg(test)]
82+
mod tests {
83+
use std::f64::consts::PI;
84+
use std::sync::LazyLock;
85+
86+
use vortex_array::ArrayPlugin;
87+
use vortex_array::IntoArray;
88+
use vortex_array::arrays::PatchedArray;
89+
use vortex_array::arrays::PrimitiveArray;
90+
use vortex_array::arrays::patched::PatchedArrayExt;
91+
use vortex_array::buffer::BufferHandle;
92+
use vortex_array::session::ArraySession;
93+
use vortex_array::session::ArraySessionExt;
94+
use vortex_error::VortexResult;
95+
use vortex_error::vortex_err;
96+
use vortex_session::VortexSession;
97+
98+
use super::ALPPatchedPlugin;
99+
use crate::ALP;
100+
use crate::ALPArray;
101+
use crate::ALPArrayExt;
102+
use crate::alp_encode;
103+
104+
static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
105+
let session = VortexSession::empty().with::<ArraySession>();
106+
session.arrays().register(ALPPatchedPlugin);
107+
session
108+
});
109+
110+
#[test]
111+
fn test_decode_alp_patches() -> VortexResult<()> {
112+
// Create values where some don't encode cleanly with ALP, causing patches.
113+
// PI doesn't encode cleanly.
114+
let values: Vec<f64> = (0..100)
115+
.map(|i| if i % 4 == 3 { PI } else { i as f64 })
116+
.collect();
117+
118+
let parray = PrimitiveArray::from_iter(values);
119+
let alp_encoded = alp_encode(&parray, None)?;
120+
121+
assert!(
122+
alp_encoded.patches().is_some(),
123+
"Expected ALP array to have patches"
124+
);
125+
126+
let array = alp_encoded.as_array();
127+
128+
let metadata = array.metadata(&SESSION)?.unwrap_or_default();
129+
let children = array.children();
130+
let buffers = array
131+
.buffers()
132+
.into_iter()
133+
.map(BufferHandle::new_host)
134+
.collect::<Vec<_>>();
135+
136+
let deserialized = ALPPatchedPlugin.deserialize(
137+
array.dtype(),
138+
array.len(),
139+
&metadata,
140+
&buffers,
141+
&children,
142+
&SESSION,
143+
)?;
144+
145+
let patched: PatchedArray = deserialized
146+
.try_downcast()
147+
.map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;
148+
149+
let inner_alp: ALPArray = patched
150+
.base_array()
151+
.clone()
152+
.try_downcast()
153+
.map_err(|a| vortex_err!("Expected inner ALP, got {}", a.encoding_id()))?;
154+
155+
assert!(
156+
inner_alp.patches().is_none(),
157+
"Inner ALP should NOT have patches"
158+
);
159+
160+
Ok(())
161+
}
162+
163+
#[test]
164+
fn alp_without_patches_stays_alp() -> VortexResult<()> {
165+
// Values that encode cleanly without patches.
166+
let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
167+
let parray = PrimitiveArray::from_iter(values);
168+
let alp_encoded = alp_encode(&parray, None)?;
169+
170+
assert!(
171+
alp_encoded.patches().is_none(),
172+
"Expected ALP array without patches"
173+
);
174+
175+
let array = alp_encoded.as_array();
176+
177+
let metadata = array.metadata(&SESSION)?.unwrap_or_default();
178+
let children = array.children();
179+
let buffers = array
180+
.buffers()
181+
.into_iter()
182+
.map(BufferHandle::new_host)
183+
.collect::<Vec<_>>();
184+
185+
let deserialized = ALPPatchedPlugin.deserialize(
186+
array.dtype(),
187+
array.len(),
188+
&metadata,
189+
&buffers,
190+
&children,
191+
&SESSION,
192+
)?;
193+
194+
let result = deserialized
195+
.try_downcast::<ALP>()
196+
.map_err(|a| vortex_err!("Expected deserialized ALP, got {}", a.encoding_id()))?;
197+
198+
assert!(result.patches().is_none(), "Result should not have patches");
199+
200+
Ok(())
201+
}
202+
203+
#[test]
204+
#[should_panic(expected = "index out of bounds")]
205+
fn primitive_array_returns_error() {
206+
let array = PrimitiveArray::from_iter([1.0f64, 2.0, 3.0]).into_array();
207+
208+
let metadata = array.metadata(&SESSION).unwrap().unwrap_or_default();
209+
let children = array.children();
210+
let buffers = array
211+
.buffers()
212+
.into_iter()
213+
.map(BufferHandle::new_host)
214+
.collect::<Vec<_>>();
215+
216+
// This panics because PrimitiveArray has no children and ALP requires encoded child.
217+
let _result = ALPPatchedPlugin.deserialize(
218+
array.dtype(),
219+
array.len(),
220+
&metadata,
221+
&buffers,
222+
&children,
223+
&SESSION,
224+
);
225+
}
226+
}

encodings/alp/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,21 @@ use vortex_array::aggregate_fn::AggregateFnVTable;
2222
use vortex_array::aggregate_fn::fns::nan_count::NanCount;
2323
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
2424
use vortex_array::session::ArraySessionExt;
25+
use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES;
2526
use vortex_session::VortexSession;
2627

2728
mod alp;
2829
mod alp_rd;
2930

3031
/// Initialize ALP encoding in the given session.
3132
pub fn initialize(session: &VortexSession) {
32-
session.arrays().register(ALP);
33+
// If we're using the experimental Patched encoding, register a shim
34+
// for ALP with interior patches to decode as Patched array.
35+
if *USE_EXPERIMENTAL_PATCHES {
36+
session.arrays().register(ALPPatchedPlugin);
37+
} else {
38+
session.arrays().register(ALP);
39+
}
3340
session.arrays().register(ALPRD);
3441

3542
// Register the ALP-specific NaN count aggregate kernel.

0 commit comments

Comments
 (0)