11// SPDX-License-Identifier: Apache-2.0
22// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
4- use arrayref:: array_mut_ref;
5- use arrayref:: array_ref;
4+ use std:: mem;
5+ use std:: mem:: MaybeUninit ;
6+
67use fastlanes:: Delta ;
78use fastlanes:: FastLanes ;
89use fastlanes:: Transpose ;
9- use num_traits:: WrappingSub ;
10+ use vortex_array:: Canonical ;
11+ use vortex_array:: ExecutionCtx ;
12+ use vortex_array:: IntoArray ;
13+ use vortex_array:: arrays:: BoolArray ;
1014use vortex_array:: arrays:: PrimitiveArray ;
1115use vortex_array:: dtype:: NativePType ;
1216use vortex_array:: match_each_unsigned_integer_ptype;
17+ use vortex_array:: validity:: Validity ;
1318use vortex_array:: vtable:: ValidityHelper ;
19+ use vortex_buffer:: BitBuffer ;
1420use vortex_buffer:: Buffer ;
1521use vortex_buffer:: BufferMut ;
1622use vortex_error:: VortexResult ;
1723
18- pub fn delta_compress ( array : & PrimitiveArray ) -> VortexResult < ( PrimitiveArray , PrimitiveArray ) > {
19- // TODO(ngates): fill forward nulls?
20- // let filled = fill_forward(array)?.to_primitive()?;
24+ use crate :: bit_transpose:: transpose_bitbuffer;
2125
22- // Compress the filled array
26+ pub fn delta_compress (
27+ array : & PrimitiveArray ,
28+ ctx : & mut ExecutionCtx ,
29+ ) -> VortexResult < ( PrimitiveArray , PrimitiveArray ) > {
2330 let ( bases, deltas) = match_each_unsigned_integer_ptype ! ( array. ptype( ) , |T | {
2431 const LANES : usize = T :: LANES ;
2532 let ( bases, deltas) = compress_primitive:: <T , LANES >( array. as_slice:: <T >( ) ) ;
33+ let validity = transpose_and_pad_validity( array. validity( ) , deltas. len( ) , ctx) ?;
2634 (
27- // To preserve nullability, we include Validity
2835 PrimitiveArray :: new( bases, array. dtype( ) . nullability( ) . into( ) ) ,
29- PrimitiveArray :: new( deltas, array . validity( ) . clone ( ) ) ,
36+ PrimitiveArray :: new( deltas, validity) ,
3037 )
3138 } ) ;
3239
3340 Ok ( ( bases, deltas) )
3441}
3542
36- fn compress_primitive < T : NativePType + Delta + Transpose + WrappingSub , const LANES : usize > (
43+ /// Transpose and pad validity to match the padded deltas length.
44+ ///
45+ /// For [`Validity::Array`], the validity bits are transposed into FastLanes order and then
46+ /// extended to `padded_len`. The underlying byte buffer from transposition is already
47+ /// padded to 128-byte alignment (1024 bits), which exactly matches our 1024-element chunks.
48+ fn transpose_and_pad_validity (
49+ validity : & Validity ,
50+ padded_len : usize ,
51+ ctx : & mut ExecutionCtx ,
52+ ) -> VortexResult < Validity > {
53+ match validity {
54+ Validity :: Array ( mask) => {
55+ let bools = mask
56+ . clone ( )
57+ . execute :: < Canonical > ( ctx) ?
58+ . into_bool ( )
59+ . into_bit_buffer ( ) ;
60+ let transposed = transpose_bitbuffer ( bools) ;
61+ let ( offset, _len, bytes) = transposed. into_inner ( ) ;
62+ let padded = BitBuffer :: new_with_offset ( bytes, padded_len, offset) ;
63+ Ok ( Validity :: Array (
64+ BoolArray :: new ( padded, Validity :: NonNullable ) . into_array ( ) ,
65+ ) )
66+ }
67+ v @ Validity :: AllValid | v @ Validity :: AllInvalid | v @ Validity :: NonNullable => {
68+ Ok ( v. clone ( ) )
69+ }
70+ }
71+ }
72+
73+ fn compress_primitive < T : NativePType + Delta + Transpose , const LANES : usize > (
3774 array : & [ T ] ,
3875) -> ( Buffer < T > , Buffer < T > ) {
39- // How many fastlanes vectors we will process.
40- let num_chunks = array. len ( ) / 1024 ;
76+ let padded_len = array. len ( ) . next_multiple_of ( 1024 ) ;
77+ let num_chunks = padded_len / 1024 ;
78+ let bases_len = num_chunks * LANES ;
79+
80+ // Split into full 1024-element chunks and a remainder.
81+ let ( full_chunks, remainder) = array. as_chunks :: < 1024 > ( ) ;
4182
4283 // Allocate result arrays.
43- let mut bases = BufferMut :: with_capacity ( num_chunks * T :: LANES + 1 ) ;
44- let mut deltas = BufferMut :: with_capacity ( array. len ( ) ) ;
45-
46- // Loop over all the 1024-element chunks.
47- if num_chunks > 0 {
48- let mut transposed: [ T ; 1024 ] = [ T :: default ( ) ; 1024 ] ;
49-
50- for i in 0 ..num_chunks {
51- let start_elem = i * 1024 ;
52- let chunk: & [ T ; 1024 ] = array_ref ! [ array, start_elem, 1024 ] ;
53- Transpose :: transpose ( chunk, & mut transposed) ;
54-
55- // Initialize and store the base vector for each chunk
56- bases. extend_from_slice ( & transposed[ 0 ..T :: LANES ] ) ;
57-
58- deltas. reserve ( 1024 ) ;
59- let delta_len = deltas. len ( ) ;
60- unsafe {
61- deltas. set_len ( delta_len + 1024 ) ;
62- Delta :: delta :: < LANES > (
63- & transposed,
64- & * ( transposed[ 0 ..T :: LANES ] . as_ptr ( ) . cast ( ) ) ,
65- array_mut_ref ! [ deltas[ delta_len..] , 0 , 1024 ] ,
66- ) ;
67- }
84+ let mut bases = BufferMut :: with_capacity ( bases_len) ;
85+ let mut deltas = BufferMut :: with_capacity ( padded_len) ;
86+ let ( output_deltas, _) = deltas. spare_capacity_mut ( ) . as_chunks_mut :: < 1024 > ( ) ;
87+
88+ // Loop over all full 1024-element chunks.
89+ let mut transposed: [ T ; 1024 ] = [ T :: default ( ) ; 1024 ] ;
90+ for ( chunk, output) in full_chunks. iter ( ) . zip ( output_deltas. iter_mut ( ) ) {
91+ Transpose :: transpose ( chunk, & mut transposed) ;
92+ bases. extend_from_slice ( & transposed[ 0 ..T :: LANES ] ) ;
93+
94+ unsafe {
95+ Delta :: delta :: < LANES > (
96+ & transposed,
97+ & * ( transposed[ 0 ..T :: LANES ] . as_ptr ( ) . cast ( ) ) ,
98+ mem:: transmute :: < & mut [ MaybeUninit < T > ; 1024 ] , & mut [ T ; 1024 ] > ( output) ,
99+ ) ;
68100 }
69101 }
70102
71- // To avoid padding, the remainder is encoded with scalar logic.
72- let remainder_size = array. len ( ) % 1024 ;
73- if remainder_size > 0 {
74- let chunk = & array[ array. len ( ) - remainder_size..] ;
75- let mut base_scalar = chunk[ 0 ] ;
76- bases. push ( base_scalar) ;
77- for next in chunk {
78- let diff = next. wrapping_sub ( & base_scalar) ;
79- deltas. push ( diff) ;
80- base_scalar = * next;
103+ // Pad the remainder to 1024 elements and process as a full chunk.
104+ if !remainder. is_empty ( ) {
105+ let mut padded_chunk = [ T :: default ( ) ; 1024 ] ;
106+ padded_chunk[ ..remainder. len ( ) ] . copy_from_slice ( remainder) ;
107+
108+ Transpose :: transpose ( & padded_chunk, & mut transposed) ;
109+ bases. extend_from_slice ( & transposed[ 0 ..T :: LANES ] ) ;
110+
111+ unsafe {
112+ Delta :: delta :: < LANES > (
113+ & transposed,
114+ & * ( transposed[ 0 ..T :: LANES ] . as_ptr ( ) . cast ( ) ) ,
115+ mem:: transmute :: < & mut [ MaybeUninit < T > ; 1024 ] , & mut [ T ; 1024 ] > (
116+ & mut output_deltas[ full_chunks. len ( ) ] ,
117+ ) ,
118+ ) ;
81119 }
82120 }
83121
84- assert_eq ! (
85- bases. len( ) ,
86- num_chunks * T :: LANES + ( if remainder_size > 0 { 1 } else { 0 } )
87- ) ;
88- assert_eq ! ( deltas. len( ) , array. len( ) ) ;
122+ unsafe { deltas. set_len ( padded_len) } ;
123+
124+ assert_eq ! ( bases. len( ) , bases_len) ;
125+ assert_eq ! ( deltas. len( ) , padded_len) ;
89126
90127 ( bases. freeze ( ) , deltas. freeze ( ) )
91128}
@@ -94,6 +131,7 @@ fn compress_primitive<T: NativePType + Delta + Transpose + WrappingSub, const LA
94131mod tests {
95132 use std:: sync:: LazyLock ;
96133
134+ use rstest:: rstest;
97135 use vortex_array:: VortexSessionExecute ;
98136 use vortex_array:: arrays:: PrimitiveArray ;
99137 use vortex_array:: assert_arrays_eq;
@@ -107,28 +145,18 @@ mod tests {
107145 static SESSION : LazyLock < VortexSession > =
108146 LazyLock :: new ( || VortexSession :: empty ( ) . with :: < ArraySession > ( ) ) ;
109147
110- #[ test]
111- fn test_compress ( ) -> VortexResult < ( ) > {
112- do_roundtrip_test ( ( 0u32 ..10_000 ) . collect ( ) )
113- }
114-
115- #[ test]
116- fn test_compress_nullable ( ) -> VortexResult < ( ) > {
117- do_roundtrip_test ( PrimitiveArray :: from_option_iter (
148+ #[ rstest]
149+ #[ case( ( 0u32 ..10_000 ) . collect( ) ) ]
150+ #[ case( ( 0 ..10_000 ) . map( |i| ( i % ( u8 :: MAX as i32 ) ) as u8 ) . collect( ) ) ]
151+ #[ case( PrimitiveArray :: from_option_iter(
118152 ( 0u32 ..10_000 ) . map( |i| ( i % 2 == 0 ) . then_some( i) ) ,
119- ) )
120- }
121-
122- #[ test]
123- fn test_compress_overflow ( ) -> VortexResult < ( ) > {
124- do_roundtrip_test ( ( 0 ..10_000 ) . map ( |i| ( i % ( u8:: MAX as i32 ) ) as u8 ) . collect ( ) )
125- }
126-
127- fn do_roundtrip_test ( input : PrimitiveArray ) -> VortexResult < ( ) > {
128- let delta = DeltaArray :: try_from_primitive_array ( & input) ?;
129- assert_eq ! ( delta. len( ) , input. len( ) ) ;
153+ ) ) ]
154+ fn test_compress ( #[ case] array : PrimitiveArray ) -> VortexResult < ( ) > {
155+ let delta =
156+ DeltaArray :: try_from_primitive_array ( & array, & mut SESSION . create_execution_ctx ( ) ) ?;
157+ assert_eq ! ( delta. len( ) , array. len( ) ) ;
130158 let decompressed = delta_decompress ( & delta, & mut SESSION . create_execution_ctx ( ) ) ?;
131- assert_arrays_eq ! ( decompressed, input ) ;
159+ assert_arrays_eq ! ( decompressed, array ) ;
132160 Ok ( ( ) )
133161 }
134162}
0 commit comments