Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["cudf-sys", "cudf-cxx", "cudf", "cudf-polars"]
resolver = "2"

[workspace.package]
version = "0.2.1"
version = "0.3.0"
edition = "2024"
rust-version = "1.85"
license = "Apache-2.0 OR MIT"
Expand Down
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ Unofficial Rust bindings for NVIDIA's [libcudf](https://github.com/rapidsai/cudf

> **This project is unofficial and not affiliated with NVIDIA or RAPIDS.**

## What's New in v0.3.0

- **Polars integration via `_collect_post_opt`**: `collect_gpu()` now uses polars' native post-optimization callback. Zero changes to polars fork required for standalone use.
- **`Engine::Gpu` dispatch**: polars-fork integration enables `lf.collect_with_engine(Engine::Gpu)` -- cudf-polars as a native GPU backend.
- **New expressions**: `Not` (Bool logical / Int bitwise), `IsIn` (with null propagation), GroupBy `Quantile`.
- **Window functions**: `AExpr::Over` with `GroupsToRows` mapping -- groupby then scatter broadcast (O(n)).
- **Temporal types**: Date, Datetime (naive), Duration mapped to cudf timestamp/duration types.
- **Performance**: HStack/HConcat zero-copy via `into_parts()`, GPU-native scalar broadcast (`sequence_*`), HashMap name lookup O(1).
- **Safety**: `try_data_type()` fallible API, Arrow FFI debug assertions, First/Last empty guard.

## Features

- **Near-zero unsafe public API** -- all `unsafe` is confined to the internal FFI layer (sole exception: `DLPackTensor::from_raw_ptr`)
Expand Down Expand Up @@ -61,7 +71,7 @@ export CUDA_PATH=/usr/local/cuda

```toml
[dependencies]
cudf = "0.2"
cudf = "0.3"
```

### 4. Build
Expand Down Expand Up @@ -211,8 +221,8 @@ Arrow C Data Interface, Arrow IPC, DLPack tensor exchange, pack/unpack/contiguou
- **GroupBy `maintain_order`**: Approximated by key-column sort, not true input-order preservation.
- **Std/Var ddof**: Default standalone reduction uses ddof=1. Full ddof support via `reduce_var_with_ddof` / `reduce_std_with_ddof`.
- **Polars version**: cudf-polars is compatible with Polars 0.53.0.
- **Unsupported types**: Date, Datetime, Duration, Categorical, List, Struct are not yet mapped (returns explicit error).
- **Unsupported expressions**: Window functions (`.over()`), `IsIn`, expression-level Sort/Filter/Slice.
- **Unsupported types**: Categorical, List, Struct are not yet mapped (returns explicit error).
- **Unsupported expressions**: expression-level Sort/Filter/Slice.
- **Unsupported IR nodes**: `Cache`, `MapFunction`, `ExtContext`, `Sink`.
- **Multi-file Parquet**: Only reads the first file in multi-file scans.

Expand Down
5 changes: 3 additions & 2 deletions cudf-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ polars-core = { version = "=0.53.0", default-features = false }
polars-utils = { version = "=0.53.0" }
polars-error = { version = "=0.53.0" }
polars-arrow = { version = "=0.53.0" }
polars-plan = { version = "=0.53.0", default-features = false, features = ["parquet", "semi_anti_join", "abs"] }
polars-plan = { version = "=0.53.0", default-features = false, features = ["parquet", "semi_anti_join", "abs", "is_in"] }
polars-ops = { version = "=0.53.0", default-features = false, features = ["semi_anti_join"] }
polars-lazy = { version = "=0.53.0", default-features = false, features = ["abs", "parquet", "semi_anti_join"] }
polars-lazy = { version = "=0.53.0", default-features = false, features = ["abs", "parquet", "semi_anti_join"], optional = true }
arrow = { version = "54", default-features = false, features = ["ffi"] }

[features]
default = []
lazy = ["polars-lazy"]
gpu-tests = []

[[example]]
Expand Down
30 changes: 30 additions & 0 deletions cudf-polars/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ fn polars_arrow_to_arrow_ffi(
// We move ownership via ptr::read + mem::forget to avoid double-drop of the release
// callback.
let ffi_schema = unsafe {
debug_assert_eq!(
std::mem::size_of_val(&polars_c_schema),
std::mem::size_of::<arrow::ffi::FFI_ArrowSchema>(),
"ArrowSchema size mismatch at runtime"
);
// SAFETY: polars_c_schema and FFI_ArrowSchema are both #[repr(C)] Arrow C Data
// Interface structs with identical layout (verified by const assertion + debug_assert).
// We read-then-forget to transfer ownership without double-drop.
std::ptr::read(
&polars_c_schema as *const polars_arrow::ffi::ArrowSchema
as *const arrow::ffi::FFI_ArrowSchema,
Expand All @@ -128,6 +136,14 @@ fn polars_arrow_to_arrow_ffi(
std::mem::forget(polars_c_schema);

let ffi_array = unsafe {
debug_assert_eq!(
std::mem::size_of_val(&polars_c_array),
std::mem::size_of::<arrow::ffi::FFI_ArrowArray>(),
"ArrowArray size mismatch at runtime"
);
// SAFETY: polars_c_array and FFI_ArrowArray are both #[repr(C)] Arrow C Data
// Interface structs with identical layout (verified by const assertion + debug_assert).
// We read-then-forget to transfer ownership without double-drop.
std::ptr::read(
&polars_c_array as *const polars_arrow::ffi::ArrowArray
as *const arrow::ffi::FFI_ArrowArray,
Expand All @@ -154,6 +170,13 @@ fn arrow_to_polars_arrow_ffi(
// SAFETY: Same layout invariant as above — verified by compile-time assertions
// at module level (size and alignment match).
let polars_c_schema = unsafe {
debug_assert_eq!(
std::mem::size_of_val(&ffi_schema),
std::mem::size_of::<polars_arrow::ffi::ArrowSchema>(),
"ArrowSchema size mismatch at runtime"
);
// SAFETY: FFI_ArrowSchema and polars ArrowSchema are both #[repr(C)] Arrow C Data
// Interface structs with identical layout (verified by const assertion + debug_assert).
std::ptr::read(
&ffi_schema as *const arrow::ffi::FFI_ArrowSchema
as *const polars_arrow::ffi::ArrowSchema,
Expand All @@ -162,6 +185,13 @@ fn arrow_to_polars_arrow_ffi(
std::mem::forget(ffi_schema);

let polars_c_array = unsafe {
debug_assert_eq!(
std::mem::size_of_val(&ffi_array),
std::mem::size_of::<polars_arrow::ffi::ArrowArray>(),
"ArrowArray size mismatch at runtime"
);
// SAFETY: FFI_ArrowArray and polars ArrowArray are both #[repr(C)] Arrow C Data
// Interface structs with identical layout (verified by const assertion + debug_assert).
std::ptr::read(
&ffi_array as *const arrow::ffi::FFI_ArrowArray as *const polars_arrow::ffi::ArrowArray,
)
Expand Down
138 changes: 104 additions & 34 deletions cudf-polars/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! GPU execution engine: walks the IR tree and executes nodes on GPU.

use std::collections::HashMap;

use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_bail};
use polars_plan::plans::{AExpr, IR, IRAggExpr, IRPlan};
use polars_plan::plans::{AExpr, IR, IRAggExpr, IRPlan, LiteralValue};
use polars_utils::arena::{Arena, Node};

use cudf::aggregation::AggregationKind;
Expand All @@ -16,7 +18,7 @@ use crate::expr;
use crate::gpu_frame::GpuDataFrame;

/// Execute an IR node recursively, producing a GPU-resident data frame.
pub fn execute_node(
pub(crate) fn execute_node(
node: Node,
lp_arena: &Arena<IR>,
expr_arena: &Arena<AExpr>,
Expand Down Expand Up @@ -87,35 +89,57 @@ pub fn execute_node(
let table = execute_node(input_node, lp_arena, expr_arena)?;

// HStack adds new columns to the existing frame.
// Use Option<Column> to allow zero-copy reordering without dummy GPU allocations.
let existing_width = table.width();
// Evaluate new expressions first (before consuming table for zero-copy decomposition)
let mut new_cols = Vec::with_capacity(exprs.len());
let mut new_names = Vec::with_capacity(exprs.len());
for e in &exprs {
let col = expr::eval_expr(e.node(), expr_arena, &table)?;
new_cols.push(col);
new_names.push(e.output_name().to_string());
}

// Decompose table into columns (zero-copy) instead of deep-copying each
let (existing_cols, existing_names) = table.into_parts()?;
let existing_width = existing_cols.len();
let mut all_columns: Vec<Option<cudf::Column>> =
Vec::with_capacity(existing_width + exprs.len());
let mut all_names = Vec::with_capacity(existing_width + exprs.len());
Vec::with_capacity(existing_width + new_cols.len());
let mut all_names = Vec::with_capacity(existing_width + new_names.len());

for i in 0..existing_width {
all_columns.push(Some(table.column(i)?));
all_names.push(table.names()[i].clone());
for col in existing_cols {
all_columns.push(Some(col));
}
all_names.extend(existing_names);

for e in &exprs {
let col = expr::eval_expr(e.node(), expr_arena, &table)?;
let name = e.output_name().to_string();
// Build name→position index for O(1) lookup instead of O(n) linear scan
let mut name_index: HashMap<String, usize> = all_names
.iter()
.enumerate()
.map(|(i, n)| (n.clone(), i))
.collect();

if let Some(pos) = all_names.iter().position(|n| n == &name) {
// Merge new columns (replace or append)
for (col, name) in new_cols.into_iter().zip(new_names) {
if let Some(&pos) = name_index.get(&name) {
all_columns[pos] = Some(col);
} else {
let new_pos = all_columns.len();
all_columns.push(Some(col));
name_index.insert(name.clone(), new_pos);
all_names.push(name);
}
}

// Reorder to match the output schema
// Reorder to match the output schema using HashMap for O(1) lookup
let schema_names: Vec<&str> = schema.iter_names().map(|n| n.as_str()).collect();
let name_pos: HashMap<&str, usize> = all_names
.iter()
.enumerate()
.map(|(i, n)| (n.as_str(), i))
.collect();
let mut ordered_columns = Vec::with_capacity(schema_names.len());
let mut ordered_names = Vec::with_capacity(schema_names.len());
for &sn in &schema_names {
if let Some(pos) = all_names.iter().position(|n| n == sn) {
if let Some(&pos) = name_pos.get(sn) {
let col = all_columns[pos].take().ok_or_else(|| {
polars_err!(ColumnNotFound: "duplicate reference to column '{}' in HStack schema", sn)
})?;
Expand Down Expand Up @@ -567,14 +591,13 @@ pub fn execute_node(
polars_bail!(ComputeError: "GPU HConcat requires all inputs to have the same height, got {:?}", heights);
}

// Collect all columns from all tables
// Decompose all tables into columns (zero-copy) instead of deep-copying each
let mut all_columns = Vec::new();
let mut all_names = Vec::new();
for t in &tables {
for i in 0..t.width() {
all_columns.push(t.column(i)?);
all_names.push(t.names()[i].clone());
}
for t in tables {
let (cols, names) = t.into_parts()?;
all_columns.extend(cols);
all_names.extend(names);
}

let combined = GpuDataFrame::from_columns(all_columns, all_names)?;
Expand All @@ -594,22 +617,54 @@ pub fn execute_node(
/// In polars-plan 0.53+, `Alias` is no longer an `AExpr` variant — it lives on the
/// `ExprIR` wrapper (`OutputName::Alias`), so the caller strips it via `ExprIR::node()`.
/// This function handles `Cast` wrappers that the optimizer may insert around `Agg`.
fn extract_agg_info(
pub(crate) fn extract_agg_info(
node: Node,
expr_arena: &Arena<AExpr>,
) -> PolarsResult<(Node, AggregationKind)> {
match expr_arena.get(node) {
AExpr::Agg(agg) => {
let (input, kind) = map_ir_agg(agg)?;
Ok((input, kind))
match agg {
IRAggExpr::Quantile { expr, quantile, method: _ } => {
// Extract the quantile value from the expression arena
// Polars 0.53 LiteralValue has Dyn/Scalar/Series/Range variants
let q_value = match expr_arena.get(*quantile) {
AExpr::Literal(LiteralValue::Dyn(dyn_val)) => {
use polars_plan::plans::DynLiteralValue;
match dyn_val {
DynLiteralValue::Float(q) => *q,
DynLiteralValue::Int(q) => *q as f64,
_ => polars_bail!(ComputeError: "GPU engine: Quantile requires a numeric literal"),
}
}
AExpr::Literal(LiteralValue::Scalar(s)) => {
use polars_core::prelude::AnyValue;
match s.value() {
AnyValue::Float64(q) => *q,
AnyValue::Float32(q) => *q as f64,
AnyValue::Int32(q) => *q as f64,
AnyValue::Int64(q) => *q as f64,
AnyValue::UInt32(q) => *q as f64,
AnyValue::UInt64(q) => *q as f64,
_ => polars_bail!(ComputeError: "GPU engine: Quantile scalar must be numeric, got {:?}", s.dtype()),
}
}
_ => polars_bail!(ComputeError: "GPU engine: Quantile requires a literal quantile value"),
};
Ok((*expr, AggregationKind::Quantile { q: q_value }))
}
other => {
let (input, kind) = map_ir_agg(other)?;
Ok((input, kind))
}
}
}
AExpr::Cast { expr, .. } => extract_agg_info(*expr, expr_arena),
_ => polars_bail!(ComputeError: "GPU engine: expected aggregation expression in GroupBy"),
}
}

/// Map an IRAggExpr to its input node and cudf AggregationKind.
fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> {
pub(crate) fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> {
match agg {
IRAggExpr::Sum(input) => Ok((*input, AggregationKind::Sum)),
IRAggExpr::Min { input, .. } => Ok((*input, AggregationKind::Min)),
Expand All @@ -633,9 +688,7 @@ fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> {
IRAggExpr::Var(input, ddof) => {
Ok((*input, AggregationKind::Variance { ddof: *ddof as i32 }))
}
IRAggExpr::Quantile { .. } => {
polars_bail!(ComputeError: "GPU engine: Quantile aggregation not yet supported")
}
// Quantile is handled in extract_agg_info (needs expr_arena access)
other => {
polars_bail!(ComputeError: "GPU engine: unsupported aggregation type: {:?}", other)
}
Expand Down Expand Up @@ -693,11 +746,13 @@ pub fn execute_plan(plan: IRPlan) -> PolarsResult<DataFrame> {
result.to_polars()
}

/// Execute a Polars LazyFrame on the GPU.
#[cfg(feature = "lazy")]
/// Execute a Polars LazyFrame on the GPU using polars' `_collect_post_opt` callback.
///
/// This is the main entry point for GPU-accelerated query execution.
/// It takes a LazyFrame, runs Polars' query optimizer, then executes
/// the optimized plan on the GPU via libcudf.
/// This integrates with polars' physical-plan pipeline: after the optimizer runs,
/// our callback receives the optimized IR, executes it on GPU, and replaces the
/// root node with a `DataFrameScan` holding the result. Polars then creates a
/// trivial physical plan that simply returns the pre-computed DataFrame.
///
/// # Example
/// ```no_run
Expand All @@ -711,6 +766,21 @@ pub fn execute_plan(plan: IRPlan) -> PolarsResult<DataFrame> {
/// ).unwrap();
/// ```
pub fn collect_gpu(lf: polars_lazy::frame::LazyFrame) -> PolarsResult<DataFrame> {
let plan = lf.to_alp_optimized()?;
execute_plan(plan)
lf._collect_post_opt(|root, lp_arena, expr_arena, _timing| {
// Execute the optimized IR on GPU directly in the callback
let gpu_result = execute_node(root, lp_arena, expr_arena)?;
let df = gpu_result.to_polars()?;

// Replace the root node with a DataFrameScan holding the GPU result.
// Polars' physical plan will simply read this pre-computed DataFrame.
let schema = df.schema().clone();
let replacement = IR::DataFrameScan {
df: std::sync::Arc::new(df),
schema,
output_schema: None,
};
lp_arena.replace(root, replacement);

Ok(())
})
}
Loading
Loading