|
4 | 4 | #![expect( |
5 | 5 | clippy::unwrap_used, |
6 | 6 | clippy::expect_used, |
7 | | - clippy::cast_possible_truncation |
| 7 | + clippy::cast_possible_truncation, |
| 8 | + clippy::many_single_char_names |
8 | 9 | )] |
9 | 10 |
|
10 | 11 | //! Row-encoding an FSST-compressed string column: the only realizable strategy is |
@@ -36,6 +37,8 @@ use vortex_array::VortexSessionExecute; |
36 | 37 | use vortex_array::arrays::ListViewArray; |
37 | 38 | use vortex_array::arrays::PrimitiveArray; |
38 | 39 | use vortex_array::arrays::VarBinArray; |
| 40 | +use vortex_array::arrays::varbin::VarBinArrayExt; |
| 41 | +use vortex_array::assert_arrays_eq; |
39 | 42 | use vortex_array::dtype::DType; |
40 | 43 | use vortex_array::dtype::Nullability; |
41 | 44 | use vortex_array::match_each_integer_ptype; |
@@ -203,10 +206,134 @@ fn fast_fused(fsst: &ArrayRef) -> ArrayRef { |
203 | 206 | .into_array() |
204 | 207 | } |
205 | 208 |
|
| 209 | +/// "Scatter right": keep FSST's fast contiguous bulk decompressor, but run it into a |
| 210 | +/// cache-resident scratch one row-batch at a time, then scatter each row into block form from |
| 211 | +/// cache. The decompressed bytes never round-trip through main memory — unlike `fast_fused`, |
| 212 | +/// which materializes the whole 6.4 MB decompressed buffer and reads it back to block-encode. |
| 213 | +fn fast_scatter(fsst: &ArrayRef) -> ArrayRef { |
| 214 | + // Scratch sized to stay resident in L1/L2; each batch decompresses up to this many bytes. |
| 215 | + const SCRATCH: usize = 16 * 1024; |
| 216 | + |
| 217 | + let mut ctx = LEGACY_SESSION.create_execution_ctx(); |
| 218 | + let view = fsst.as_opt::<FSST>().expect("FSST array"); |
| 219 | + |
| 220 | + let lens_arr = view |
| 221 | + .uncompressed_lengths() |
| 222 | + .clone() |
| 223 | + .execute::<PrimitiveArray>(&mut ctx) |
| 224 | + .unwrap(); |
| 225 | + let lens: Vec<usize> = match_each_integer_ptype!(lens_arr.ptype(), |P| { |
| 226 | + lens_arr |
| 227 | + .as_slice::<P>() |
| 228 | + .iter() |
| 229 | + .map(|x| *x as usize) |
| 230 | + .collect() |
| 231 | + }); |
| 232 | + let nrows = lens.len(); |
| 233 | + |
| 234 | + // Per-row compressed code offsets (relative to the sliced heap start). |
| 235 | + let codes = view.codes(); |
| 236 | + let heap = codes.sliced_bytes(); |
| 237 | + let code_off_arr = codes |
| 238 | + .offsets() |
| 239 | + .clone() |
| 240 | + .execute::<PrimitiveArray>(&mut ctx) |
| 241 | + .unwrap(); |
| 242 | + let base = match_each_integer_ptype!(code_off_arr.ptype(), |P| { |
| 243 | + code_off_arr.as_slice::<P>()[0] as usize |
| 244 | + }); |
| 245 | + let code_off: Vec<usize> = match_each_integer_ptype!(code_off_arr.ptype(), |P| { |
| 246 | + code_off_arr |
| 247 | + .as_slice::<P>() |
| 248 | + .iter() |
| 249 | + .map(|x| *x as usize - base) |
| 250 | + .collect() |
| 251 | + }); |
| 252 | + |
| 253 | + // Output sizing (free from stored lengths). |
| 254 | + let mut offsets: Vec<u32> = Vec::with_capacity(nrows); |
| 255 | + let mut sizes: Vec<u32> = Vec::with_capacity(nrows); |
| 256 | + let mut acc: u32 = 0; |
| 257 | + let mut max_row = 0usize; |
| 258 | + for &l in &lens { |
| 259 | + offsets.push(acc); |
| 260 | + let sz = encoded_len(l); |
| 261 | + sizes.push(sz); |
| 262 | + acc += sz; |
| 263 | + max_row = max_row.max(l); |
| 264 | + } |
| 265 | + let mut out = ByteBufferMut::with_capacity(acc as usize); |
| 266 | + unsafe { out.set_len(acc as usize) }; |
| 267 | + let out_slice = out.as_mut_slice(); |
| 268 | + |
| 269 | + let decompressor = view.decompressor(); |
| 270 | + let scratch_cap = SCRATCH.max(max_row) + 8; |
| 271 | + let mut scratch = ByteBufferMut::with_capacity(scratch_cap); |
| 272 | + |
| 273 | + let mut r = 0usize; |
| 274 | + while r < nrows { |
| 275 | + // Grow a batch until it would overflow the scratch (always at least one row). |
| 276 | + let bs = r; |
| 277 | + let mut batch_bytes = 0usize; |
| 278 | + while r < nrows && (r == bs || batch_bytes + lens[r] <= SCRATCH) { |
| 279 | + batch_bytes += lens[r]; |
| 280 | + r += 1; |
| 281 | + } |
| 282 | + let be = r; |
| 283 | + |
| 284 | + // Decompress this batch's codes in one fast call into the cache-resident scratch. |
| 285 | + let cslice = &heap.as_slice()[code_off[bs]..code_off[be]]; |
| 286 | + let n = decompressor.decompress_into(cslice, scratch.spare_capacity_mut()); |
| 287 | + unsafe { scratch.set_len(n) }; |
| 288 | + let sbytes = scratch.as_slice(); |
| 289 | + |
| 290 | + // Scatter each row from cache into block form. |
| 291 | + let mut local = 0usize; |
| 292 | + for i in bs..be { |
| 293 | + let l = lens[i]; |
| 294 | + let pos = offsets[i] as usize; |
| 295 | + out_slice[pos] = NON_EMPTY_SENTINEL; |
| 296 | + if l != 0 { |
| 297 | + block_encode(&sbytes[local..local + l], &mut out_slice[pos + 1..]); |
| 298 | + } |
| 299 | + local += l; |
| 300 | + } |
| 301 | + unsafe { scratch.set_len(0) }; |
| 302 | + } |
| 303 | + |
| 304 | + let elements = PrimitiveArray::new(out.freeze(), Validity::NonNullable); |
| 305 | + let offsets_arr = |
| 306 | + PrimitiveArray::new(Buffer::<u32>::copy_from(&offsets), Validity::NonNullable); |
| 307 | + let sizes_arr = PrimitiveArray::new(Buffer::<u32>::copy_from(&sizes), Validity::NonNullable); |
| 308 | + ListViewArray::try_new( |
| 309 | + elements.into_array(), |
| 310 | + offsets_arr.into_array(), |
| 311 | + sizes_arr.into_array(), |
| 312 | + Validity::NonNullable, |
| 313 | + ) |
| 314 | + .unwrap() |
| 315 | + .into_array() |
| 316 | +} |
| 317 | + |
206 | 318 | fn main() { |
| 319 | + // Correctness: the batched cache-resident scatter must produce identical row keys to the |
| 320 | + // straightforward fused path. |
| 321 | + { |
| 322 | + let (fsst, _) = build_fsst(); |
| 323 | + assert_arrays_eq!(fast_scatter(&fsst), fast_fused(&fsst)); |
| 324 | + } |
207 | 325 | divan::main(); |
208 | 326 | } |
209 | 327 |
|
| 328 | +/// "Scatter right" fused path: cache-resident batched decompress + scatter into block form. |
| 329 | +#[divan::bench] |
| 330 | +fn fsst_fast_scatter(bencher: divan::Bencher) { |
| 331 | + let (fsst, total_bytes) = build_fsst(); |
| 332 | + bencher |
| 333 | + .counter(BytesCount::new(total_bytes)) |
| 334 | + .bench_local(|| fast_scatter(&fsst)); |
| 335 | +} |
| 336 | + |
210 | 337 | /// Status quo: decompress FSST to a canonical `VarBinView`, then row-encode it. |
211 | 338 | #[divan::bench] |
212 | 339 | fn fsst_unpack_then_convert(bencher: divan::Bencher) { |
|
0 commit comments