Skip to content

Commit c660607

Browse files
authored
add env flag for Patched array (#7314)
## Summary

Part of the rollout of the Patched array. This PR adds a new environment variable, `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. When the env var is set:

1. On read, any flat layout containing a `BitPacked` array with interior patches will be decoded as a `Patched` array wrapping a `BitPacked` array with no interior patches.
2. The default BtrBlocks compressor will write a `Patched` array of `BitPacked` instead of a `BitPacked` array with interior patches.

This allows us to do some more soak testing of the encoding before we switch it to be the default.

## Testing

On a previous commit in this PR, I set up the benchmarks to run with the experimental Patched encoding enabled; the benchmark results all seem to be within the margin of error. The file sizes are generally the same, but in the worst case 2-3% larger.

---------

Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent 77c8e95 commit c660607

File tree

10 files changed

+327
-27
lines changed

10 files changed

+327
-27
lines changed

.github/workflows/fuzz.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ jobs:
109109
fuzz_target: array_ops
110110
fuzz_name: array_ops_unstable_encodings
111111
extra_features: "vortex/unstable_encodings"
112+
extra_env: "VORTEX_EXPERIMENTAL_PATCHED_ARRAY=1"
112113
jobs: 16
113114
secrets:
114115
R2_FUZZ_ACCESS_KEY_ID: ${{ secrets.R2_FUZZ_ACCESS_KEY_ID }}

.github/workflows/run-fuzzer.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ on:
2727
required: false
2828
type: string
2929
default: ""
30+
extra_env:
31+
description: "Extra environment variables to set (space-separated KEY=VALUE pairs)"
32+
required: false
33+
type: string
34+
default: ""
3035
jobs:
3136
description: "Number of parallel fuzzing jobs (libfuzzer -fork=N). Set to match available vCPUs."
3237
required: false
@@ -119,7 +124,7 @@ jobs:
119124
if [ "${{ inputs.jobs }}" -gt 1 ]; then
120125
FORK_FLAG="-fork=${{ inputs.jobs }}"
121126
fi
122-
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 \
127+
${{ inputs.extra_env }} RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 \
123128
cargo +$NIGHTLY_TOOLCHAIN fuzz run --release --debug-assertions \
124129
$FEATURES_FLAG \
125130
${{ inputs.fuzz_target }} -- \

encodings/fastlanes/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,8 @@ impl vortex_array::hash::ArrayHash for vortex_fastlanes::RLEData
542542

543543
pub fn vortex_fastlanes::RLEData::array_hash<H: core::hash::Hasher>(&self, state: &mut H, _precision: vortex_array::hash::Precision)
544544

545+
pub static vortex_fastlanes::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>
546+
545547
pub trait vortex_fastlanes::BitPackedArrayExt: vortex_array::array::typed::TypedArrayRef<vortex_fastlanes::BitPacked>
546548

547549
pub fn vortex_fastlanes::BitPackedArrayExt::bit_width(&self) -> u8

encodings/fastlanes/src/bitpacking/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ pub use array::unpack_iter;
1111

1212
pub(crate) mod compute;
1313

14+
mod plugin;
1415
mod vtable;
16+
17+
pub(crate) use plugin::BitPackedPatchedPlugin;
1518
pub use vtable::BitPacked;
1619
pub use vtable::BitPackedArray;
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! A custom [`ArrayPlugin`] that lets you load in and deserialize a `BitPacked` array with interior
5+
//! patches as a `PatchedArray` that wraps a patchless `BitPacked` array.
6+
//!
7+
//! This enables zero-cost backward compatibility with previously written datasets.
8+
9+
use vortex_array::ArrayId;
10+
use vortex_array::ArrayPlugin;
11+
use vortex_array::ArrayRef;
12+
use vortex_array::IntoArray;
13+
use vortex_array::VortexSessionExecute;
14+
use vortex_array::arrays::Patched;
15+
use vortex_array::buffer::BufferHandle;
16+
use vortex_array::dtype::DType;
17+
use vortex_array::serde::ArrayChildren;
18+
use vortex_error::VortexResult;
19+
use vortex_error::vortex_err;
20+
use vortex_session::VortexSession;
21+
22+
use crate::BitPacked;
23+
use crate::BitPackedArrayExt;
24+
25+
/// Custom deserialization plugin that converts a BitPacked array with interior
26+
/// Patches into a PatchedArray holding a BitPacked array.
27+
#[derive(Debug, Clone)]
28+
pub(crate) struct BitPackedPatchedPlugin;
29+
30+
impl ArrayPlugin for BitPackedPatchedPlugin {
31+
fn id(&self) -> ArrayId {
32+
// We reuse the existing `BitPacked` ID so that we can take over its
33+
// deserialization pathway.
34+
BitPacked::ID
35+
}
36+
37+
fn serialize(
38+
&self,
39+
array: &ArrayRef,
40+
session: &VortexSession,
41+
) -> VortexResult<Option<Vec<u8>>> {
42+
// delegate to BitPacked VTable for serialization
43+
BitPacked.serialize(array, session)
44+
}
45+
46+
fn deserialize(
47+
&self,
48+
dtype: &DType,
49+
len: usize,
50+
metadata: &[u8],
51+
buffers: &[BufferHandle],
52+
children: &dyn ArrayChildren,
53+
session: &VortexSession,
54+
) -> VortexResult<ArrayRef> {
55+
let bitpacked = BitPacked
56+
.deserialize(dtype, len, metadata, buffers, children, session)?
57+
.try_downcast::<BitPacked>()
58+
.map_err(|_| {
59+
vortex_err!("BitPacked plugin should only deserialize fastlanes.bitpacked")
60+
})?;
61+
62+
// Create a new BitPackedArray without the interior patches installed.
63+
let Some(patches) = bitpacked.patches() else {
64+
return Ok(bitpacked.into_array());
65+
};
66+
67+
let packed = bitpacked.packed().clone();
68+
let ptype = bitpacked.dtype().as_ptype();
69+
let validity = bitpacked.validity()?;
70+
let bw = bitpacked.bit_width;
71+
let len = bitpacked.len();
72+
let offset = bitpacked.offset();
73+
74+
let bitpacked_without_patches =
75+
BitPacked::try_new(packed, ptype, validity, None, bw, len, offset)?.into_array();
76+
77+
let patched = Patched::from_array_and_patches(
78+
bitpacked_without_patches,
79+
&patches,
80+
&mut session.create_execution_ctx(),
81+
)?;
82+
83+
Ok(patched.into_array())
84+
}
85+
}
86+
87+
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PatchedArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::patched::PatchedArrayExt;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::session::ArraySession;
    use vortex_array::session::ArraySessionExt;
    use vortex_buffer::Buffer;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_session::VortexSession;

    use super::BitPackedPatchedPlugin;
    use crate::BitPacked;
    use crate::BitPackedArray;
    use crate::BitPackedArrayExt;
    use crate::BitPackedData;

    /// Session shared by all tests, with the shim plugin registered.
    static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
        let session = VortexSession::empty().with::<ArraySession>();
        session.arrays().register(BitPackedPatchedPlugin);
        session
    });

    /// A `BitPacked` array with patches must come back as `Patched` around a patchless
    /// `BitPacked` array.
    #[test]
    fn test_decode_bitpacked_patches() -> VortexResult<()> {
        // Nine bits hold 0..=511, so the final value (512) does not fit and becomes a patch.
        let source = (0i32..=512).collect::<Buffer<i32>>().into_array();
        let encoded = BitPackedData::encode(&source, 9)?;

        assert!(
            encoded.patches().is_some(),
            "Expected BitPacked array to have patches"
        );

        // Pull the serialized parts out of the array and feed them to the plugin directly.
        let array = encoded.as_array();
        let metadata = array.metadata(&SESSION)?.unwrap_or_default();
        let children = array.children();
        let buffers: Vec<_> = array
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        let decoded = BitPackedPatchedPlugin.deserialize(
            array.dtype(),
            array.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        )?;

        let patched: PatchedArray = decoded
            .try_downcast()
            .map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;

        let base = patched.base_array().clone();
        let inner_bitpacked: BitPackedArray = base
            .try_downcast()
            .map_err(|a| vortex_err!("Expected inner BitPacked, got {}", a.encoding_id()))?;

        assert!(
            inner_bitpacked.patches().is_none(),
            "Inner BitPacked should NOT have patches"
        );

        Ok(())
    }

    /// A `BitPacked` array without patches must pass through the plugin as `BitPacked`.
    #[test]
    fn bitpacked_without_patches_stays_bitpacked() -> VortexResult<()> {
        // Sixteen bits hold up to 65535, so every value in 0..100 fits without a patch.
        let source = (0i32..100).collect::<Buffer<i32>>().into_array();
        let encoded = BitPackedData::encode(&source, 16)?;

        assert!(
            encoded.patches().is_none(),
            "Expected BitPacked array without patches"
        );

        let array = encoded.as_array();
        let metadata = array.metadata(&SESSION)?.unwrap_or_default();
        let children = array.children();
        let buffers: Vec<_> = array
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        let decoded = BitPackedPatchedPlugin.deserialize(
            array.dtype(),
            array.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        )?;

        let result = decoded
            .try_downcast::<BitPacked>()
            .map_err(|a| vortex_err!("Expected deserialize BitPacked, got {}", a.encoding_id()))?;

        assert!(result.patches().is_none(), "Result should not have patches");

        Ok(())
    }

    /// Feeding the parts of a non-`BitPacked` array to the plugin must produce an error.
    #[test]
    fn primitive_array_returns_error() -> VortexResult<()> {
        let array = PrimitiveArray::from_iter([1i32, 2, 3]).into_array();

        let metadata = array.metadata(&SESSION)?.unwrap_or_default();
        let children = array.children();
        let buffers: Vec<_> = array
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        // Deliberately no `?` here: the outcome itself is what is being asserted on.
        let outcome = BitPackedPatchedPlugin.deserialize(
            array.dtype(),
            array.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        );

        assert!(
            outcome.is_err(),
            "Expected error when deserializing PrimitiveArray with BitPackedPatchedPlugin"
        );

        Ok(())
    }
}

encodings/fastlanes/src/lib.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
#![allow(clippy::cast_possible_truncation)]
55

6+
use std::env;
7+
use std::sync::LazyLock;
8+
69
pub use bitpacking::*;
710
pub use delta::*;
811
pub use r#for::*;
@@ -31,9 +34,25 @@ use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
3134
use vortex_array::session::ArraySessionExt;
3235
use vortex_session::VortexSession;
3336

37+
/// Flag indicating if experimental patched array support is enabled.
///
/// The flag is read once, on first access, from the environment variable
/// `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. It is enabled whenever that variable is present in
/// the environment, regardless of its value: setting it to `0` also enables the flag.
/// Unset the variable to disable it.
///
/// When this is true, any BitPacked array with interior patches will be read as a `Patched`
/// array, and the builtin compressor will use Patched array with BitPacked instead of
/// BitPacked array with interior patches.
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
    // `var_os` is a pure presence check. `var(..).is_ok()` would treat a variable whose
    // value is not valid Unicode as unset, and would allocate a `String` just to drop it.
    LazyLock::new(|| env::var_os("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_some());
46+
3447
/// Initialize fastlanes encodings in the given session.
3548
pub fn initialize(session: &VortexSession) {
36-
session.arrays().register(BitPacked);
49+
// If we're using the experimental Patched encoding, register a shim that
50+
// decodes BitPacked arrays with interior patches as Patched arrays.
51+
if *USE_EXPERIMENTAL_PATCHES {
52+
session.arrays().register(BitPackedPatchedPlugin);
53+
} else {
54+
session.arrays().register(BitPacked);
55+
}
3756
session.arrays().register(Delta);
3857
session.arrays().register(FoR);
3958
session.arrays().register(RLE);

vortex-array/public-api.lock

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18654,13 +18654,13 @@ pub fn vortex_array::serde::ArrayChildren::is_empty(&self) -> bool
1865418654

1865518655
pub fn vortex_array::serde::ArrayChildren::len(&self) -> usize
1865618656

18657-
impl vortex_array::serde::ArrayChildren for &[vortex_array::ArrayRef]
18657+
impl<T: core::convert::AsRef<[vortex_array::ArrayRef]>> vortex_array::serde::ArrayChildren for T
1865818658

18659-
pub fn &[vortex_array::ArrayRef]::get(&self, index: usize, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<vortex_array::ArrayRef>
18659+
pub fn T::get(&self, index: usize, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<vortex_array::ArrayRef>
1866018660

18661-
pub fn &[vortex_array::ArrayRef]::is_empty(&self) -> bool
18661+
pub fn T::is_empty(&self) -> bool
1866218662

18663-
pub fn &[vortex_array::ArrayRef]::len(&self) -> usize
18663+
pub fn T::len(&self) -> usize
1866418664

1866518665
pub mod vortex_array::session
1866618666

vortex-array/src/serde.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,16 +278,16 @@ pub trait ArrayChildren {
278278
}
279279
}
280280

281-
impl ArrayChildren for &[ArrayRef] {
281+
impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
282282
fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
283-
let array = self[index].clone();
283+
let array = self.as_ref()[index].clone();
284284
assert_eq!(array.len(), len);
285285
assert_eq!(array.dtype(), dtype);
286286
Ok(array)
287287
}
288288

289289
fn len(&self) -> usize {
290-
<[_]>::len(self)
290+
self.as_ref().len()
291291
}
292292
}
293293

0 commit comments

Comments
 (0)