From 680cbdabcf99c9781ec24164f4047ae552654a89 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Thu, 30 Apr 2026 15:36:06 +0200 Subject: [PATCH] feat[vortex-cuda]: GPU FSST decompression kernel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit implements on-GPU decompression of the existing FSST encoding. This kernel achieves ~42% max throughput utilization as compared to the `throughput_cuda` benchmark on a DGX spark. CPU work is required to compute the output offsets. The core performance win is buffering up to 24 bytes of decompressed data in three u64 registers and emitting the widest aligned stores possible up to u128 (st.global.v2.u64). The 256-entry symbol table (≤ 2 KB) is read directly from global memory. Staging it into shared memory measured ~3% slower at 10M rows and ~15% slower at 1M rows. The hypothesis is that L1 already holds the table after a few iterations and the explicit shared copy adds bank-conflict latency on the warp-divergent symbols[code] reads; the gap is wider at 1M because the kernel is less bandwidth-bound there. Further optimizations would require an encoding change. Splits-style intra-string parallelism (one GPU thread per ~32-compressed-byte chunk instead of per-string) was prototyped on top of this kernel and measured an additional +30% kernel throughput at 1M clickbench URLs, +26% at 5M, +12% at 10M. Four kernel variants are generated for the unsigned widths of codes_offsets (u8/u16/u32/u64); signed integer ptypes are reinterpreted as their unsigned equivalent on the Rust side, so the bit pattern is preserved without copying. Signed-off-by: Alfonso Subiotto Marques --- .github/workflows/codspeed.yml | 2 +- Cargo.lock | 1 + vortex-cuda/Cargo.toml | 5 + vortex-cuda/benches/fsst_cuda.rs | 95 ++++++++ vortex-cuda/kernels/src/fsst.cu | 226 ++++++++++++++++++ vortex-cuda/src/kernel/encodings/fsst.rs | 285 +++++++++++++++++++++++ vortex-cuda/src/kernel/encodings/mod.rs | 2 + vortex-cuda/src/lib.rs | 3 + 8 files changed, 618 insertions(+), 1 deletion(-) create mode 100644 vortex-cuda/benches/fsst_cuda.rs create mode 100644 vortex-cuda/kernels/src/fsst.cu create mode 100644 vortex-cuda/src/kernel/encodings/fsst.rs diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml index ff42ff589b9..8f3e62c44e7 100644 --- a/.github/workflows/codspeed.yml +++ b/.github/workflows/codspeed.yml @@ -73,7 +73,7 @@ jobs: include: - { shard: 1, name: "Bitpacked", benches: "bitpacked_cuda" } - { shard: 2, name: "Dynamic dispatch", benches: "dynamic_dispatch_cuda" } - - { shard: 3, name: "Standalone kernels", benches: "alp_cuda date_time_parts_cuda dict_cuda runend_cuda" } + - { shard: 3, name: "Standalone kernels", benches: "alp_cuda date_time_parts_cuda dict_cuda fsst_cuda runend_cuda" } name: "Benchmark with Codspeed (CUDA Shard #${{ matrix.shard }} - ${{ matrix.name }})" timeout-minutes: 30 runs-on: runs-on=${{ github.run_id }}/family=g5/image=ubuntu24-gpu-x64/tag=bench-codspeed-cuda-${{ matrix.shard }} diff --git a/Cargo.lock b/Cargo.lock index 85daffdaa83..137d6234e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10601,6 +10601,7 @@ dependencies = [ "vortex-cuda", "vortex-cuda-macros", "vortex-error", + "vortex-fsst", "vortex-nvcomp", ] diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 9065a3f1eb4..832e449519f 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -50,6 +50,7 @@ rstest = { workspace = true } tokio = { workspace = true, features = ["rt", "macros"] } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-cuda = { path = ".", features = ["_test-harness"] } +vortex-fsst = { workspace = true, features = ["_test-harness"] } [build-dependencies] bindgen = { workspace = true } @@ -98,3 +99,7 @@ harness = false [[bench]] name = "load_to_device_cuda" harness = false + +[[bench]] +name = "fsst_cuda" +harness = false diff --git a/vortex-cuda/benches/fsst_cuda.rs b/vortex-cuda/benches/fsst_cuda.rs new file mode 100644 index 00000000000..c7aeed28151 --- /dev/null +++ b/vortex-cuda/benches/fsst_cuda.rs @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA benchmarks for FSST decompression. + +#![expect(clippy::unwrap_used)] +#![expect(clippy::cast_possible_truncation)] + +#[allow(dead_code)] +mod bench_config; +mod timed_launch_strategy; + +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::Throughput; +use futures::executor::block_on; +use vortex::array::IntoArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::match_each_integer_ptype; +use vortex::encodings::fsst::FSSTArrayExt; +use vortex::error::VortexExpect; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::executor::CudaArrayExt; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; +use vortex_fsst::test_utils::make_fsst_clickbench_urls; + +use crate::timed_launch_strategy::TimedLaunchStrategy; + +// Bench-local size instead of the workspace 100M default: each input is a +// clickbench URL, much heavier per-element than the fixed-width primitives +// other kernels benchmark. +const BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")]; + +fn benchmark_fsst_cuda_decompress(c: &mut Criterion) { + let mut group = c.benchmark_group("cuda"); + + for &(n, len_str) in BENCH_SIZES { + let mut setup_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + let fsst = make_fsst_clickbench_urls(n, setup_ctx.execution_ctx()); + + let lens = fsst + .uncompressed_lengths() + .clone() + .execute::(setup_ctx.execution_ctx()) + .vortex_expect("canonicalize uncompressed_lengths"); + let total_size: usize = match_each_integer_ptype!(lens.ptype(), |P| { + lens.as_slice::

().iter().map(|x| *x as usize).sum() + }); + let uncompressed_size = total_size as u64; + + let fsst_array = fsst.into_array(); + + group.throughput(Throughput::Bytes(uncompressed_size)); + group.bench_with_input( + BenchmarkId::new("cuda/fsst/decompress", len_str), + &fsst_array, + |b, fsst_array| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = timed.timer(); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + + for _ in 0..iters { + block_on(fsst_array.clone().execute_cuda(&mut cuda_ctx)).unwrap(); + } + Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + group.finish(); +} + +criterion::criterion_group! { + name = benches; + config = bench_config::cuda_bench_config(); + targets = benchmark_fsst_cuda_decompress +} + +#[cuda_available] +criterion::criterion_main!(benches); + +#[cuda_not_available] +fn main() {} diff --git a/vortex-cuda/kernels/src/fsst.cu b/vortex-cuda/kernels/src/fsst.cu new file mode 100644 index 00000000000..938f262c3da --- /dev/null +++ b/vortex-cuda/kernels/src/fsst.cu @@ -0,0 +1,226 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "config.cuh" +#include +#include +#include + +// FSST decompression. A thread decodes one string at a time. +// +// Per-thread `Scratch` holds 24 bytes across three u64 lanes (`low`, `mid`, +// `high`) plus a `cursor` byte counter. Byte i lives at bit (8 * (i mod 8)) +// of: +// low for i in 0..8 +// mid for i in 8..16 +// high for i in 16..24 +// +// lsb msb +// low: [ b0 | b1 | b2 | b3 | b4 | b5 | b6 | b7 ] +// mid: [ b8 | b9 |b10 |b11 |b12 |b13 |b14 |b15 ] +// high: [b16 |b17 |b18 |b19 |b20 |b21 |b22 |b23 ] +// +// `Scratch::drain` picks the largest aligned store the gates allow +// (alignment of out_pos, cursor, remaining out_end room). Bytes leave from +// the low end (`low` byte 0); the kept bytes slide N positions toward that +// low end across all three lanes i.e. each u64 right-shifts by N*8 and +// pulls the next lane's low bits up to fill the vacated high bits. +// `Scratch::push` inserts a length-`len` masked symbol at byte offset +// `cursor`, spanning at most two of the three lanes. +// +// width gate ptx +// ------ ------------------------------------------ ---------------- +// 16 B out_pos % 16 == 0, cursor ≥ 16, room ≥ 16 st.global.v2.u64 +// 8 B out_pos % 8 == 0, cursor ≥ 8, room ≥ 8 st.global.u64 +// 4 B out_pos % 4 == 0, cursor ≥ 4, room ≥ 4 st.global.u32 +// 2 B out_pos % 2 == 0, cursor ≥ 2, room ≥ 2 st.global.u16 +// 1 B (always) st.global.u8 +// +// The narrow widths cover the prologue alignment-up (out_pos not yet +// 16-aligned) and the epilogue tail (< 16 bytes left, no room for u128). +// In steady state out_pos stays 16-aligned and u128 fires repeatedly. +// +// The 256-entry symbol table (≤ 2 KB) is read directly from global memory. +// Staging it into shared memory measured ~3% slower at 10M rows and ~15% +// slower at 1M rows (benchmarked on clickbench URLs). The hypothesis is that L1 +// already holds the table after a few iterations and the explicit shared copy +// adds bank-conflict latency on the warp-divergent `symbols[code]` reads; the +// gap is wider at 1M because the kernel is less bandwidth-bound there, so +// per-load latency shows up more. +// +// Decoded symbols are masked to their valid byte length so the table's high +// bits never leak. The main loop drains to `scratch.cursor ≤ 16`, keeping +// the next add (≤ 8 bytes) within the 24-byte capacity. +// +// `codes_offsets` is templated over the four unsigned integer widths +// (u8/u16/u32/u64). `output_offsets` is uint64_t. + +// 24-byte scratch buffer split across three u64 lanes. `cursor` is the +// number of bytes currently buffered and the next-push offset. +struct Scratch { + uint64_t low = 0; + uint64_t mid = 0; + uint64_t high = 0; + uint32_t cursor = 0; + + // Insert a length-`len` masked symbol at byte offset `cursor`. The + // symbol spans at most two of the three lanes. Caller must ensure + // cursor + len ≤ 24. + __device__ inline void push(uint64_t sym, uint32_t len) { + if (cursor < 8) { + low |= sym << (8u * cursor); + if (cursor + len > 8) { + mid |= sym >> (8u * (8u - cursor)); + } + } else { + mid |= sym << (8u * (cursor - 8u)); + if (cursor + len > 16) { + high |= sym >> (8u * (16u - cursor)); + } + } + cursor += len; + } + + // Emit one variable-width aligned store from the low end and slide the + // kept bytes toward the low end across all three lanes. + __device__ inline void drain(uint8_t *__restrict out, uint64_t &out_pos, uint64_t out_end) { + if (cursor >= 16 && (out_pos & 15u) == 0 && out_pos + 16 <= out_end) { + *reinterpret_cast(out + out_pos) = make_ulonglong2(low, mid); + low = high; + mid = 0; + high = 0; + out_pos += 16; + cursor -= 16; + } else if (cursor >= 8 && (out_pos & 7u) == 0 && out_pos + 8 <= out_end) { + *reinterpret_cast(out + out_pos) = low; + low = mid; + mid = high; + high = 0; + out_pos += 8; + cursor -= 8; + } else if (cursor >= 4 && (out_pos & 3u) == 0 && out_pos + 4 <= out_end) { + *reinterpret_cast(out + out_pos) = (uint32_t)low; + low = (low >> 32) | (mid << 32); + mid = (mid >> 32) | (high << 32); + high >>= 32; + out_pos += 4; + cursor -= 4; + } else if (cursor >= 2 && (out_pos & 1u) == 0 && out_pos + 2 <= out_end) { + *reinterpret_cast(out + out_pos) = (uint16_t)low; + low = (low >> 16) | (mid << 48); + mid = (mid >> 16) | (high << 48); + high >>= 16; + out_pos += 2; + cursor -= 2; + } else { + out[out_pos] = (uint8_t)low; + low = (low >> 8) | (mid << 56); + mid = (mid >> 8) | (high << 56); + high >>= 8; + out_pos += 1; + cursor -= 1; + } + } +}; + +template +struct FSSTArgs { + // Compressed FSST code stream, contiguous across all strings. String + // `sid`'s codes live in `[codes_offsets[sid], codes_offsets[sid + 1])`. + const uint8_t *__restrict codes_bytes; + // Per-string offsets into `codes_bytes`, length `num_strings + 1`. + const OffT *__restrict codes_offsets; + // FSST symbol table. + const uint64_t *__restrict symbols; + // Length in bytes (1..=8) of each entry in `symbols`. The remaining bits + // are unspecified. + const uint8_t *__restrict symbol_lengths; + // Buffer to write decoded data into. + uint8_t *__restrict output_bytes; + // Per-string offsets into `output_bytes`, length `num_strings + 1`. + const uint64_t *__restrict output_offsets; + // Validity of each string. + const uint8_t *__restrict validity_bits; +}; + +template +__device__ inline void fsst_decode_string(const FSSTArgs &args, uint64_t sid) { + if (((args.validity_bits[sid >> 3] >> (sid & 7u)) & 1u) == 0u) { + return; + } + + OffT in_pos = args.codes_offsets[sid]; + const OffT in_end = args.codes_offsets[sid + 1]; + uint64_t out_pos = args.output_offsets[sid]; + const uint64_t out_end = args.output_offsets[sid + 1]; + + Scratch scratch; + + while (in_pos < in_end) { + // Drain to scratch.cursor ≤ 16 so the next ≤8-byte symbol fits in 24. + while (scratch.cursor > 16) { + scratch.drain(args.output_bytes, out_pos, out_end); + } + + // Decode next code. 255 is the escape for raw literal bytes. + const uint8_t code = args.codes_bytes[in_pos]; + uint64_t sym; + uint32_t len, consumed; + if (code == 255) { + sym = (uint64_t)args.codes_bytes[in_pos + 1]; + len = 1; + consumed = 2; + } else { + sym = args.symbols[code]; + len = args.symbol_lengths[code]; + consumed = 1; + } + + // Zero out the symbol's high bytes beyond its valid length. + const uint64_t mask = (len == 8) ? ~0ULL : ((1ULL << (8u * len)) - 1ULL); + sym &= mask; + + scratch.push(sym, len); + in_pos += (OffT)consumed; + } + + // Epilogue: drain everything that's left. + while (scratch.cursor > 0) { + scratch.drain(args.output_bytes, out_pos, out_end); + } +} + +#define GENERATE_FSST_KERNEL(suffix, OffT) \ + extern "C" __global__ void fsst_##suffix(const uint8_t *__restrict codes_bytes, \ + const OffT *__restrict codes_offsets, \ + const uint64_t *__restrict symbols, \ + const uint8_t *__restrict symbol_lengths, \ + const uint64_t *__restrict output_offsets, \ + const uint8_t *__restrict validity_bits, \ + uint8_t *__restrict output_bytes, \ + uint64_t num_strings) { \ + const FSSTArgs args = { \ + codes_bytes, \ + codes_offsets, \ + symbols, \ + symbol_lengths, \ + output_bytes, \ + output_offsets, \ + validity_bits, \ + }; \ + \ + const uint64_t elements_per_block = (uint64_t)blockDim.x * ELEMENTS_PER_THREAD; \ + const uint64_t block_start = (uint64_t)blockIdx.x * elements_per_block; \ + const uint64_t block_end = (block_start + elements_per_block < num_strings) \ + ? (block_start + elements_per_block) \ + : num_strings; \ + \ + for (uint64_t sid = block_start + threadIdx.x; sid < block_end; sid += blockDim.x) { \ + fsst_decode_string(args, sid); \ + } \ + } + +GENERATE_FSST_KERNEL(u8, uint8_t) +GENERATE_FSST_KERNEL(u16, uint16_t) +GENERATE_FSST_KERNEL(u32, uint32_t) +GENERATE_FSST_KERNEL(u64, uint64_t) diff --git a/vortex-cuda/src/kernel/encodings/fsst.rs b/vortex-cuda/src/kernel/encodings/fsst.rs new file mode 100644 index 00000000000..5d3d66eaf04 --- /dev/null +++ b/vortex-cuda/src/kernel/encodings/fsst.rs @@ -0,0 +1,285 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA executor for FSST decompression. + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use cudarc::driver::DevicePtr; +use cudarc::driver::DeviceRepr; +use cudarc::driver::PushKernelArg; +use tracing::instrument; +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::VarBinViewArray; +use vortex::array::arrays::primitive::PrimitiveDataParts; +use vortex::array::arrays::varbin::VarBinArrayExt; +use vortex::array::arrays::varbinview::BinaryView; +use vortex::array::arrays::varbinview::build_views::MAX_BUFFER_LEN; +use vortex::array::arrays::varbinview::build_views::build_views; +use vortex::array::buffer::DeviceBuffer; +use vortex::array::match_each_integer_ptype; +use vortex::array::match_each_unsigned_integer_ptype; +use vortex::array::validity::Validity; +use vortex::buffer::Alignment; +use vortex::buffer::Buffer; +use vortex::dtype::NativePType; +use vortex::encodings::fsst::FSST; +use vortex::encodings::fsst::FSSTArray; +use vortex::encodings::fsst::FSSTArrayExt; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::error::vortex_err; + +use crate::CudaBufferExt; +use crate::CudaDeviceBuffer; +use crate::executor::CudaExecute; +use crate::executor::CudaExecutionCtx; + +/// CUDA decoder for FSST. +#[derive(Debug)] +pub(crate) struct FSSTExecutor; + +impl FSSTExecutor { + fn try_specialize(array: ArrayRef) -> Option { + array.try_downcast::().ok() + } +} + +#[async_trait] +impl CudaExecute for FSSTExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] + async fn execute( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let fsst = Self::try_specialize(array).ok_or_else(|| vortex_err!("Expected FSSTArray"))?; + + let dtype = fsst.dtype().clone(); + let validity = fsst.codes().validity()?; + + if fsst.is_empty() || matches!(validity, Validity::AllInvalid) { + let empty = unsafe { + VarBinViewArray::new_unchecked( + Buffer::::zeroed(fsst.len()), + Arc::from([]), + dtype, + validity, + ) + }; + return Ok(Canonical::VarBinView(empty)); + } + + let lens = fsst + .uncompressed_lengths() + .clone() + .execute::(ctx.execution_ctx())?; + let codes_offsets = fsst + .codes() + .offsets() + .clone() + .execute::(ctx.execution_ctx())?; + + // Prefix-sum lens to per-string u64 output offsets so the kernel + // knows where to write each decoded string. + let output_offsets: Vec = match_each_integer_ptype!(lens.ptype(), |P| { + let mut out = Vec::with_capacity(lens.len() + 1); + let mut acc: u64 = 0; + out.push(0u64); + #[allow(clippy::unnecessary_cast)] + for &l in lens.as_slice::

() { + acc += l as u64; + out.push(acc); + } + out + }); + + // Dispatch on the unsigned width; signed and unsigned offsets of the + // same width share an identical byte representation. + match_each_unsigned_integer_ptype!(codes_offsets.ptype().to_unsigned(), |U| { + decode_fsst::(fsst, codes_offsets, lens, output_offsets, ctx).await + }) + } +} + +async fn decode_fsst( + fsst: FSSTArray, + codes_offsets: PrimitiveArray, + lens: PrimitiveArray, + output_offsets: Vec, + ctx: &mut CudaExecutionCtx, +) -> VortexResult +where + U: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let dtype = fsst.dtype().clone(); + let validity = fsst.codes().validity()?; + let num_strings = fsst.len(); + let num_strings_u64 = num_strings as u64; + let total_size = usize::try_from( + *output_offsets + .last() + .vortex_expect("output_offsets has at least one entry"), + ) + .vortex_expect("total_size fits in usize"); + + let symbols_u64: Vec = fsst.symbols().iter().map(|s| s.to_u64()).collect(); + let symbol_lengths = fsst.symbol_lengths().clone(); + let codes_bytes_handle = fsst.codes_bytes_handle().clone(); + let PrimitiveDataParts { + buffer: codes_offsets_buffer, + .. + } = codes_offsets.into_data_parts(); + + let (.., validity_bits) = validity + .clone() + .execute_mask(num_strings, ctx.execution_ctx())? + .into_bit_buffer() + .sliced() + .into_inner(); + + let (symbols, symbol_lengths, output_offsets, validity_device, codes_bytes, codes_offsets) = futures::try_join!( + ctx.copy_to_device(symbols_u64)?, + ctx.copy_to_device(symbol_lengths)?, + ctx.copy_to_device(output_offsets)?, + ctx.copy_to_device(validity_bits.to_vec())?, + ctx.ensure_on_device(codes_bytes_handle), + ctx.ensure_on_device(codes_offsets_buffer), + )?; + + // The kernel checks store alignment relative to the base via + // `out_pos % N`, so the base must satisfy the widest store (u128 → 16). + let device_output = ctx.device_alloc::(total_size)?; + let (output_base_ptr, _) = device_output.device_ptr(ctx.stream()); + assert_eq!( + output_base_ptr % 16, + 0, + "device_output base not 16-aligned: {output_base_ptr:#x}", + ); + + let codes_bytes_view = codes_bytes.cuda_view::()?; + let codes_offsets_view = codes_offsets.cuda_view::()?; + let symbols_view = symbols.cuda_view::()?; + let symbol_lengths_view = symbol_lengths.cuda_view::()?; + let output_offsets_view = output_offsets.cuda_view::()?; + let validity_view = validity_device.cuda_view::()?; + + let cuda_function = ctx.load_function("fsst", &[U::PTYPE])?; + ctx.launch_kernel(&cuda_function, num_strings, |args| { + args.arg(&codes_bytes_view) + .arg(&codes_offsets_view) + .arg(&symbols_view) + .arg(&symbol_lengths_view) + .arg(&output_offsets_view) + .arg(&validity_view) + .arg(&device_output) + .arg(&num_strings_u64); + })?; + + let host_bytes = CudaDeviceBuffer::new(device_output) + .copy_to_host(Alignment::new(1))? + .await?; + let host_bytes = host_bytes.slice(0..total_size); + + let (buffers, views) = match_each_integer_ptype!(lens.ptype(), |P| { + build_views( + 0, + MAX_BUFFER_LEN, + host_bytes.into_mut(), + lens.as_slice::

(), + ) + }); + + Ok(Canonical::VarBinView(unsafe { + VarBinViewArray::new_unchecked(views, Arc::from(buffers), dtype, validity) + })) +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex::array::IntoArray; + use vortex::array::arrays::VarBinArray; + use vortex::array::assert_arrays_eq; + use vortex::dtype::DType; + use vortex::dtype::Nullability; + use vortex::encodings::fsst::fsst_compress; + use vortex::encodings::fsst::fsst_train_compressor; + use vortex::error::VortexExpect; + use vortex::session::VortexSession; + + use super::*; + use crate::CanonicalCudaExt; + use crate::session::CudaSession; + + #[rstest] + #[case::non_null( + vec![Some(&b"the quick brown fox"[..]), + Some(&b"jumps over the lazy dog"[..]), + Some(&b"hello world"[..]), + Some(&b"vortex fsst test string"[..])], + Nullability::NonNullable, + )] + #[case::partial_nulls( + vec![Some(&b"alpha"[..]), None, Some(&b"gamma"[..]), None, Some(&b"epsilon"[..])], + Nullability::Nullable, + )] + #[case::all_nulls( + vec![None, None, None, None, None], + Nullability::Nullable, + )] + #[crate::test] + async fn test_cuda_fsst_decompression_roundtrip( + #[case] strings: Vec>, + #[case] nullability: Nullability, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let varbin = VarBinArray::from_iter(strings, DType::Binary(nullability)); + let compressor = fsst_train_compressor(&varbin); + let dtype = varbin.dtype().clone(); + let len = varbin.len(); + let fsst_array = + fsst_compress(&varbin, len, &dtype, &compressor, cuda_ctx.execution_ctx()).into_array(); + + let cpu_result = crate::canonicalize_cpu(fsst_array.clone())?; + let gpu_result = FSSTExecutor + .execute(fsst_array, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result.into_array(), gpu_result); + Ok(()) + } + + /// Exercises the multi-block grid-stride path on a larger dataset. + #[crate::test] + async fn test_cuda_fsst_decompression_roundtrip_large() -> VortexResult<()> { + use vortex_fsst::test_utils::make_fsst_clickbench_urls; + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let fsst_array = make_fsst_clickbench_urls(100_000, cuda_ctx.execution_ctx()).into_array(); + + let cpu_result = crate::canonicalize_cpu(fsst_array.clone())?; + let gpu_result = FSSTExecutor + .execute(fsst_array, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result.into_array(), gpu_result); + Ok(()) + } +} diff --git a/vortex-cuda/src/kernel/encodings/mod.rs b/vortex-cuda/src/kernel/encodings/mod.rs index 62a8d9f606d..1571762fc7b 100644 --- a/vortex-cuda/src/kernel/encodings/mod.rs +++ b/vortex-cuda/src/kernel/encodings/mod.rs @@ -6,6 +6,7 @@ mod bitpacked; mod date_time_parts; mod decimal_byte_parts; mod for_; +mod fsst; mod runend; mod sequence; mod zigzag; @@ -18,6 +19,7 @@ pub(crate) use bitpacked::BitPackedExecutor; pub(crate) use date_time_parts::DateTimePartsExecutor; pub(crate) use decimal_byte_parts::DecimalBytePartsExecutor; pub(crate) use for_::FoRExecutor; +pub(crate) use fsst::FSSTExecutor; pub(crate) use runend::RunEndExecutor; pub(crate) use sequence::SequenceExecutor; pub(crate) use zigzag::ZigZagExecutor; diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index bbebef9430d..5692bb6affc 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -37,6 +37,7 @@ use kernel::DateTimePartsExecutor; use kernel::DecimalBytePartsExecutor; pub use kernel::DefaultLaunchStrategy; use kernel::DictExecutor; +use kernel::FSSTExecutor; use kernel::FilterExecutor; use kernel::FoRExecutor; pub use kernel::LaunchStrategy; @@ -70,6 +71,7 @@ use vortex::encodings::datetime_parts::DateTimeParts; use vortex::encodings::decimal_byte_parts::DecimalByteParts; use vortex::encodings::fastlanes::BitPacked; use vortex::encodings::fastlanes::FoR; +use vortex::encodings::fsst::FSST; use vortex::encodings::runend::RunEnd; use vortex::encodings::sequence::Sequence; use vortex::encodings::zigzag::ZigZag; @@ -114,6 +116,7 @@ pub fn initialize_cuda(session: &CudaSession) { session.register_kernel(Dict.id(), &DictExecutor); session.register_kernel(Shared.id(), &SharedExecutor); session.register_kernel(FoR.id(), &FoRExecutor); + session.register_kernel(FSST.id(), &FSSTExecutor); session.register_kernel(RunEnd.id(), &RunEndExecutor); session.register_kernel(Sequence.id(), &SequenceExecutor); session.register_kernel(ZigZag.id(), &ZigZagExecutor);