11// SPDX-License-Identifier: Apache-2.0
22// SPDX-FileCopyrightText: Copyright the Vortex contributors
33
4- #![ expect( clippy:: unwrap_used) ]
4+ #![ expect(
5+ clippy:: unwrap_used,
6+ clippy:: expect_used,
7+ clippy:: cast_possible_truncation
8+ ) ]
59
610//! Row-encoding an FSST-compressed string column: the only realizable strategy is
711//! "unpack then convert" (decompress FSST to a canonical `VarBinView`, then row-encode it),
@@ -29,9 +33,17 @@ use vortex_array::Canonical;
2933use vortex_array:: IntoArray ;
3034use vortex_array:: LEGACY_SESSION ;
3135use vortex_array:: VortexSessionExecute ;
36+ use vortex_array:: arrays:: ListViewArray ;
37+ use vortex_array:: arrays:: PrimitiveArray ;
3238use vortex_array:: arrays:: VarBinArray ;
3339use vortex_array:: dtype:: DType ;
3440use vortex_array:: dtype:: Nullability ;
41+ use vortex_array:: match_each_integer_ptype;
42+ use vortex_array:: validity:: Validity ;
43+ use vortex_buffer:: Buffer ;
44+ use vortex_buffer:: ByteBufferMut ;
45+ use vortex_fsst:: FSST ;
46+ use vortex_fsst:: FSSTArrayExt ;
3547use vortex_fsst:: fsst_compress;
3648use vortex_fsst:: fsst_train_compressor;
3749use vortex_row:: RowEncoder ;
@@ -80,6 +92,117 @@ fn decompress(fsst: &ArrayRef) -> ArrayRef {
8092 . into_array ( )
8193}
8294
95+ const VARLEN_BLOCK : usize = 32 ;
96+ const VARLEN_BLOCK_TOTAL : usize = 33 ;
97+ // Sentinel for a non-empty varlen value (ascending, non-null) — value is irrelevant to timing.
98+ const NON_EMPTY_SENTINEL : u8 = 0x02 ;
99+
100+ /// Encoded row-key length for a non-empty value of `len` decompressed bytes: a leading
101+ /// sentinel plus `ceil(len/32)` 32-byte blocks, each followed by a continuation/length byte.
102+ fn encoded_len ( len : usize ) -> u32 {
103+ if len == 0 {
104+ 1
105+ } else {
106+ 1 + ( len. div_ceil ( VARLEN_BLOCK ) as u32 ) * VARLEN_BLOCK_TOTAL as u32
107+ }
108+ }
109+
110+ /// Block-encode `bytes` (ascending) into `out`, matching vortex-row's varlen body format.
111+ fn block_encode ( bytes : & [ u8 ] , out : & mut [ u8 ] ) {
112+ let len = bytes. len ( ) ;
113+ let full = len / VARLEN_BLOCK ;
114+ let partial = len % VARLEN_BLOCK ;
115+ let ( full_to_write, partial_len) = if partial == 0 {
116+ ( full - 1 , VARLEN_BLOCK )
117+ } else {
118+ ( full, partial)
119+ } ;
120+ let mut src = 0 ;
121+ let mut dst = 0 ;
122+ for _ in 0 ..full_to_write {
123+ out[ dst..dst + VARLEN_BLOCK ] . copy_from_slice ( & bytes[ src..src + VARLEN_BLOCK ] ) ;
124+ out[ dst + VARLEN_BLOCK ] = 0xFF ;
125+ src += VARLEN_BLOCK ;
126+ dst += VARLEN_BLOCK_TOTAL ;
127+ }
128+ out[ dst..dst + partial_len] . copy_from_slice ( & bytes[ src..src + partial_len] ) ;
129+ for b in & mut out[ dst + partial_len..dst + VARLEN_BLOCK ] {
130+ * b = 0 ;
131+ }
132+ out[ dst + VARLEN_BLOCK ] = partial_len as u8 ;
133+ }
134+
135+ /// Fused FSST → row-key kernel: bulk-decompress the code heap into one contiguous buffer (no
136+ /// intermediate `VarBinViewArray`), then block-encode each row straight into the row-key
137+ /// `ListView<u8>` using the stored `uncompressed_lengths` for boundaries (no size-pass walk).
138+ fn fast_fused ( fsst : & ArrayRef ) -> ArrayRef {
139+ let mut ctx = LEGACY_SESSION . create_execution_ctx ( ) ;
140+ let view = fsst. as_opt :: < FSST > ( ) . expect ( "FSST array" ) ;
141+
142+ // Per-row decompressed lengths are already stored — the size pass is free.
143+ let lens_arr = view
144+ . uncompressed_lengths ( )
145+ . clone ( )
146+ . execute :: < PrimitiveArray > ( & mut ctx)
147+ . unwrap ( ) ;
148+ let lens: Vec < usize > = match_each_integer_ptype ! ( lens_arr. ptype( ) , |P | {
149+ lens_arr
150+ . as_slice:: <P >( )
151+ . iter( )
152+ . map( |x| * x as usize )
153+ . collect( )
154+ } ) ;
155+
156+ // Bulk-decompress the whole code heap once into a contiguous buffer (no VarBinView).
157+ let heap = view. codes_bytes ( ) ;
158+ let total: usize = lens. iter ( ) . sum ( ) ;
159+ let decompressor = view. decompressor ( ) ;
160+ let mut decompressed = ByteBufferMut :: with_capacity ( total + 7 ) ;
161+ let n = decompressor. decompress_into ( heap. as_slice ( ) , decompressed. spare_capacity_mut ( ) ) ;
162+ unsafe { decompressed. set_len ( n) } ;
163+ let bytes = decompressed. as_slice ( ) ;
164+
165+ // Size + offsets for the row-key ListView (lengths are free, no view walk).
166+ let nrows = lens. len ( ) ;
167+ let mut offsets: Vec < u32 > = Vec :: with_capacity ( nrows) ;
168+ let mut sizes: Vec < u32 > = Vec :: with_capacity ( nrows) ;
169+ let mut acc: u32 = 0 ;
170+ for & l in & lens {
171+ offsets. push ( acc) ;
172+ let sz = encoded_len ( l) ;
173+ sizes. push ( sz) ;
174+ acc += sz;
175+ }
176+
177+ // Block-encode every row directly into the elements buffer. No zero-init (every byte is
178+ // written: sentinel + block body with zero-padded final block) and no Vec→Buffer copy.
179+ let mut out = ByteBufferMut :: with_capacity ( acc as usize ) ;
180+ unsafe { out. set_len ( acc as usize ) } ;
181+ let out_slice = out. as_mut_slice ( ) ;
182+ let mut src = 0usize ;
183+ for ( i, & l) in lens. iter ( ) . enumerate ( ) {
184+ let pos = offsets[ i] as usize ;
185+ out_slice[ pos] = NON_EMPTY_SENTINEL ;
186+ if l != 0 {
187+ block_encode ( & bytes[ src..src + l] , & mut out_slice[ pos + 1 ..] ) ;
188+ }
189+ src += l;
190+ }
191+
192+ let elements = PrimitiveArray :: new ( out. freeze ( ) , Validity :: NonNullable ) ;
193+ let offsets_arr =
194+ PrimitiveArray :: new ( Buffer :: < u32 > :: copy_from ( & offsets) , Validity :: NonNullable ) ;
195+ let sizes_arr = PrimitiveArray :: new ( Buffer :: < u32 > :: copy_from ( & sizes) , Validity :: NonNullable ) ;
196+ ListViewArray :: try_new (
197+ elements. into_array ( ) ,
198+ offsets_arr. into_array ( ) ,
199+ sizes_arr. into_array ( ) ,
200+ Validity :: NonNullable ,
201+ )
202+ . unwrap ( )
203+ . into_array ( )
204+ }
205+
83206fn main ( ) {
84207 divan:: main ( ) ;
85208}
@@ -102,6 +225,16 @@ fn fsst_unpack_then_convert(bencher: divan::Bencher) {
102225 } ) ;
103226}
104227
228+ /// Fused fast path: bulk-decompress directly into the row-key block format, skipping the
229+ /// intermediate `VarBinViewArray` and the generic row-encoder (size pass is free).
230+ #[ divan:: bench]
231+ fn fsst_fast_fused ( bencher : divan:: Bencher ) {
232+ let ( fsst, total_bytes) = build_fsst ( ) ;
233+ bencher
234+ . counter ( BytesCount :: new ( total_bytes) )
235+ . bench_local ( || fast_fused ( & fsst) ) ;
236+ }
237+
105238/// Irreducible floor: FSST decompression alone (a direct kernel must still produce these
106239/// bytes, since the sort key *is* the decompressed bytes).
107240#[ divan:: bench]
0 commit comments