diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index 7d5049b59e3..88375fe2ae6 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -18,8 +18,11 @@ mod compress; pub(crate) mod compute; mod decompress; mod ops; +mod plugin; mod rules; +pub(crate) use plugin::ALPPatchedPlugin; + #[cfg(test)] mod tests { use prost::Message; diff --git a/encodings/alp/src/alp/plugin.rs b/encodings/alp/src/alp/plugin.rs new file mode 100644 index 00000000000..87f777ee9a2 --- /dev/null +++ b/encodings/alp/src/alp/plugin.rs @@ -0,0 +1,226 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A custom [`ArrayPlugin`] that lets you load in and deserialize an `ALP` array with interior +//! patches as a `PatchedArray` that wraps a patchless `ALP` array. +//! +//! This enables zero-cost backward compatibility with previously written datasets. + +use vortex_array::ArrayId; +use vortex_array::ArrayPlugin; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::Patched; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; +use vortex_array::serde::ArrayChildren; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_session::VortexSession; + +use crate::ALP; +use crate::ALPArrayExt; +use crate::ALPArrayOwnedExt; + +/// Custom deserialization plugin that converts an ALP array with interior +/// patches into a PatchedArray holding an ALP array. +#[derive(Debug, Clone)] +pub(crate) struct ALPPatchedPlugin; + +impl ArrayPlugin for ALPPatchedPlugin { + fn id(&self) -> ArrayId { + // We reuse the existing `ALP` ID so that we can take over its + // deserialization pathway. + ALP::ID + } + + fn serialize( + &self, + array: &ArrayRef, + session: &VortexSession, + ) -> VortexResult>> { + // Delegate to ALP's metadata serde + ALP.serialize(array, session) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + session: &VortexSession, + ) -> VortexResult { + let alp_array = ALP + .deserialize(dtype, len, metadata, buffers, children, session)? + .try_downcast::() + .map_err(|_| vortex_err!("ALP plugin should only deserialize vortex.alp"))?; + + // Check if there are interior patches to externalize. + let Some(patches) = alp_array.patches() else { + return Ok(alp_array.into_array()); + }; + + // Extract components and create a new ALP array without patches. + let (encoded, exponents, _) = alp_array.into_parts(); + + let alp_without_patches = ALP::try_new(encoded, exponents, None)?.into_array(); + + let patched = Patched::from_array_and_patches( + alp_without_patches, + &patches, + &mut session.create_execution_ctx(), + )?; + + Ok(patched.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::f64::consts::PI; + use std::sync::LazyLock; + + use vortex_array::ArrayPlugin; + use vortex_array::IntoArray; + use vortex_array::arrays::PatchedArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::patched::PatchedArraySlotsExt; + use vortex_array::buffer::BufferHandle; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + use vortex_session::VortexSession; + + use super::ALPPatchedPlugin; + use crate::ALP; + use crate::ALPArray; + use crate::ALPArrayExt; + use crate::alp_encode; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(ALPPatchedPlugin); + session + }); + + #[test] + fn test_decode_alp_patches() -> VortexResult<()> { + // Create values where some don't encode cleanly with ALP, causing patches. + // PI doesn't encode cleanly. + let values: Vec = (0..100) + .map(|i| if i % 4 == 3 { PI } else { i as f64 }) + .collect(); + + let parray = PrimitiveArray::from_iter(values); + let alp_encoded = alp_encode(&parray, None)?; + + assert!( + alp_encoded.patches().is_some(), + "Expected ALP array to have patches" + ); + + let array = alp_encoded.as_array(); + + let metadata = array.metadata(&SESSION)?.unwrap_or_default(); + let children = array.children(); + let buffers = array + .buffers() + .into_iter() + .map(BufferHandle::new_host) + .collect::>(); + + let deserialized = ALPPatchedPlugin.deserialize( + array.dtype(), + array.len(), + &metadata, + &buffers, + &children, + &SESSION, + )?; + + let patched: PatchedArray = deserialized + .try_downcast() + .map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?; + + let inner_alp: ALPArray = patched + .inner() + .clone() + .try_downcast() + .map_err(|a| vortex_err!("Expected inner ALP, got {}", a.encoding_id()))?; + + assert!( + inner_alp.patches().is_none(), + "Inner ALP should NOT have patches" + ); + + Ok(()) + } + + #[test] + fn alp_without_patches_stays_alp() -> VortexResult<()> { + // Values that encode cleanly without patches. + let values: Vec = (0..100).map(|i| i as f64).collect(); + let parray = PrimitiveArray::from_iter(values); + let alp_encoded = alp_encode(&parray, None)?; + + assert!( + alp_encoded.patches().is_none(), + "Expected ALP array without patches" + ); + + let array = alp_encoded.as_array(); + + let metadata = array.metadata(&SESSION)?.unwrap_or_default(); + let children = array.children(); + let buffers = array + .buffers() + .into_iter() + .map(BufferHandle::new_host) + .collect::>(); + + let deserialized = ALPPatchedPlugin.deserialize( + array.dtype(), + array.len(), + &metadata, + &buffers, + &children, + &SESSION, + )?; + + let result = deserialized + .try_downcast::() + .map_err(|a| vortex_err!("Expected deserialized ALP, got {}", a.encoding_id()))?; + + assert!(result.patches().is_none(), "Result should not have patches"); + + Ok(()) + } + + #[test] + #[should_panic(expected = "index out of bounds")] + fn primitive_array_returns_error() { + let array = PrimitiveArray::from_iter([1.0f64, 2.0, 3.0]).into_array(); + + let metadata = array.metadata(&SESSION).unwrap().unwrap_or_default(); + let children = array.children(); + let buffers = array + .buffers() + .into_iter() + .map(BufferHandle::new_host) + .collect::>(); + + // This panics because PrimitiveArray has no children and ALP requires encoded child. + let _result = ALPPatchedPlugin.deserialize( + array.dtype(), + array.len(), + &metadata, + &buffers, + &children, + &SESSION, + ); + } +} diff --git a/encodings/alp/src/lib.rs b/encodings/alp/src/lib.rs index 3a955f368af..35a7534d7aa 100644 --- a/encodings/alp/src/lib.rs +++ b/encodings/alp/src/lib.rs @@ -21,6 +21,7 @@ pub use alp_rd::*; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::nan_count::NanCount; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -29,7 +30,13 @@ mod alp_rd; /// Initialize ALP encoding in the given session. pub fn initialize(session: &VortexSession) { - session.arrays().register(ALP); + // If we're using the experimental Patched encoding, register a shim + // for ALP with interior patches to decode as Patched array. + if *USE_EXPERIMENTAL_PATCHES { + session.arrays().register(ALPPatchedPlugin); + } else { + session.arrays().register(ALP); + } session.arrays().register(ALPRD); // Register the ALP-specific NaN count aggregate kernel. diff --git a/encodings/fastlanes/public-api.lock b/encodings/fastlanes/public-api.lock index 15cc889bf9f..e65c373b460 100644 --- a/encodings/fastlanes/public-api.lock +++ b/encodings/fastlanes/public-api.lock @@ -570,8 +570,6 @@ impl vortex_array::hash::ArrayHash for vortex_fastlanes::RLEData pub fn vortex_fastlanes::RLEData::array_hash(&self, state: &mut H, _precision: vortex_array::hash::Precision) -pub static vortex_fastlanes::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock - pub trait vortex_fastlanes::BitPackedArrayExt: vortex_fastlanes::BitPackedArraySlotsExt pub fn vortex_fastlanes::BitPackedArrayExt::bit_width(&self) -> u8 diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 9a9f37d9af8..a00e13b53fa 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -3,9 +3,6 @@ #![allow(clippy::cast_possible_truncation)] -use std::env; -use std::sync::LazyLock; - pub use bitpacking::*; pub use delta::*; pub use r#for::*; @@ -31,19 +28,10 @@ use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; -/// Flag indicating if experimental patched array support is enabled. -/// -/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. -/// -/// When this is true, any BitPacked array with interior patches will be read as a `Patched` -/// array, and the builtin compressor will use Patched array with BitPacked instead of -/// BitPacked array with interior patches. -pub static USE_EXPERIMENTAL_PATCHES: LazyLock = - LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok()); - /// Initialize fastlanes encodings in the given session. pub fn initialize(session: &VortexSession) { // If we're using the experimental Patched encoding, register a shim diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 19dc56a37f2..de53b12a037 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -3538,6 +3538,8 @@ pub fn vortex_array::arrays::patched::PatchedSlotsView<'a>::fmt(&self, f: &mut c impl<'a> core::marker::Copy for vortex_array::arrays::patched::PatchedSlotsView<'a> +pub static vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock + pub trait vortex_array::arrays::patched::PatchedArrayExt: vortex_array::arrays::patched::PatchedArraySlotsExt pub fn vortex_array::arrays::patched::PatchedArrayExt::lane_range(&self, chunk: usize, lane: usize) -> vortex_error::VortexResult> diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs index 56024f50d8e..b0453c68677 100644 --- a/vortex-array/src/arrays/patched/mod.rs +++ b/vortex-array/src/arrays/patched/mod.rs @@ -71,6 +71,9 @@ mod array; mod compute; mod vtable; +use std::env; +use std::sync::LazyLock; + pub use array::*; use vortex_buffer::ByteBuffer; pub use vtable::*; @@ -96,3 +99,14 @@ const fn patch_lanes() -> usize { // from shared to global memory. if size_of::() < 8 { 32 } else { 16 } } + +/// Flag indicating if experimental patched array support is enabled. +/// +/// This is set using the environment variable `VORTEX_EXPERIMENTAL_PATCHED_ARRAY`. +/// +/// When this is true, any arrays with interior `Patches` will be read as a `Patched` +/// array, and eliminate the interior patches. +/// +/// The builtin compressor will also generate Patched arrays. +pub static USE_EXPERIMENTAL_PATCHES: LazyLock = + LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok()); diff --git a/vortex-btrblocks/src/schemes/float.rs b/vortex-btrblocks/src/schemes/float.rs index e74b130bbd0..6c3dae9ab64 100644 --- a/vortex-btrblocks/src/schemes/float.rs +++ b/vortex-btrblocks/src/schemes/float.rs @@ -13,7 +13,11 @@ use vortex_alp::alp_encode; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::Patched; +use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::dtype::PType; use vortex_compressor::estimate::CompressionEstimate; @@ -107,11 +111,30 @@ impl Scheme for ALPScheme { let compressed_alp_ints = compressor.compress_child(alp_encoded.encoded(), &ctx, self.id(), 0)?; - // Patches are not compressed. They should be infrequent, and if they are not then we want - // to keep them linear for easy indexing. - let patches = alp_encoded.patches().map(compress_patches).transpose()?; + let alp_stats = alp_encoded.as_array().statistics().to_owned(); + let exponents = alp_encoded.exponents(); + + if *USE_EXPERIMENTAL_PATCHES { + let patches = alp_encoded.patches(); + + // Create ALP array without interior patches. + let alp_array = ALP::new(compressed_alp_ints, exponents, None).into_array(); + + match patches { + None => Ok(alp_array), + Some(p) => Ok(Patched::from_array_and_patches( + alp_array, + &p, + &mut LEGACY_SESSION.create_execution_ctx(), + )? + .with_stats_set(alp_stats) + .into_array()), + } + } else { + let patches = alp_encoded.patches().map(compress_patches).transpose()?; - Ok(ALP::new(compressed_alp_ints, alp_encoded.exponents(), patches).into_array()) + Ok(ALP::new(compressed_alp_ints, exponents, patches).into_array()) + } } } diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index f56b8d478f0..1fd75c61910 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -11,6 +11,7 @@ use vortex_array::ToCanonical; use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::Patched; +use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::scalar::Scalar; use vortex_compressor::builtins::FloatDictScheme; @@ -32,7 +33,6 @@ use vortex_fastlanes::FoR; use vortex_fastlanes::FoRArrayExt; use vortex_fastlanes::RLE; use vortex_fastlanes::RLEArrayExt; -use vortex_fastlanes::USE_EXPERIMENTAL_PATCHES; use vortex_fastlanes::bitpack_compress::bit_width_histogram; use vortex_fastlanes::bitpack_compress::bitpack_encode; use vortex_fastlanes::bitpack_compress::find_best_bit_width; diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 3253d998bf5..a30d28ab886 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -26,6 +26,7 @@ use vortex_array::arrays::Primitive; use vortex_array::arrays::Struct; use vortex_array::arrays::VarBin; use vortex_array::arrays::VarBinView; +use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::dtype::FieldPath; use vortex_btrblocks::BtrBlocksCompressorBuilder; use vortex_btrblocks::SchemeExt; @@ -91,7 +92,7 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { allowed.insert(Masked.id()); allowed.insert(Dict.id()); - if *vortex_fastlanes::USE_EXPERIMENTAL_PATCHES { + if *USE_EXPERIMENTAL_PATCHES { allowed.insert(Patched.id()); }