Skip to content

Commit 60530b1

Browse files
authored
feat[gpu]: nvidia cub filter kernel (#6188)
Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 6ab6b5f commit 60530b1

17 files changed

Lines changed: 1280 additions & 2 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ jobs:
219219
target: wasm32-unknown-unknown
220220
env:
221221
rustflags: "RUSTFLAGS='-A warnings --cfg getrandom_backend=\"wasm_js\"'"
222-
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd"
222+
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd"
223223
steps:
224224
- uses: runs-on/action@v2
225225
with:
@@ -579,7 +579,7 @@ jobs:
579579
- name: Rust Tests (Windows)
580580
if: matrix.os == 'windows-x64'
581581
run: |
582-
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda --exclude vortex-nvcomp --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench --exclude xtask
582+
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude vortex-cuda --exclude vortex-nvcomp --exclude vortex-cub --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench --exclude xtask
583583
- name: Rust Tests (Other)
584584
if: matrix.os != 'windows-x64'
585585
run: cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude xtask

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ members = [
2727
"vortex-cuda",
2828
"vortex-cuda/macros",
2929
"vortex-cuda/nvcomp",
30+
"vortex-cuda/cub",
3031
"vortex-cxx",
3132
"vortex-ffi",
3233
"fuzz",

vortex-cuda/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ cudarc = { workspace = true }
2626
fastlanes = { workspace = true }
2727
futures = { workspace = true, features = ["executor"] }
2828
kanal = { workspace = true }
29+
paste = { workspace = true }
2930
tracing = { workspace = true }
3031
vortex-alp = { workspace = true }
3132
vortex-array = { workspace = true }
3233
vortex-buffer = { workspace = true }
34+
vortex-cub = { path = "cub" }
3335
vortex-cuda-macros = { workspace = true }
3436
vortex-decimal-byte-parts = { workspace = true }
3537
vortex-dtype = { workspace = true, features = ["cudarc"] }
@@ -66,3 +68,7 @@ harness = false
6668
[[bench]]
6769
name = "zstd_cuda"
6870
harness = false
71+
72+
[[bench]]
73+
name = "filter_cuda"
74+
harness = false

vortex-cuda/benches/filter_cuda.rs

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! CUDA benchmarks for GPU filtering using CUB DeviceSelect::Flagged.
5+
6+
#![allow(clippy::unwrap_used)]
7+
#![allow(clippy::cast_possible_truncation)]
8+
9+
use std::ffi::c_void;
10+
use std::mem::size_of;
11+
use std::time::Duration;
12+
13+
use criterion::BenchmarkId;
14+
use criterion::Criterion;
15+
use criterion::Throughput;
16+
use cudarc::driver::CudaSlice;
17+
use cudarc::driver::CudaView;
18+
use cudarc::driver::DevicePtr;
19+
use cudarc::driver::DevicePtrMut;
20+
use cudarc::driver::sys::CUevent_flags;
21+
use futures::executor::block_on;
22+
use vortex_cub::filter::CubFilterable;
23+
use vortex_cub::filter::cudaStream_t;
24+
use vortex_cuda::CudaDeviceBuffer;
25+
use vortex_cuda::CudaExecutionCtx;
26+
use vortex_cuda::CudaSession;
27+
use vortex_cuda_macros::cuda_available;
28+
use vortex_cuda_macros::cuda_not_available;
29+
use vortex_error::VortexExpect;
30+
use vortex_error::VortexResult;
31+
use vortex_error::vortex_err;
32+
use vortex_session::VortexSession;
33+
34+
const BENCH_SIZES: &[(usize, &str)] = &[(1_000_000, "1M"), (10_000_000, "10M")];
35+
const SELECTIVITIES: &[(f64, &str)] = &[(0.1, "10pct"), (0.5, "50pct"), (0.9, "90pct")];
36+
37+
/// Creates input data of the given length.
38+
fn make_input_data<T: From<u8> + Clone>(len: usize) -> Vec<T> {
39+
(0..len).map(|i| T::from((i % 256) as u8)).collect()
40+
}
41+
42+
/// Creates a packed bitmask with the given selectivity.
43+
fn make_bitmask(len: usize, selectivity: f64) -> (Vec<u8>, usize) {
44+
let num_bytes = len.div_ceil(8);
45+
let mut packed = vec![0u8; num_bytes];
46+
let mut true_count = 0;
47+
48+
// Calculate stride to spread selections evenly across the array.
49+
let stride = ((1.0 / selectivity).round() as usize).max(1);
50+
51+
for idx in 0..len {
52+
// Select every stride-th element to spread work across threads.
53+
if idx % stride == 0 {
54+
let byte_idx = idx / 8;
55+
let bit_idx = idx % 8;
56+
packed[byte_idx] |= 1 << bit_idx;
57+
true_count += 1;
58+
}
59+
}
60+
61+
(packed, true_count)
62+
}
63+
64+
/// Runs the CUB filter kernel and returns elapsed GPU time.
65+
#[expect(clippy::too_many_arguments)]
66+
async fn run_filter_timed<T: CubFilterable + cudarc::driver::DeviceRepr>(
67+
d_input: CudaView<'_, T>,
68+
d_bitmask: CudaView<'_, u8>,
69+
d_output: &mut CudaSlice<T>,
70+
d_temp: &mut CudaSlice<u8>,
71+
d_num_selected: &mut CudaSlice<i64>,
72+
num_items: i64,
73+
temp_bytes: usize,
74+
cuda_ctx: &mut CudaExecutionCtx,
75+
) -> VortexResult<Duration> {
76+
let stream = cuda_ctx.stream();
77+
let ctx = stream.context();
78+
79+
let start_event = ctx
80+
.new_event(Some(CUevent_flags::CU_EVENT_BLOCKING_SYNC))
81+
.map_err(|e| vortex_err!("Failed to create start event: {:?}", e))?;
82+
start_event
83+
.record(stream)
84+
.map_err(|e| vortex_err!("Failed to record start event: {:?}", e))?;
85+
86+
// Get raw pointers
87+
let stream_ptr = stream.cu_stream() as cudaStream_t;
88+
let d_input_ptr = d_input.device_ptr(stream).0 as *const T;
89+
let d_bitmask_ptr = d_bitmask.device_ptr(stream).0 as *const u8;
90+
let d_output_ptr = d_output.device_ptr_mut(stream).0 as *mut T;
91+
let d_temp_ptr = d_temp.device_ptr_mut(stream).0 as *mut c_void;
92+
let d_num_selected_ptr = d_num_selected.device_ptr_mut(stream).0 as *mut i64;
93+
94+
unsafe {
95+
T::filter_bitmask(
96+
d_temp_ptr,
97+
temp_bytes,
98+
d_input_ptr,
99+
d_bitmask_ptr,
100+
0, // bit_offset
101+
d_output_ptr,
102+
d_num_selected_ptr,
103+
num_items,
104+
stream_ptr,
105+
)
106+
.map_err(|e| vortex_err!("Filter kernel execution failed: {}", e))?;
107+
}
108+
109+
let end_event = ctx
110+
.new_event(Some(CUevent_flags::CU_EVENT_BLOCKING_SYNC))
111+
.map_err(|e| vortex_err!("Failed to create end event: {:?}", e))?;
112+
113+
end_event
114+
.record(stream)
115+
.map_err(|e| vortex_err!("Failed to record end event: {:?}", e))?;
116+
117+
let elapsed_ms = start_event
118+
.elapsed_ms(&end_event)
119+
.map_err(|e| vortex_err!("Failed to get elapsed time: {:?}", e))?;
120+
121+
Ok(Duration::from_secs_f32(elapsed_ms / 1000.0))
122+
}
123+
124+
/// Benchmark filter for a specific type.
125+
fn benchmark_filter_type<T>(c: &mut Criterion, type_name: &str)
126+
where
127+
T: CubFilterable + cudarc::driver::DeviceRepr + From<u8> + Clone + Send + Sync + 'static,
128+
{
129+
let mut group = c.benchmark_group(format!("Filter_cuda_{type_name}"));
130+
group.sample_size(10);
131+
132+
for (len, len_label) in BENCH_SIZES {
133+
for (selectivity, sel_label) in SELECTIVITIES {
134+
let input_data = make_input_data::<T>(*len);
135+
let (bitmask, true_count) = make_bitmask(*len, *selectivity);
136+
137+
// Throughput based on input size (bytes read)
138+
group.throughput(Throughput::Bytes((len * size_of::<T>()) as u64));
139+
140+
group.bench_with_input(
141+
BenchmarkId::new(format!("{len_label}_{sel_label}"), true_count),
142+
&(input_data, bitmask, true_count),
143+
|b, (input_data, bitmask, true_count)| {
144+
b.iter_custom(|iters| {
145+
let mut cuda_ctx =
146+
CudaSession::create_execution_ctx(&VortexSession::empty())
147+
.vortex_expect("failed to create execution context");
148+
149+
let num_items = input_data.len() as i64;
150+
#[expect(clippy::expect_used)]
151+
let temp_bytes =
152+
T::get_temp_size(num_items).expect("failed to get temp size");
153+
154+
let mut total_time = Duration::ZERO;
155+
156+
for _ in 0..iters {
157+
// Copy input to device
158+
let d_input_handle =
159+
block_on(cuda_ctx.copy_to_device(input_data.clone()).unwrap())
160+
.vortex_expect("failed to copy input to device");
161+
let d_input = d_input_handle
162+
.as_device()
163+
.as_any()
164+
.downcast_ref::<CudaDeviceBuffer<T>>()
165+
.unwrap();
166+
167+
// Copy bitmask to device
168+
let d_bitmask_handle =
169+
block_on(cuda_ctx.copy_to_device(bitmask.clone()).unwrap())
170+
.vortex_expect("failed to copy bitmask to device");
171+
let d_bitmask = d_bitmask_handle
172+
.as_device()
173+
.as_any()
174+
.downcast_ref::<CudaDeviceBuffer<u8>>()
175+
.unwrap();
176+
177+
// Allocate output and temp buffers
178+
let mut d_output: CudaSlice<T> = cuda_ctx
179+
.device_alloc(*true_count)
180+
.vortex_expect("failed to allocate output");
181+
let mut d_temp: CudaSlice<u8> = cuda_ctx
182+
.device_alloc(temp_bytes.max(1))
183+
.vortex_expect("failed to allocate temp");
184+
let mut d_num_selected: CudaSlice<i64> = cuda_ctx
185+
.device_alloc(1)
186+
.vortex_expect("failed to allocate num_selected");
187+
188+
let kernel_time = block_on(run_filter_timed(
189+
d_input.as_view(),
190+
d_bitmask.as_view(),
191+
&mut d_output,
192+
&mut d_temp,
193+
&mut d_num_selected,
194+
num_items,
195+
temp_bytes,
196+
&mut cuda_ctx,
197+
))
198+
.vortex_expect("kernel execution failed");
199+
200+
total_time += kernel_time;
201+
}
202+
203+
total_time
204+
});
205+
},
206+
);
207+
}
208+
}
209+
210+
group.finish();
211+
}
212+
213+
/// Benchmark i32 filter
214+
fn benchmark_filter_i32(c: &mut Criterion) {
215+
benchmark_filter_type::<i32>(c, "i32");
216+
}
217+
218+
/// Benchmark i64 filter
219+
fn benchmark_filter_i64(c: &mut Criterion) {
220+
benchmark_filter_type::<i64>(c, "i64");
221+
}
222+
223+
/// Benchmark f64 filter
224+
fn benchmark_filter_f64(c: &mut Criterion) {
225+
benchmark_filter_type::<f64>(c, "f64");
226+
}
227+
228+
pub fn benchmark_filter_cuda(c: &mut Criterion) {
229+
benchmark_filter_i32(c);
230+
benchmark_filter_i64(c);
231+
benchmark_filter_f64(c);
232+
}
233+
234+
criterion::criterion_group!(benches, benchmark_filter_cuda);
235+
236+
#[cuda_available]
237+
criterion::criterion_main!(benches);
238+
239+
#[cuda_not_available]
240+
fn main() {}

vortex-cuda/cub/Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
[package]
5+
name = "vortex-cub"
6+
authors.workspace = true
7+
description = "Rust bindings to NVIDIA CUB library for Vortex GPU operations"
8+
edition = { workspace = true }
9+
homepage = { workspace = true }
10+
categories = { workspace = true }
11+
include = { workspace = true }
12+
keywords = { workspace = true }
13+
license = { workspace = true }
14+
readme = { workspace = true }
15+
repository = { workspace = true }
16+
rust-version = { workspace = true }
17+
version = { workspace = true }
18+
19+
[lints]
20+
workspace = true
21+
22+
[dependencies]
23+
libloading = { workspace = true }
24+
paste = { workspace = true }
25+
vortex-cuda-macros = { workspace = true }
26+
27+
[build-dependencies]
28+
bindgen = { workspace = true }

0 commit comments

Comments
 (0)