Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions encodings/alp/src/alp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ mod compress;
pub(crate) mod compute;
mod decompress;
mod ops;
mod plugin;
mod rules;

pub(crate) use plugin::ALPPatchedPlugin;

#[cfg(test)]
mod tests {
use prost::Message;
Expand Down
226 changes: 226 additions & 0 deletions encodings/alp/src/alp/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! A custom [`ArrayPlugin`] that lets you load in and deserialize an `ALP` array with interior
//! patches as a `PatchedArray` that wraps a patchless `ALP` array.
//!
//! This enables zero-cost backward compatibility with previously written datasets.

use vortex_array::ArrayId;
use vortex_array::ArrayPlugin;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Patched;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::serde::ArrayChildren;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use crate::ALP;
use crate::ALPArrayExt;
use crate::ALPArrayOwnedExt;

/// Deserialization shim for `ALP` arrays that were written with interior patches.
///
/// The plugin is registered under the `ALP` encoding ID. Arrays that carry patches are
/// returned as a `Patched` array wrapping a patchless `ALP` array; arrays without patches
/// are returned as plain `ALP` arrays.
#[derive(Debug, Clone)]
pub(crate) struct ALPPatchedPlugin;

impl ArrayPlugin for ALPPatchedPlugin {
    /// Returns the `ALP` encoding ID.
    fn id(&self) -> ArrayId {
        // Sharing ALP's ID is what lets this plugin take over ALP's deserialization pathway.
        ALP::ID
    }

    /// Serializes array metadata by forwarding to `ALP`'s own implementation.
    fn serialize(
        &self,
        array: &ArrayRef,
        session: &VortexSession,
    ) -> VortexResult<Option<Vec<u8>>> {
        ALP.serialize(array, session)
    }

    /// Deserializes an `ALP` array and moves any interior patches into a wrapping `Patched`
    /// array.
    ///
    /// # Errors
    ///
    /// Propagates any failure from `ALP`'s deserialization, from rebuilding the patchless
    /// `ALP` array, or from building the `Patched` array. Also fails if the array produced by
    /// `ALP` cannot be downcast to `ALP`.
    fn deserialize(
        &self,
        dtype: &DType,
        len: usize,
        metadata: &[u8],
        buffers: &[BufferHandle],
        children: &dyn ArrayChildren,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        // Let ALP do the actual decoding, then recover the typed ALP array.
        let decoded = ALP.deserialize(dtype, len, metadata, buffers, children, session)?;
        let alp_array = decoded
            .try_downcast::<ALP>()
            .map_err(|_| vortex_err!("ALP plugin should only deserialize vortex.alp"))?;

        // Bind the patches first so that `alp_array` is free to be consumed below.
        let interior_patches = alp_array.patches();

        match interior_patches {
            // Nothing to externalize: hand the ALP array back untouched.
            None => Ok(alp_array.into_array()),
            Some(patches) => {
                // Rebuild the ALP array from its parts, dropping the interior patches.
                let (encoded, exponents, _) = alp_array.into_parts();
                let patchless_alp = ALP::try_new(encoded, exponents, None)?.into_array();

                // Re-attach the patches externally via the `Patched` wrapper.
                let mut ctx = session.create_execution_ctx();
                let wrapped = Patched::from_array_and_patches(patchless_alp, &patches, &mut ctx)?;

                Ok(wrapped.into_array())
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::f64::consts::PI;
    use std::sync::LazyLock;

    use vortex_array::ArrayPlugin;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PatchedArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::patched::PatchedArraySlotsExt;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::session::ArraySession;
    use vortex_array::session::ArraySessionExt;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_session::VortexSession;

    use super::ALPPatchedPlugin;
    use crate::ALP;
    use crate::ALPArray;
    use crate::ALPArrayExt;
    use crate::alp_encode;

    /// Session shared by all tests in this module, with `ALPPatchedPlugin` registered.
    static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
        let with_plugin = VortexSession::empty().with::<ArraySession>();
        with_plugin.arrays().register(ALPPatchedPlugin);
        with_plugin
    });

    /// An ALP array with interior patches must deserialize to a `PatchedArray` whose inner
    /// array is an ALP array without patches.
    #[test]
    fn test_decode_alp_patches() -> VortexResult<()> {
        // Every fourth value is PI, which doesn't encode cleanly with ALP and so forces
        // patches.
        let floats: Vec<f64> = (0..100)
            .map(|i| match i % 4 {
                3 => PI,
                _ => i as f64,
            })
            .collect();

        let primitive = PrimitiveArray::from_iter(floats);
        let encoded = alp_encode(&primitive, None)?;

        assert!(
            encoded.patches().is_some(),
            "Expected ALP array to have patches"
        );

        // Pull out the pieces the plugin's `deserialize` takes as input.
        let source = encoded.as_array();
        let metadata = source.metadata(&SESSION)?.unwrap_or_default();
        let children = source.children();
        let buffers: Vec<_> = source
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        let roundtripped = ALPPatchedPlugin.deserialize(
            source.dtype(),
            source.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        )?;

        let outer: PatchedArray = roundtripped
            .try_downcast()
            .map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;

        let inner: ALPArray = outer
            .inner()
            .clone()
            .try_downcast()
            .map_err(|a| vortex_err!("Expected inner ALP, got {}", a.encoding_id()))?;

        assert!(
            inner.patches().is_none(),
            "Inner ALP should NOT have patches"
        );

        Ok(())
    }

    /// An ALP array without patches must pass through the plugin as a plain ALP array.
    #[test]
    fn alp_without_patches_stays_alp() -> VortexResult<()> {
        // Whole numbers encode cleanly, so no patches are produced.
        let floats: Vec<f64> = (0..100).map(|i| i as f64).collect();
        let primitive = PrimitiveArray::from_iter(floats);
        let encoded = alp_encode(&primitive, None)?;

        assert!(
            encoded.patches().is_none(),
            "Expected ALP array without patches"
        );

        // Pull out the pieces the plugin's `deserialize` takes as input.
        let source = encoded.as_array();
        let metadata = source.metadata(&SESSION)?.unwrap_or_default();
        let children = source.children();
        let buffers: Vec<_> = source
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        let roundtripped = ALPPatchedPlugin.deserialize(
            source.dtype(),
            source.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        )?;

        let still_alp = roundtripped
            .try_downcast::<ALP>()
            .map_err(|a| vortex_err!("Expected deserialized ALP, got {}", a.encoding_id()))?;

        assert!(still_alp.patches().is_none(), "Result should not have patches");

        Ok(())
    }

    /// Feeding the plugin the components of a non-ALP array.
    ///
    /// Despite the test's name, the expected outcome is a panic rather than an `Err`.
    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn primitive_array_returns_error() {
        let primitive = PrimitiveArray::from_iter([1.0f64, 2.0, 3.0]).into_array();

        let metadata = primitive.metadata(&SESSION).unwrap().unwrap_or_default();
        let children = primitive.children();
        let buffers: Vec<_> = primitive
            .buffers()
            .into_iter()
            .map(BufferHandle::new_host)
            .collect();

        // This panics because PrimitiveArray has no children and ALP requires encoded child.
        let _result = ALPPatchedPlugin.deserialize(
            primitive.dtype(),
            primitive.len(),
            &metadata,
            &buffers,
            &children,
            &SESSION,
        );
    }
}
9 changes: 8 additions & 1 deletion encodings/alp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use alp_rd::*;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::nan_count::NanCount;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

Expand All @@ -29,7 +30,13 @@ mod alp_rd;

/// Initialize ALP encoding in the given session.
pub fn initialize(session: &VortexSession) {
session.arrays().register(ALP);
// If we're using the experimental Patched encoding, register a shim
// for ALP with interior patches to decode as Patched array.
if *USE_EXPERIMENTAL_PATCHES {
Comment thread
a10y marked this conversation as resolved.
session.arrays().register(ALPPatchedPlugin);
} else {
session.arrays().register(ALP);
}
session.arrays().register(ALPRD);

// Register the ALP-specific NaN count aggregate kernel.
Expand Down
2 changes: 0 additions & 2 deletions encodings/fastlanes/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,6 @@ impl vortex_array::hash::ArrayHash for vortex_fastlanes::RLEData

pub fn vortex_fastlanes::RLEData::array_hash<H: core::hash::Hasher>(&self, state: &mut H, _precision: vortex_array::hash::Precision)

pub static vortex_fastlanes::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>

pub trait vortex_fastlanes::BitPackedArrayExt: vortex_fastlanes::BitPackedArraySlotsExt

pub fn vortex_fastlanes::BitPackedArrayExt::bit_width(&self) -> u8
Expand Down
14 changes: 1 addition & 13 deletions encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

#![allow(clippy::cast_possible_truncation)]

use std::env;
use std::sync::LazyLock;

pub use bitpacking::*;
pub use delta::*;
pub use r#for::*;
Expand All @@ -31,19 +28,10 @@ use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

/// Flag indicating if experimental patched array support is enabled.
///
/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. Only the
/// presence of the variable is checked: any value enables the flag, including an empty string
/// or a value that is not valid Unicode. The environment is read once, on first access.
///
/// When this is true, any BitPacked array with interior patches will be read as a `Patched`
/// array, and the builtin compressor will use Patched array with BitPacked instead of
/// BitPacked array with interior patches.
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
    // `var_os` rather than `var`: `var` reports an error for non-Unicode values, which would
    // make a set variable look unset.
    LazyLock::new(|| env::var_os("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_some());

/// Initialize fastlanes encodings in the given session.
pub fn initialize(session: &VortexSession) {
// If we're using the experimental Patched encoding, register a shim
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3538,6 +3538,8 @@ pub fn vortex_array::arrays::patched::PatchedSlotsView<'a>::fmt(&self, f: &mut c

impl<'a> core::marker::Copy for vortex_array::arrays::patched::PatchedSlotsView<'a>

pub static vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock<bool>

pub trait vortex_array::arrays::patched::PatchedArrayExt: vortex_array::arrays::patched::PatchedArraySlotsExt

pub fn vortex_array::arrays::patched::PatchedArrayExt::lane_range(&self, chunk: usize, lane: usize) -> vortex_error::VortexResult<core::ops::range::Range<usize>>
Expand Down
14 changes: 14 additions & 0 deletions vortex-array/src/arrays/patched/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ mod array;
mod compute;
mod vtable;

use std::env;
use std::sync::LazyLock;

pub use array::*;
use vortex_buffer::ByteBuffer;
pub use vtable::*;
Expand All @@ -96,3 +99,14 @@ const fn patch_lanes<V: Sized>() -> usize {
// from shared to global memory.
if size_of::<V>() < 8 { 32 } else { 16 }
}

/// Flag indicating if experimental patched array support is enabled.
///
/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. Only the
/// presence of the variable is checked: any value enables the flag, including an empty string
/// or a value that is not valid Unicode. The environment is read once, on first access.
///
/// When this is true, any array with interior `Patches` will be read as a `Patched` array,
/// with the interior patches eliminated.
///
/// The builtin compressor will also generate Patched arrays.
pub static USE_EXPERIMENTAL_PATCHES: LazyLock<bool> =
    // `var_os` rather than `var`: `var` reports an error for non-Unicode values, which would
    // make a set variable look unset.
    LazyLock::new(|| env::var_os("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_some());
31 changes: 27 additions & 4 deletions vortex-btrblocks/src/schemes/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use vortex_alp::alp_encode;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Patched;
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::PType;
use vortex_compressor::estimate::CompressionEstimate;
Expand Down Expand Up @@ -107,11 +111,30 @@ impl Scheme for ALPScheme {
let compressed_alp_ints =
compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?;

// Patches are not compressed. They should be infrequent, and if they are not then we want
// to keep them linear for easy indexing.
let patches = alp_encoded.patches().map(compress_patches).transpose()?;
let alp_stats = alp_encoded.as_array().statistics().to_owned();
let exponents = alp_encoded.exponents();

if *USE_EXPERIMENTAL_PATCHES {
let patches = alp_encoded.patches();

// Create ALP array without interior patches.
let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array();

match patches {
None => Ok(alp_array),
Some(p) => Ok(Patched::from_array_and_patches(
alp_array,
&p,
&mut LEGACY_SESSION.create_execution_ctx(),
)?
.with_stats_set(alp_stats)
.into_array()),
}
} else {
let patches = alp_encoded.patches().map(compress_patches).transpose()?;

Ok(ALP::new(compressed_alp_ints, alp_encoded.exponents(), patches).into_array())
Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array())
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-btrblocks/src/schemes/integer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::scalar::Scalar;
use vortex_compressor::builtins::FloatDictScheme;
Expand All @@ -32,7 +33,6 @@ use vortex_fastlanes::FoR;
use vortex_fastlanes::FoRArrayExt;
use vortex_fastlanes::RLE;
use vortex_fastlanes::RLEArrayExt;
use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
Expand Down
Loading
Loading