Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/fuzz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ jobs:
fuzz_target: array_ops
fuzz_name: array_ops_unstable_encodings
extra_features: "vortex/unstable_encodings"
extra_env: "VORTEX_EXPERIMENTAL_PATCHED_ARRAY=1"
Comment on lines 111 to +112
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems odd we need both?

Copy link
Copy Markdown
Contributor

@robert3005 robert3005 Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should convert cargo feature flags to environment variables/config. It's not useful if you're using a non rust package and have to recompile the world to try out a feature

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think this is something we want as a runtime/env flag not a compile flag

jobs: 16
secrets:
R2_FUZZ_ACCESS_KEY_ID: ${{ secrets.R2_FUZZ_ACCESS_KEY_ID }}
Expand Down
7 changes: 6 additions & 1 deletion .github/workflows/run-fuzzer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ on:
required: false
type: string
default: ""
extra_env:
description: "Extra environment variables to set (space-separated KEY=VALUE pairs)"
required: false
type: string
default: ""
jobs:
description: "Number of parallel fuzzing jobs (libfuzzer -fork=N). Set to match available vCPUs."
required: false
Expand Down Expand Up @@ -119,7 +124,7 @@ jobs:
if [ "${{ inputs.jobs }}" -gt 1 ]; then
FORK_FLAG="-fork=${{ inputs.jobs }}"
fi
RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 \
${{ inputs.extra_env }} RUSTFLAGS="--cfg vortex_nightly" RUST_BACKTRACE=1 \
cargo +$NIGHTLY_TOOLCHAIN fuzz run --release --debug-assertions \
$FEATURES_FLAG \
${{ inputs.fuzz_target }} -- \
Expand Down
2 changes: 2 additions & 0 deletions encodings/fastlanes/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ impl vortex_array::hash::ArrayHash for vortex_fastlanes::RLEData

pub fn vortex_fastlanes::RLEData::array_hash<H: core::hash::Hasher>(&self, state: &mut H, _precision: vortex_array::hash::Precision)

pub static vortex_fastlanes::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>

pub trait vortex_fastlanes::BitPackedArrayExt: vortex_array::array::typed::TypedArrayRef<vortex_fastlanes::BitPacked>

pub fn vortex_fastlanes::BitPackedArrayExt::bit_width(&self) -> u8
Expand Down
3 changes: 3 additions & 0 deletions encodings/fastlanes/src/bitpacking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub use array::unpack_iter;

pub(crate) mod compute;

mod plugin;
mod vtable;

pub(crate) use plugin::BitPackedPatchedPlugin;
pub use vtable::BitPacked;
pub use vtable::BitPackedArray;
234 changes: 234 additions & 0 deletions encodings/fastlanes/src/bitpacking/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! A custom [`ArrayPlugin`] that lets you load in and deserialize a `BitPacked` array with interior
//! patches as a `PatchedArray` that wraps a patchless `BitPacked` array.
//!
//! This enables zero-cost backward compatibility with previously written datasets.

Comment thread
a10y marked this conversation as resolved.
use vortex_array::ArrayId;
use vortex_array::ArrayPlugin;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Patched;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::serde::ArrayChildren;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use crate::BitPacked;
use crate::BitPackedArrayExt;

/// Custom deserialization plugin that converts a BitPacked array with interior
/// Patches into a PatchedArray holding a BitPacked array.
#[derive(Debug, Clone)]
pub(crate) struct BitPackedPatchedPlugin;

impl ArrayPlugin for BitPackedPatchedPlugin {
fn id(&self) -> ArrayId {
// We reuse the existing `BitPacked` ID so that we can take over its
// deserialization pathway.
BitPacked::ID
}

fn serialize(
&self,
array: &ArrayRef,
session: &VortexSession,
) -> VortexResult<Option<Vec<u8>>> {
// delegate to BitPacked VTable for serialization
BitPacked.serialize(array, session)
}

fn deserialize(
&self,
dtype: &DType,
len: usize,
metadata: &[u8],
buffers: &[BufferHandle],
children: &dyn ArrayChildren,
session: &VortexSession,
) -> VortexResult<ArrayRef> {
let bitpacked = BitPacked
.deserialize(dtype, len, metadata, buffers, children, session)?
.try_downcast::<BitPacked>()
.map_err(|_| {
vortex_err!("BitPacked plugin should only deserialize fastlanes.bitpacked")
})?;

// Create a new BitPackedArray without the interior patches installed.
let Some(patches) = bitpacked.patches() else {
return Ok(bitpacked.into_array());
};

let packed = bitpacked.packed().clone();
let ptype = bitpacked.dtype().as_ptype();
let validity = bitpacked.validity()?;
let bw = bitpacked.bit_width;
Comment thread
a10y marked this conversation as resolved.
let len = bitpacked.len();
let offset = bitpacked.offset();

let bitpacked_without_patches =
BitPacked::try_new(packed, ptype, validity, None, bw, len, offset)?.into_array();

let patched = Patched::from_array_and_patches(
bitpacked_without_patches,
&patches,
&mut session.create_execution_ctx(),
)?;

Ok(patched.into_array())
}
}

#[cfg(test)]
mod tests {
use std::sync::LazyLock;

use vortex_array::ArrayPlugin;
use vortex_array::IntoArray;
use vortex_array::arrays::PatchedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::patched::PatchedArrayExt;
use vortex_array::buffer::BufferHandle;
use vortex_array::session::ArraySession;
use vortex_array::session::ArraySessionExt;
use vortex_buffer::Buffer;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use super::BitPackedPatchedPlugin;
use crate::BitPacked;
use crate::BitPackedArray;
use crate::BitPackedArrayExt;
use crate::BitPackedData;

static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
let session = VortexSession::empty().with::<ArraySession>();
session.arrays().register(BitPackedPatchedPlugin);
session
});

#[test]
fn test_decode_bitpacked_patches() -> VortexResult<()> {
// Create values where some exceed the bit width, causing patches.
// With bit_width=9, max value is 511. Values >=512 become patches.
let values: Buffer<i32> = (0i32..=512).collect();
let parray = values.into_array();
let bitpacked = BitPackedData::encode(&parray, 9)?;

assert!(
bitpacked.patches().is_some(),
"Expected BitPacked array to have patches"
);

let array = bitpacked.as_array();

let metadata = array.metadata(&SESSION)?.unwrap_or_default();
let children = array.children();
let buffers = array
.buffers()
.into_iter()
.map(BufferHandle::new_host)
.collect::<Vec<_>>();

let deserialized = BitPackedPatchedPlugin.deserialize(
array.dtype(),
array.len(),
&metadata,
&buffers,
&children,
&SESSION,
)?;

let patched: PatchedArray = deserialized
.try_downcast()
.map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;

let inner_bitpacked: BitPackedArray = patched
.base_array()
.clone()
.try_downcast()
.map_err(|a| vortex_err!("Expected inner BitPacked, got {}", a.encoding_id()))?;

assert!(
inner_bitpacked.patches().is_none(),
"Inner BitPacked should NOT have patches"
);

Ok(())
}

#[test]
fn bitpacked_without_patches_stays_bitpacked() -> VortexResult<()> {
// With bit_width=16, max value is 65535. All values 0..100 fit.
let values: Buffer<i32> = (0i32..100).collect();
let parray = values.into_array();
let bitpacked = BitPackedData::encode(&parray, 16)?;

assert!(
bitpacked.patches().is_none(),
"Expected BitPacked array without patches"
);

let array = bitpacked.as_array();

let metadata = array.metadata(&SESSION)?.unwrap_or_default();
let children = array.children();
let buffers = array
.buffers()
.into_iter()
.map(BufferHandle::new_host)
.collect::<Vec<_>>();

let deserialized = BitPackedPatchedPlugin.deserialize(
array.dtype(),
array.len(),
&metadata,
&buffers,
&children,
&SESSION,
)?;

let result = deserialized
.try_downcast::<BitPacked>()
.map_err(|a| vortex_err!("Expected deserialize BitPacked, got {}", a.encoding_id()))?;

assert!(result.patches().is_none(), "Result should not have patches");

Ok(())
}

#[test]
fn primitive_array_returns_error() -> VortexResult<()> {
let array = PrimitiveArray::from_iter([1i32, 2, 3]).into_array();

let metadata = array.metadata(&SESSION)?.unwrap_or_default();
let children = array.children();
let buffers = array
.buffers()
.into_iter()
.map(BufferHandle::new_host)
.collect::<Vec<_>>();

let result = BitPackedPatchedPlugin.deserialize(
array.dtype(),
array.len(),
&metadata,
&buffers,
&children,
&SESSION,
);

assert!(
result.is_err(),
"Expected error when deserializing PrimitiveArray with BitPackedPatchedPlugin"
);

Ok(())
}
}
21 changes: 20 additions & 1 deletion encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#![allow(clippy::cast_possible_truncation)]

use std::env;
use std::sync::LazyLock;

pub use bitpacking::*;
pub use delta::*;
pub use r#for::*;
Expand Down Expand Up @@ -31,9 +34,25 @@ use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

/// Flag indicating if experimental patched array support is enabled.
///
/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`.
///
/// When this is true, any BitPacked array with interior patches will be read as a `Patched`
/// array, and the builtin compressor will use Patched array with BitPacked instead of
/// BitPacked array with interior patches.
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok());

/// Initialize fastlanes encodings in the given session.
pub fn initialize(session: &VortexSession) {
session.arrays().register(BitPacked);
// If we're using the experimental Patched encoding, register a shim
// for BitPacked with interior patches decode as Patched array.
if *USE_EXPERIMENTAL_PATCHES {
session.arrays().register(BitPackedPatchedPlugin);
} else {
session.arrays().register(BitPacked);
}
session.arrays().register(Delta);
session.arrays().register(FoR);
session.arrays().register(RLE);
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18654,13 +18654,13 @@ pub fn vortex_array::serde::ArrayChildren::is_empty(&self) -> bool

pub fn vortex_array::serde::ArrayChildren::len(&self) -> usize

impl vortex_array::serde::ArrayChildren for &[vortex_array::ArrayRef]
impl<T: core::convert::AsRef<[vortex_array::ArrayRef]>> vortex_array::serde::ArrayChildren for T

pub fn &[vortex_array::ArrayRef]::get(&self, index: usize, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<vortex_array::ArrayRef>
pub fn T::get(&self, index: usize, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn &[vortex_array::ArrayRef]::is_empty(&self) -> bool
pub fn T::is_empty(&self) -> bool

pub fn &[vortex_array::ArrayRef]::len(&self) -> usize
pub fn T::len(&self) -> usize

pub mod vortex_array::session

Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ pub trait ArrayChildren {
}
}

impl ArrayChildren for &[ArrayRef] {
impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
let array = self[index].clone();
let array = self.as_ref()[index].clone();
assert_eq!(array.len(), len);
assert_eq!(array.dtype(), dtype);
Ok(array)
}

fn len(&self) -> usize {
<[_]>::len(self)
self.as_ref().len()
}
}

Expand Down
Loading
Loading