@@ -8,70 +8,87 @@ use onpair::Offset;
88use vortex_array:: ArrayRef ;
99use vortex_array:: ExecutionCtx ;
1010use vortex_array:: IntoArray ;
11- use vortex_array:: LEGACY_SESSION ;
12- use vortex_array:: VortexSessionExecute ;
13- use vortex_array:: accessor:: ArrayAccessor ;
11+ use vortex_array:: arrays:: ConstantArray ;
1412use vortex_array:: arrays:: VarBinViewArray ;
13+ use vortex_array:: arrays:: varbinview:: BinaryView ;
1514use vortex_array:: buffer:: BufferHandle ;
16- use vortex_array:: dtype:: DType ;
17- use vortex_array:: dtype:: Nullability ;
1815use vortex_array:: validity:: Validity ;
16+ use vortex_buffer:: Alignment ;
1917use vortex_buffer:: Buffer ;
2018use vortex_buffer:: BufferMut ;
2119use vortex_buffer:: ByteBuffer ;
20+ use vortex_buffer:: ByteBufferMut ;
2221use vortex_error:: VortexExpect ;
2322use vortex_error:: VortexResult ;
2423use vortex_error:: vortex_err;
24+ use vortex_mask:: AllOr ;
2525
2626use crate :: OnPair ;
2727use crate :: OnPairArray ;
2828
2929/// Default OnPair training configuration: 12-bit codes ("dict-12").
3030pub const DEFAULT_DICT12_CONFIG : Config = onpair:: DEFAULT_CONFIG ;
3131
32- /// Compress an iterable of optional byte strings via the OnPair encoder.
33- pub fn onpair_compress_iter < ' a , I > (
34- iter : I ,
35- len : usize ,
36- dtype : DType ,
37- config : Config ,
38- ) -> VortexResult < OnPairArray >
39- where
40- I : Iterator < Item = Option < & ' a [ u8 ] > > ,
41- {
42- onpair_compress_iter_with_offsets :: < u64 , _ > ( iter, len, dtype, config)
43- }
44-
45- fn onpair_compress_iter_with_offsets < ' a , O , I > (
46- iter : I ,
47- len : usize ,
48- dtype : DType ,
32+ fn onpair_compress_varbinview < O > (
33+ array : VarBinViewArray ,
4934 config : Config ,
35+ ctx : & mut ExecutionCtx ,
5036) -> VortexResult < OnPairArray >
5137where
5238 O : Offset ,
53- I : Iterator < Item = Option < & ' a [ u8 ] > > ,
5439{
40+ let len = array. len ( ) ;
41+ let mask = array. validity ( ) ?. execute_mask ( len, ctx) ?;
42+ if mask. all_false ( ) {
43+ return OnPair :: try_new (
44+ array. dtype ( ) . clone ( ) ,
45+ BufferHandle :: new_host ( ByteBuffer :: empty ( ) ) ,
46+ ConstantArray :: new ( 0 , len) . into_array ( ) ,
47+ ConstantArray :: new ( 0u16 , len) . into_array ( ) ,
48+ ConstantArray :: new ( 0u32 , len + 1 ) . into_array ( ) ,
49+ ConstantArray :: new ( 0i32 , len) . into_array ( ) ,
50+ Validity :: AllInvalid ,
51+ 9 ,
52+ ) ;
53+ }
54+
5555 let mut flat: Vec < u8 > = Vec :: with_capacity ( len * 16 ) ;
5656 let mut offsets: Vec < O > = Vec :: with_capacity ( len + 1 ) ;
5757 let mut uncompressed_lengths: BufferMut < i32 > = BufferMut :: with_capacity ( len) ;
58- let mut validity_bits: Vec < bool > = Vec :: with_capacity ( len) ;
59- offsets. push ( <O as Offset >:: from_usize ( 0 ) ) ;
58+ offsets. push ( O :: from_usize ( 0 ) ) ;
59+ let views = array. views ( ) ;
60+ let buffers = array
61+ . data_buffers ( )
62+ . as_ref ( )
63+ . iter ( )
64+ . map ( |b| b. as_host ( ) )
65+ . collect :: < Vec < _ > > ( ) ;
6066
61- for item in iter {
62- match item {
63- Some ( bytes) => {
67+ match mask. bit_buffer ( ) {
68+ AllOr :: All => {
69+ for view in views {
70+ let bytes = view_bytes ( view, & buffers) ;
6471 flat. extend_from_slice ( bytes) ;
65- offsets. push ( <O as Offset >:: from_usize ( flat. len ( ) ) ) ;
66- uncompressed_lengths. push (
67- i32:: try_from ( bytes. len ( ) ) . vortex_expect ( "string length must fit in i32" ) ,
68- ) ;
69- validity_bits. push ( true ) ;
72+ offsets. push ( O :: from_usize ( flat. len ( ) ) ) ;
73+ uncompressed_lengths
74+ . push ( i32:: try_from ( view. len ( ) ) . vortex_expect ( "must fit in i32" ) ) ;
7075 }
71- None => {
72- offsets. push ( <O as Offset >:: from_usize ( flat. len ( ) ) ) ;
73- uncompressed_lengths. push ( 0 ) ;
74- validity_bits. push ( false ) ;
76+ }
77+ AllOr :: None => {
78+ unreachable ! ( "all_false() should have been caught earlier" ) ;
79+ }
80+ AllOr :: Some ( validity) => {
81+ for ( view, valid) in views. iter ( ) . zip ( validity. iter ( ) ) {
82+ if valid {
83+ let bytes = view_bytes ( view, & buffers) ;
84+ flat. extend_from_slice ( bytes) ;
85+ offsets. push ( O :: from_usize ( flat. len ( ) ) ) ;
86+ uncompressed_lengths
87+ . push ( i32:: try_from ( view. len ( ) ) . vortex_expect ( "must fit in i32" ) ) ;
88+ } else {
89+ offsets. push ( O :: from_usize ( flat. len ( ) ) ) ;
90+ uncompressed_lengths. push ( 0 ) ;
91+ }
7592 }
7693 }
7794 }
@@ -80,44 +97,53 @@ where
8097 . map_err ( |e| vortex_err ! ( "OnPair compress failed: {e}" ) ) ?;
8198 let bits = column. bits ;
8299 let dict_bytes = dict_bytes_to_buffer ( column. dict_bytes ) ;
83- let codes_offsets = build_codes_offsets ( & column. codes , & column. dict_offsets , & offsets) ?;
100+ let codes_offsets =
101+ build_codes_offsets ( & column. codes , & column. dict_offsets , & offsets) ?. into_array ( ) ;
84102 let codes = Buffer :: from ( column. codes ) . into_array ( ) ;
85103 let dict_offsets = Buffer :: from ( column. dict_offsets ) . into_array ( ) ;
86- let codes_offsets = Buffer :: from ( codes_offsets) . into_array ( ) ;
87104
88105 let uncompressed_lengths = uncompressed_lengths. into_array ( ) ;
89- let validity = match dtype. nullability ( ) {
90- Nullability :: NonNullable => Validity :: NonNullable ,
91- Nullability :: Nullable => Validity :: from_iter ( validity_bits) ,
92- } ;
93106
94107 OnPair :: try_new (
95- dtype,
108+ array . dtype ( ) . clone ( ) ,
96109 dict_bytes,
97110 dict_offsets,
98111 codes,
99112 codes_offsets,
100113 uncompressed_lengths,
101- validity,
114+ array . validity ( ) ? ,
102115 bits,
103116 )
104117}
105118
119+ fn view_bytes < ' a > ( view : & ' a BinaryView , buffers : & ' a [ & ByteBuffer ] ) -> & ' a [ u8 ] {
120+ if view. is_inlined ( ) {
121+ view. as_inlined ( ) . value ( )
122+ } else {
123+ let view_ref = view. as_view ( ) ;
124+ & buffers[ view_ref. buffer_index as usize ] [ view_ref. as_range ( ) ]
125+ }
126+ }
127+
106128/// Lift compressed dictionary bytes into the Vortex buffer slot.
107129fn dict_bytes_to_buffer ( dict_bytes : Vec < u8 > ) -> BufferHandle {
108130 // Pad the dictionary blob with MAX_TOKEN_SIZE zero bytes so the
109131 // over-copy decoder can issue a fixed 16-byte load for every token
110132 // without risking an OOB read on the last entry.
111- let mut padded = Vec :: with_capacity ( dict_bytes. len ( ) + onpair:: MAX_TOKEN_SIZE ) ;
112- padded. extend_from_slice ( & dict_bytes) ;
113- padded. resize ( dict_bytes. len ( ) + onpair:: MAX_TOKEN_SIZE , 0 ) ;
133+ //
114134 // Align dict_bytes to 8 bytes so the segment that ultimately holds the
115135 // OnPair tree starts at an 8-aligned in-memory address. Without this
116136 // anchor, the per-buffer padding the serializer inserts is only
117137 // *relative* to the segment start; if the segment lands at a u8-aligned
118138 // heap address, downstream `PrimitiveArray<u32>::deserialize` panics
119139 // with `Misaligned buffer cannot be used to build PrimitiveArray of u32`.
120- BufferHandle :: new_host ( ByteBuffer :: from ( padded) . aligned ( vortex_buffer:: Alignment :: new ( 8 ) ) )
140+ let mut padded = ByteBufferMut :: with_capacity_aligned (
141+ dict_bytes. len ( ) + onpair:: MAX_TOKEN_SIZE ,
142+ Alignment :: new ( 8 ) ,
143+ ) ;
144+ padded. extend_from_slice ( & dict_bytes) ;
145+ unsafe { padded. push_n_unchecked ( 0 , dict_bytes. len ( ) + onpair:: MAX_TOKEN_SIZE - padded. len ( ) ) } ;
146+ BufferHandle :: new_host ( padded. freeze ( ) )
121147}
122148
123149/// Reconstruct the per-row `codes_offsets` from the flat `codes`, the
@@ -128,9 +154,9 @@ fn build_codes_offsets<O: Offset>(
128154 codes : & [ u16 ] ,
129155 dict_offsets : & [ u32 ] ,
130156 row_byte_offsets : & [ O ] ,
131- ) -> VortexResult < Vec < u32 > > {
157+ ) -> VortexResult < Buffer < u32 > > {
132158 let nrows = row_byte_offsets. len ( ) - 1 ;
133- let mut codes_offsets = Vec :: with_capacity ( nrows + 1 ) ;
159+ let mut codes_offsets = BufferMut :: with_capacity ( nrows + 1 ) ;
134160 codes_offsets. push ( 0u32 ) ;
135161 let mut decoded_bytes: u64 = 0 ;
136162 let mut code_idx: usize = 0 ;
@@ -149,38 +175,16 @@ fn build_codes_offsets<O: Offset>(
149175 . map_err ( |_| vortex_err ! ( "OnPair: code boundary {code_idx} does not fit u32" ) ) ?,
150176 ) ;
151177 }
152- Ok ( codes_offsets)
153- }
154-
155- /// Compress a byte-string accessor (typically a `VarBinArray` or
156- /// `VarBinViewArray`).
157- pub fn onpair_compress < A : ArrayAccessor < [ u8 ] > > (
158- array : A ,
159- len : usize ,
160- dtype : & DType ,
161- config : Config ,
162- ) -> VortexResult < OnPairArray > {
163- array. with_iterator ( |iter| onpair_compress_iter ( iter, len, dtype. clone ( ) , config) )
178+ Ok ( codes_offsets. freeze ( ) )
164179}
165180
166181/// Compress any [`ArrayRef`] whose canonical form is a string array, by first
167182/// canonicalising to `VarBinViewArray`.
168- pub fn onpair_compress_array (
183+ pub fn onpair_compress (
169184 array : & ArrayRef ,
170185 config : Config ,
171186 ctx : & mut ExecutionCtx ,
172187) -> VortexResult < OnPairArray > {
173188 let view = array. clone ( ) . execute :: < VarBinViewArray > ( ctx) ?;
174- let len = view. len ( ) ;
175- let dtype = view. dtype ( ) . clone ( ) ;
176- onpair_compress ( & view, len, & dtype, config)
177- }
178-
179- /// Convenience: build a default `ExecutionCtx` from `LEGACY_SESSION`.
180- pub fn onpair_compress_array_default (
181- array : & ArrayRef ,
182- config : Config ,
183- ) -> VortexResult < OnPairArray > {
184- let mut ctx = LEGACY_SESSION . create_execution_ctx ( ) ;
185- onpair_compress_array ( array, config, & mut ctx)
189+ onpair_compress_varbinview :: < u64 > ( view, config, ctx)
186190}
0 commit comments