From f9a99d08b156bb1854e92cbedee3ebab95abc3cb Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 01:24:00 +0900 Subject: [PATCH 1/7] =?UTF-8?q?feat+fix:=20Phase=201=20=E2=80=94=20=5Fcoll?= =?UTF-8?q?ect=5Fpost=5Fopt=20=EC=BD=9C=EB=B0=B1=20=EA=B8=B0=EB=B0=98=20co?= =?UTF-8?q?llect=5Fgpu=20=EC=9E=AC=EA=B5=AC=ED=98=84=20+=20correctness=20f?= =?UTF-8?q?ixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - engine: collect_gpu를 lf._collect_post_opt() 콜백 기반으로 재구현하여 polars 내부 코드 변경 없이 GPU 실행 파이프라인 연결 (IR → DataFrameScan 치환) - expr: First/Last agg를 slice+repeat 방식으로 수정하여 올바른 단일값 브로드캐스트 보장, 빈 컬럼/그룹 guard 추가 (null_column_for_type 공통화) - column: try_data_type() 추가 — data_type()의 fallible 버전으로 FFI type_id 불일치 시 panic 대신 Result 반환 - convert: Arrow FFI ptr::read 4곳에 debug_assert_eq 크기 검증 + SAFETY 주석 보강 Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/src/convert.rs | 30 ++++++++ cudf-polars/src/engine.rs | 28 ++++++-- cudf-polars/src/expr.rs | 138 ++++++++++++++++++++++--------------- cudf/src/column.rs | 35 +++++++--- 4 files changed, 159 insertions(+), 72 deletions(-) diff --git a/cudf-polars/src/convert.rs b/cudf-polars/src/convert.rs index 2a46cd9..fa22a02 100644 --- a/cudf-polars/src/convert.rs +++ b/cudf-polars/src/convert.rs @@ -119,6 +119,14 @@ fn polars_arrow_to_arrow_ffi( // We move ownership via ptr::read + mem::forget to avoid double-drop of the release // callback. let ffi_schema = unsafe { + debug_assert_eq!( + std::mem::size_of_val(&polars_c_schema), + std::mem::size_of::(), + "ArrowSchema size mismatch at runtime" + ); + // SAFETY: polars_c_schema and FFI_ArrowSchema are both #[repr(C)] Arrow C Data + // Interface structs with identical layout (verified by const assertion + debug_assert). + // We read-then-forget to transfer ownership without double-drop. std::ptr::read( &polars_c_schema as *const polars_arrow::ffi::ArrowSchema as *const arrow::ffi::FFI_ArrowSchema, @@ -128,6 +136,14 @@ fn polars_arrow_to_arrow_ffi( std::mem::forget(polars_c_schema); let ffi_array = unsafe { + debug_assert_eq!( + std::mem::size_of_val(&polars_c_array), + std::mem::size_of::(), + "ArrowArray size mismatch at runtime" + ); + // SAFETY: polars_c_array and FFI_ArrowArray are both #[repr(C)] Arrow C Data + // Interface structs with identical layout (verified by const assertion + debug_assert). + // We read-then-forget to transfer ownership without double-drop. std::ptr::read( &polars_c_array as *const polars_arrow::ffi::ArrowArray as *const arrow::ffi::FFI_ArrowArray, @@ -154,6 +170,13 @@ fn arrow_to_polars_arrow_ffi( // SAFETY: Same layout invariant as above — verified by compile-time assertions // at module level (size and alignment match). let polars_c_schema = unsafe { + debug_assert_eq!( + std::mem::size_of_val(&ffi_schema), + std::mem::size_of::(), + "ArrowSchema size mismatch at runtime" + ); + // SAFETY: FFI_ArrowSchema and polars ArrowSchema are both #[repr(C)] Arrow C Data + // Interface structs with identical layout (verified by const assertion + debug_assert). std::ptr::read( &ffi_schema as *const arrow::ffi::FFI_ArrowSchema as *const polars_arrow::ffi::ArrowSchema, @@ -162,6 +185,13 @@ fn arrow_to_polars_arrow_ffi( std::mem::forget(ffi_schema); let polars_c_array = unsafe { + debug_assert_eq!( + std::mem::size_of_val(&ffi_array), + std::mem::size_of::(), + "ArrowArray size mismatch at runtime" + ); + // SAFETY: FFI_ArrowArray and polars ArrowArray are both #[repr(C)] Arrow C Data + // Interface structs with identical layout (verified by const assertion + debug_assert). std::ptr::read( &ffi_array as *const arrow::ffi::FFI_ArrowArray as *const polars_arrow::ffi::ArrowArray, ) diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index 4f0042b..8702625 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -693,11 +693,12 @@ pub fn execute_plan(plan: IRPlan) -> PolarsResult { result.to_polars() } -/// Execute a Polars LazyFrame on the GPU. +/// Execute a Polars LazyFrame on the GPU using polars' `_collect_post_opt` callback. /// -/// This is the main entry point for GPU-accelerated query execution. -/// It takes a LazyFrame, runs Polars' query optimizer, then executes -/// the optimized plan on the GPU via libcudf. +/// This integrates with polars' physical-plan pipeline: after the optimizer runs, +/// our callback receives the optimized IR, executes it on GPU, and replaces the +/// root node with a `DataFrameScan` holding the result. Polars then creates a +/// trivial physical plan that simply returns the pre-computed DataFrame. /// /// # Example /// ```no_run @@ -711,6 +712,21 @@ pub fn execute_plan(plan: IRPlan) -> PolarsResult { /// ).unwrap(); /// ``` pub fn collect_gpu(lf: polars_lazy::frame::LazyFrame) -> PolarsResult { - let plan = lf.to_alp_optimized()?; - execute_plan(plan) + lf._collect_post_opt(|root, lp_arena, expr_arena, _timing| { + // Execute the optimized IR on GPU directly in the callback + let gpu_result = execute_node(root, lp_arena, expr_arena)?; + let df = gpu_result.to_polars()?; + + // Replace the root node with a DataFrameScan holding the GPU result. + // Polars' physical plan will simply read this pre-computed DataFrame. + let schema = df.schema().clone(); + let replacement = IR::DataFrameScan { + df: std::sync::Arc::new(df), + schema, + output_schema: None, + }; + lp_arena.replace(root, replacement); + + Ok(()) + }) } diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index 7441969..8abc3e6 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -427,9 +427,29 @@ fn eval_agg_expr( let data = vec![n; height]; gpu_result(GpuColumn::from_slice(&data)) } - IRAggExpr::First(input) | IRAggExpr::Last(input) => { - // In standalone context, just evaluate the input - eval_expr(*input, expr_arena, df) + IRAggExpr::First(input) => { + let col = eval_expr(*input, expr_arena, df)?; + if col.len() == 0 || height == 0 { + return null_column_for_type(col.data_type(), height); + } + // Slice the first element, then repeat to fill height + let single_row_table = gpu_result(cudf::Table::new(vec![col]))?; + let sliced = gpu_result(single_row_table.slice(0, 1))?; + let repeated = gpu_result(sliced.repeat(height))?; + let cols = gpu_result(repeated.into_columns())?; + Ok(cols.into_iter().next().unwrap()) + } + IRAggExpr::Last(input) => { + let col = eval_expr(*input, expr_arena, df)?; + if col.len() == 0 || height == 0 { + return null_column_for_type(col.data_type(), height); + } + let last_idx = col.len() - 1; + let single_row_table = gpu_result(cudf::Table::new(vec![col]))?; + let sliced = gpu_result(single_row_table.slice(last_idx, last_idx + 1))?; + let repeated = gpu_result(sliced.repeat(height))?; + let cols = gpu_result(repeated.into_columns())?; + Ok(cols.into_iter().next().unwrap()) } other => { polars_bail!(ComputeError: "GPU engine: unsupported standalone aggregation: {:?}", other) @@ -462,6 +482,64 @@ fn reduce_and_broadcast( broadcast_scalar(&scalar, height) } +/// Create a column of `height` null values with the given GPU data type. +fn null_column_for_type(dtype: cudf::types::DataType, height: usize) -> PolarsResult { + let tid = dtype.id(); + match tid { + GpuTypeId::Bool8 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_bool(&opts)) + } + GpuTypeId::Int8 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i8(&opts)) + } + GpuTypeId::Int16 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i16(&opts)) + } + GpuTypeId::Int32 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i32(&opts)) + } + GpuTypeId::Int64 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i64(&opts)) + } + GpuTypeId::Uint8 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_u8(&opts)) + } + GpuTypeId::Uint16 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_u16(&opts)) + } + GpuTypeId::Uint32 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_u32(&opts)) + } + GpuTypeId::Uint64 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_u64(&opts)) + } + GpuTypeId::Float32 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_f32(&opts)) + } + GpuTypeId::Float64 => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_f64(&opts)) + } + GpuTypeId::String => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_strings(&opts)) + } + other => { + polars_bail!(ComputeError: "GPU engine: cannot create null column of type {:?}", other) + } + } +} + /// Broadcast a scalar value to a column of the given height. fn broadcast_scalar(scalar: &cudf::Scalar, height: usize) -> PolarsResult { let dtype = scalar.data_type(); @@ -469,59 +547,7 @@ fn broadcast_scalar(scalar: &cudf::Scalar, height: usize) -> PolarsResult { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_bool(&opts)) - } - GpuTypeId::Int8 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_i8(&opts)) - } - GpuTypeId::Int16 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_i16(&opts)) - } - GpuTypeId::Int32 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_i32(&opts)) - } - GpuTypeId::Int64 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_i64(&opts)) - } - GpuTypeId::Uint8 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_u8(&opts)) - } - GpuTypeId::Uint16 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_u16(&opts)) - } - GpuTypeId::Uint32 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_u32(&opts)) - } - GpuTypeId::Uint64 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_u64(&opts)) - } - GpuTypeId::Float32 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_f32(&opts)) - } - GpuTypeId::Float64 => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_f64(&opts)) - } - GpuTypeId::String => { - let opts: Vec> = vec![None; height]; - gpu_result(GpuColumn::from_optional_strings(&opts)) - } - other => { - polars_bail!(ComputeError: "GPU engine: cannot broadcast null scalar of type {:?}", other) - } - }; + return null_column_for_type(dtype, height); } match tid { diff --git a/cudf/src/column.rs b/cudf/src/column.rs index 31bd245..b290ade 100644 --- a/cudf/src/column.rs +++ b/cudf/src/column.rs @@ -129,24 +129,39 @@ impl Column { } /// The data type of this column. + /// + /// # Panics + /// + /// Panics if the underlying libcudf column has a type_id that this crate + /// does not recognize (e.g. a libcudf version mismatch). Use + /// [`try_data_type`](Self::try_data_type) for a non-panicking alternative. pub fn data_type(&self) -> DataType { + self.try_data_type() + .unwrap_or_else(|e| panic!("cudf: data_type() failed — {e}")) + } + + /// The data type of this column (fallible version). + /// + /// Returns an error instead of panicking when the FFI type_id is + /// unrecognized or the decimal parameters are invalid. + pub fn try_data_type(&self) -> Result { let raw = self.inner.type_id(); - let id = TypeId::from_raw(raw).unwrap_or_else(|| { - panic!( - "cudf: unrecognized type_id {} from FFI — possible libcudf version mismatch", + let id = TypeId::from_raw(raw).ok_or_else(|| { + CudfError::InvalidArgument(format!( + "unrecognized type_id {} from FFI — possible libcudf version mismatch", raw - ) - }); + )) + })?; let scale = self.inner.type_scale(); if scale != 0 { - DataType::decimal(id, scale).unwrap_or_else(|e| { - panic!( - "cudf: invalid decimal type — type_id {:?} with scale {}: {}", + DataType::decimal(id, scale).map_err(|e| { + CudfError::InvalidArgument(format!( + "invalid decimal type — type_id {:?} with scale {}: {}", id, scale, e - ) + )) }) } else { - DataType::new(id) + Ok(DataType::new(id)) } } From 89608433c8bd3c7f9558d579153b0609abf2ecab Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 01:51:17 +0900 Subject: [PATCH 2/7] =?UTF-8?q?feat:=20Phase=202=20quick=20wins=20?= =?UTF-8?q?=E2=80=94=20Not,=20IsIn,=20GroupBy=20Quantile=20expressions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Not expression: Bool→logical not, Int→bitwise invert (Polars semantics) - IsIn expression: values.contains + nulls_equal propagation (null in col → null output when nulls_equal=false) - GroupBy Quantile: extract q via DynLiteralValue/Scalar for Polars 0.53 LiteralValue API (Dyn/Scalar/Series/Range variants) - Enable `is_in` feature in polars-plan dependency - Codex rescue review 3건 반영: LiteralValue variant mismatch, Not integer dispatch, IsIn null semantics Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/Cargo.toml | 2 +- cudf-polars/src/engine.rs | 42 +++++++++++++++++++++++++++++++++------ cudf-polars/src/expr.rs | 35 ++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/cudf-polars/Cargo.toml b/cudf-polars/Cargo.toml index d3a2249..9584a06 100644 --- a/cudf-polars/Cargo.toml +++ b/cudf-polars/Cargo.toml @@ -15,7 +15,7 @@ polars-core = { version = "=0.53.0", default-features = false } polars-utils = { version = "=0.53.0" } polars-error = { version = "=0.53.0" } polars-arrow = { version = "=0.53.0" } -polars-plan = { version = "=0.53.0", default-features = false, features = ["parquet", "semi_anti_join", "abs"] } +polars-plan = { version = "=0.53.0", default-features = false, features = ["parquet", "semi_anti_join", "abs", "is_in"] } polars-ops = { version = "=0.53.0", default-features = false, features = ["semi_anti_join"] } polars-lazy = { version = "=0.53.0", default-features = false, features = ["abs", "parquet", "semi_anti_join"] } arrow = { version = "54", default-features = false, features = ["ffi"] } diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index 8702625..dcd2c31 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -2,7 +2,7 @@ use polars_core::prelude::*; use polars_error::{PolarsResult, polars_bail}; -use polars_plan::plans::{AExpr, IR, IRAggExpr, IRPlan}; +use polars_plan::plans::{AExpr, IR, IRAggExpr, IRPlan, LiteralValue}; use polars_utils::arena::{Arena, Node}; use cudf::aggregation::AggregationKind; @@ -600,8 +600,40 @@ fn extract_agg_info( ) -> PolarsResult<(Node, AggregationKind)> { match expr_arena.get(node) { AExpr::Agg(agg) => { - let (input, kind) = map_ir_agg(agg)?; - Ok((input, kind)) + match agg { + IRAggExpr::Quantile { expr, quantile, method: _ } => { + // Extract the quantile value from the expression arena + // Polars 0.53 LiteralValue has Dyn/Scalar/Series/Range variants + let q_value = match expr_arena.get(*quantile) { + AExpr::Literal(LiteralValue::Dyn(dyn_val)) => { + use polars_plan::plans::DynLiteralValue; + match dyn_val { + DynLiteralValue::Float(q) => *q, + DynLiteralValue::Int(q) => *q as f64, + _ => polars_bail!(ComputeError: "GPU engine: Quantile requires a numeric literal"), + } + } + AExpr::Literal(LiteralValue::Scalar(s)) => { + use polars_core::prelude::AnyValue; + match s.value() { + AnyValue::Float64(q) => *q, + AnyValue::Float32(q) => *q as f64, + AnyValue::Int32(q) => *q as f64, + AnyValue::Int64(q) => *q as f64, + AnyValue::UInt32(q) => *q as f64, + AnyValue::UInt64(q) => *q as f64, + _ => polars_bail!(ComputeError: "GPU engine: Quantile scalar must be numeric, got {:?}", s.dtype()), + } + } + _ => polars_bail!(ComputeError: "GPU engine: Quantile requires a literal quantile value"), + }; + Ok((*expr, AggregationKind::Quantile { q: q_value })) + } + other => { + let (input, kind) = map_ir_agg(other)?; + Ok((input, kind)) + } + } } AExpr::Cast { expr, .. } => extract_agg_info(*expr, expr_arena), _ => polars_bail!(ComputeError: "GPU engine: expected aggregation expression in GroupBy"), @@ -633,9 +665,7 @@ fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> { IRAggExpr::Var(input, ddof) => { Ok((*input, AggregationKind::Variance { ddof: *ddof as i32 })) } - IRAggExpr::Quantile { .. } => { - polars_bail!(ComputeError: "GPU engine: Quantile aggregation not yet supported") - } + // Quantile is handled in extract_agg_info (needs expr_arena access) other => { polars_bail!(ComputeError: "GPU engine: unsupported aggregation type: {:?}", other) } diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index 8abc3e6..a5f5989 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -759,6 +759,41 @@ fn eval_boolean_function( broadcast_scalar(&scalar, height) } } + IRBooleanFunction::Not => { + let col = eval_expr(inputs[0].node(), expr_arena, df)?; + let tid = col.data_type().id(); + if tid == GpuTypeId::Bool8 { + gpu_result(col.unary_op(UnaryOp::Not)) + } else { + // Integer types use bitwise invert (Polars semantics) + gpu_result(col.unary_op(UnaryOp::BitInvert)) + } + } + IRBooleanFunction::IsIn { nulls_equal } => { + if inputs.len() != 2 { + polars_bail!(ComputeError: "GPU engine: IsIn expects 2 inputs"); + } + let col = eval_expr(inputs[0].node(), expr_arena, df)?; + let values = eval_expr(inputs[1].node(), expr_arena, df)?; + // haystack=values (the set), needles=col (each row to check) + let result = gpu_result(values.contains(&col))?; + if !nulls_equal { + // When nulls_equal=false (default), null in col should produce null in output + // cudf's contains returns false for null needles, but Polars wants null + // Fix: where col is null, set result to null + let col_valid = gpu_result(col.is_valid())?; + let null_col = null_column_for_type( + cudf::types::DataType::new(cudf::types::TypeId::Bool8), + col.len(), + )?; + // Where col is valid keep result, where col is null use null_col + gpu_result(result.copy_if_else(&null_col, &col_valid)) + } else { + // nulls_equal=true: null IS IN {null} = true + // cudf returns false for null, keep basic behavior for now + Ok(result) + } + } _ => polars_bail!(ComputeError: "GPU engine: unsupported boolean function {:?}", bf), } } From 0f9ee082e5c1ce2c93950ac72f4dfe35f83dc703 Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 02:01:08 +0900 Subject: [PATCH 3/7] =?UTF-8?q?perf:=20Phase=203=20=E2=80=94=20HStack=20O(?= =?UTF-8?q?1)=20name=20lookup=20+=20GPU-native=20scalar=20broadcast?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit engine.rs: Replace Vec::position() linear scan with HashMap for column name lookup in HStack, eliminating O(n²) behavior on wide DataFrames. expr.rs: Replace host-allocated vec![v; height] + from_slice with cudf::filling::sequence GPU-native constant fill for Int32/Int64/ Float32/Float64 scalar broadcasts, avoiding host→device round-trip. Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/src/engine.rs | 22 +++++++++++++++++++--- cudf-polars/src/expr.rs | 9 +++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index dcd2c31..d3a3dff 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -1,5 +1,7 @@ //! GPU execution engine: walks the IR tree and executes nodes on GPU. +use std::collections::HashMap; + use polars_core::prelude::*; use polars_error::{PolarsResult, polars_bail}; use polars_plan::plans::{AExpr, IR, IRAggExpr, IRPlan, LiteralValue}; @@ -98,24 +100,38 @@ pub fn execute_node( all_names.push(table.names()[i].clone()); } + // Build name→position index for O(1) lookup instead of O(n) linear scan + let mut name_index: HashMap = all_names + .iter() + .enumerate() + .map(|(i, n)| (n.clone(), i)) + .collect(); + for e in &exprs { let col = expr::eval_expr(e.node(), expr_arena, &table)?; let name = e.output_name().to_string(); - if let Some(pos) = all_names.iter().position(|n| n == &name) { + if let Some(&pos) = name_index.get(&name) { all_columns[pos] = Some(col); } else { + let new_pos = all_columns.len(); all_columns.push(Some(col)); + name_index.insert(name.clone(), new_pos); all_names.push(name); } } - // Reorder to match the output schema + // Reorder to match the output schema using HashMap for O(1) lookup let schema_names: Vec<&str> = schema.iter_names().map(|n| n.as_str()).collect(); + let name_pos: HashMap<&str, usize> = all_names + .iter() + .enumerate() + .map(|(i, n)| (n.as_str(), i)) + .collect(); let mut ordered_columns = Vec::with_capacity(schema_names.len()); let mut ordered_names = Vec::with_capacity(schema_names.len()); for &sn in &schema_names { - if let Some(pos) = all_names.iter().position(|n| n == sn) { + if let Some(&pos) = name_pos.get(sn) { let col = all_columns[pos].take().ok_or_else(|| { polars_err!(ColumnNotFound: "duplicate reference to column '{}' in HStack schema", sn) })?; diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index a5f5989..a7fde52 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -565,11 +565,12 @@ fn broadcast_scalar(scalar: &cudf::Scalar, height: usize) -> PolarsResult { let v: i32 = gpu_result(scalar.value())?; - gpu_result(GpuColumn::from_slice(&vec![v; height])) + // GPU-native: sequence with step=0 creates a constant column without host allocation + gpu_result(cudf::filling::sequence_i32(height, v, 0)) } GpuTypeId::Int64 => { let v: i64 = gpu_result(scalar.value())?; - gpu_result(GpuColumn::from_slice(&vec![v; height])) + gpu_result(cudf::filling::sequence_i64(height, v, 0)) } GpuTypeId::Uint8 => { let v: u8 = gpu_result(scalar.value())?; @@ -589,11 +590,11 @@ fn broadcast_scalar(scalar: &cudf::Scalar, height: usize) -> PolarsResult { let v: f32 = gpu_result(scalar.value())?; - gpu_result(GpuColumn::from_slice(&vec![v; height])) + gpu_result(cudf::filling::sequence_f32(height, v, 0.0)) } GpuTypeId::Float64 => { let v: f64 = gpu_result(scalar.value())?; - gpu_result(GpuColumn::from_slice(&vec![v; height])) + gpu_result(cudf::filling::sequence_f64(height, v, 0.0)) } _ => polars_bail!(ComputeError: "GPU engine: cannot broadcast scalar of type {:?}", tid), } From c033773d43d248d4c5ae7cade3adc282d19bd5c3 Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 02:15:44 +0900 Subject: [PATCH 4/7] =?UTF-8?q?feat:=20Phase=204=20=E2=80=94=20temporal=20?= =?UTF-8?q?type=20support=20for=20cudf-polars?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Date/Datetime/Duration type mapping in types.rs with explicit rejection of timezone-aware Datetime. Support temporal null columns (i32 for DurationDays/TimestampDays, i64 for all other temporal types) and scalar broadcast pass-through. Add temporal pass-through in arithmetic_output_type to let cudf handle native type promotion. Incorporates fixes from Codex rescue (5 blocking issues resolved). Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/src/expr.rs | 43 ++++++++++++++++++++++++++++++++++++++++ cudf-polars/src/types.rs | 20 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index a7fde52..449d8ec 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -17,12 +17,35 @@ use crate::error::gpu_result; use crate::gpu_frame::GpuDataFrame; use crate::types::{is_comparison, map_dtype, map_operator}; +/// Returns true if the type ID is a temporal type (timestamp or duration). +fn is_temporal(tid: GpuTypeId) -> bool { + matches!( + tid, + GpuTypeId::TimestampDays + | GpuTypeId::TimestampSeconds + | GpuTypeId::TimestampMilliseconds + | GpuTypeId::TimestampMicroseconds + | GpuTypeId::TimestampNanoseconds + | GpuTypeId::DurationDays + | GpuTypeId::DurationSeconds + | GpuTypeId::DurationMilliseconds + | GpuTypeId::DurationMicroseconds + | GpuTypeId::DurationNanoseconds + ) +} + /// Compute the output type for arithmetic operations (supertype promotion). fn arithmetic_output_type(left_type: &GpuDataType, right_type: &GpuDataType) -> GpuDataType { use GpuTypeId::*; let l = left_type.id(); let r = right_type.id(); + // Temporal types: let cudf handle the type promotion natively. + // Return the left type as a hint; cudf's binary_op will determine the actual output. + if is_temporal(l) || is_temporal(r) { + return left_type.clone(); + } + // Bool + Bool stays Bool (bitwise AND/OR/XOR on booleans) if l == Bool8 && r == Bool8 { return GpuDataType::new(Bool8); @@ -534,6 +557,22 @@ fn null_column_for_type(dtype: cudf::types::DataType, height: usize) -> PolarsRe let opts: Vec> = vec![None; height]; gpu_result(GpuColumn::from_optional_strings(&opts)) } + // Temporal types: Date/TimestampDays and DurationDays are i32, all others are i64 + GpuTypeId::TimestampDays | GpuTypeId::DurationDays => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i32(&opts)) + } + GpuTypeId::TimestampSeconds + | GpuTypeId::TimestampMilliseconds + | GpuTypeId::TimestampMicroseconds + | GpuTypeId::TimestampNanoseconds + | GpuTypeId::DurationSeconds + | GpuTypeId::DurationMilliseconds + | GpuTypeId::DurationMicroseconds + | GpuTypeId::DurationNanoseconds => { + let opts: Vec> = vec![None; height]; + gpu_result(GpuColumn::from_optional_i64(&opts)) + } other => { polars_bail!(ComputeError: "GPU engine: cannot create null column of type {:?}", other) } @@ -596,6 +635,10 @@ fn broadcast_scalar(scalar: &cudf::Scalar, height: usize) -> PolarsResult polars_bail!(ComputeError: "GPU engine: cannot broadcast scalar of type {:?}", tid), } } diff --git a/cudf-polars/src/types.rs b/cudf-polars/src/types.rs index 250cac0..d11d38a 100644 --- a/cudf-polars/src/types.rs +++ b/cudf-polars/src/types.rs @@ -64,6 +64,26 @@ pub fn map_dtype(dtype: &DataType) -> PolarsResult { DataType::Float32 => GpuTypeId::Float32, DataType::Float64 => GpuTypeId::Float64, DataType::String => GpuTypeId::String, + DataType::Date => GpuTypeId::TimestampDays, + DataType::Datetime(tu, tz) => { + if tz.is_some() { + polars_bail!(ComputeError: "GPU engine: timezone-aware Datetime not yet supported"); + } + use polars_core::prelude::TimeUnit; + match tu { + TimeUnit::Milliseconds => GpuTypeId::TimestampMilliseconds, + TimeUnit::Microseconds => GpuTypeId::TimestampMicroseconds, + TimeUnit::Nanoseconds => GpuTypeId::TimestampNanoseconds, + } + } + DataType::Duration(tu) => { + use polars_core::prelude::TimeUnit; + match tu { + TimeUnit::Milliseconds => GpuTypeId::DurationMilliseconds, + TimeUnit::Microseconds => GpuTypeId::DurationMicroseconds, + TimeUnit::Nanoseconds => GpuTypeId::DurationNanoseconds, + } + } _ => polars_bail!(ComputeError: "GPU engine: unsupported dtype {:?}", dtype), }; Ok(GpuDataType::new(type_id)) From 852aa6a47100a12af802dba7f0d4929606c64a7f Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 02:34:59 +0900 Subject: [PATCH 5/7] =?UTF-8?q?feat:=20Phase=205=20=E2=80=94=20window=20fu?= =?UTF-8?q?nctions=20(AExpr::Over)=20with=20GroupsToRows=20mapping?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement per-row broadcast for window aggregations via groupby→left_join→ sort_by_key pipeline. Left join output order is not guaranteed by libcudf, so sort by left_indices to restore original row alignment. - Add AExpr::Over handler with GroupsToRows dispatch - Explicitly reject order_by (not yet supported) - Promote extract_agg_info/map_ir_agg to pub(crate) for reuse Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/src/engine.rs | 4 +- cudf-polars/src/expr.rs | 92 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index d3a3dff..f5c96a0 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -610,7 +610,7 @@ pub fn execute_node( /// In polars-plan 0.53+, `Alias` is no longer an `AExpr` variant — it lives on the /// `ExprIR` wrapper (`OutputName::Alias`), so the caller strips it via `ExprIR::node()`. /// This function handles `Cast` wrappers that the optimizer may insert around `Agg`. -fn extract_agg_info( +pub(crate) fn extract_agg_info( node: Node, expr_arena: &Arena, ) -> PolarsResult<(Node, AggregationKind)> { @@ -657,7 +657,7 @@ fn extract_agg_info( } /// Map an IRAggExpr to its input node and cudf AggregationKind. -fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> { +pub(crate) fn map_ir_agg(agg: &IRAggExpr) -> PolarsResult<(Node, AggregationKind)> { match agg { IRAggExpr::Sum(input) => Ok((*input, AggregationKind::Sum)), IRAggExpr::Min { input, .. } => Ok((*input, AggregationKind::Min)), diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index 449d8ec..5064f14 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -180,10 +180,102 @@ pub fn eval_expr( eval_ir_function(&input, &function, expr_arena, df) } + AExpr::Over { + function, + partition_by, + order_by, + mapping, + } => { + use polars_plan::dsl::options::WindowMapping; + + if order_by.is_some() { + polars_bail!(ComputeError: "GPU engine: window functions with order_by not yet supported"); + } + + let function = *function; + let partition_by = partition_by.clone(); + let mapping = *mapping; + + match mapping { + WindowMapping::GroupsToRows => { + eval_window_groups_to_rows(function, &partition_by, expr_arena, df) + } + other => { + polars_bail!(ComputeError: "GPU engine: unsupported window mapping {:?}", other) + } + } + } + other => polars_bail!(ComputeError: "GPU engine: unsupported expression: {:?}", other), } } +/// Evaluate a window function with `GroupsToRows` mapping. +/// +/// Strategy: groupby partition keys → aggregate → left join back to original rows. +/// This broadcasts the per-group aggregate result to every row in that group. +fn eval_window_groups_to_rows( + function: Node, + partition_by: &[Node], + expr_arena: &Arena, + df: &GpuDataFrame, +) -> PolarsResult { + use cudf::Table as GpuTable; + + // 1. Evaluate partition key columns + let mut key_columns = Vec::with_capacity(partition_by.len()); + for &key_node in partition_by { + let col = eval_expr(key_node, expr_arena, df)?; + key_columns.push(col); + } + + // 2. Extract the aggregation info from the function node + let (input_node, agg_kind) = crate::engine::extract_agg_info(function, expr_arena)?; + + // 3. Evaluate the input column for the aggregation + let value_col = eval_expr(input_node, expr_arena, df)?; + + // 4. Perform groupby aggregation: keys → aggregate + let keys_table = gpu_result(GpuTable::new(key_columns.clone()))?; + let values_table = gpu_result(GpuTable::new(vec![value_col]))?; + + let gb = cudf::groupby::GroupBy::new(&keys_table) + .agg(0, agg_kind); + // Result: [key_col_0, key_col_1, ..., agg_result_col] + let agg_result = gpu_result(gb.execute(&values_table))?; + + // 5. Extract the aggregated keys and the agg result column from the result table + let n_keys = partition_by.len(); + let mut agg_key_cols = Vec::with_capacity(n_keys); + for i in 0..n_keys { + agg_key_cols.push(gpu_result(agg_result.column(i))?); + } + let agg_value_col = gpu_result(agg_result.column(n_keys))?; + + // 6. Left join: original keys LEFT JOIN aggregated keys → get per-row agg values + let agg_keys_table = gpu_result(GpuTable::new(agg_key_cols))?; + let join_result = gpu_result(keys_table.left_join(&agg_keys_table))?; + + // 7. Gather the agg result column using right_indices to broadcast to original rows + let agg_as_table = gpu_result(GpuTable::new(vec![agg_value_col]))?; + let gathered = gpu_result(agg_as_table.gather(&join_result.right_indices))?; + let agg_col = gpu_result(gathered.column(0))?; + + // 8. Restore original row order: libcudf left_join does NOT guarantee output is + // ordered by left_indices. Sort by left_indices (ascending) so the agg column + // aligns with the original DataFrame row order. + let value_table = gpu_result(GpuTable::new(vec![agg_col]))?; + let key_table = gpu_result(GpuTable::new(vec![join_result.left_indices]))?; + let sorted = gpu_result(value_table.sort_by_key( + &key_table, + &[cudf::sorting::SortOrder::Ascending], + &[cudf::sorting::NullOrder::After], + ))?; + let result_col = gpu_result(sorted.column(0))?; + + Ok(result_col) +} + /// Convert a Polars `LiteralValue` to a GPU column of the given height. fn literal_to_gpu_column(lit: &LiteralValue, height: usize) -> PolarsResult { use polars_core::prelude::*; From 00fe17e5d2b51dc1279f5f2f0be14c0505821f94 Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 02:55:29 +0900 Subject: [PATCH 6/7] feat: make polars-lazy an optional dependency behind "lazy" feature This breaks the circular dependency when polars-lazy depends on cudf-polars for GPU engine dispatch. Co-Authored-By: Claude Opus 4.6 (1M context) --- cudf-polars/Cargo.toml | 3 ++- cudf-polars/src/engine.rs | 1 + cudf-polars/src/lib.rs | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cudf-polars/Cargo.toml b/cudf-polars/Cargo.toml index 9584a06..cbfbdfc 100644 --- a/cudf-polars/Cargo.toml +++ b/cudf-polars/Cargo.toml @@ -17,11 +17,12 @@ polars-error = { version = "=0.53.0" } polars-arrow = { version = "=0.53.0" } polars-plan = { version = "=0.53.0", default-features = false, features = ["parquet", "semi_anti_join", "abs", "is_in"] } polars-ops = { version = "=0.53.0", default-features = false, features = ["semi_anti_join"] } -polars-lazy = { version = "=0.53.0", default-features = false, features = ["abs", "parquet", "semi_anti_join"] } +polars-lazy = { version = "=0.53.0", default-features = false, features = ["abs", "parquet", "semi_anti_join"], optional = true } arrow = { version = "54", default-features = false, features = ["ffi"] } [features] default = [] +lazy = ["polars-lazy"] gpu-tests = [] [[example]] diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index f5c96a0..5926026 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -739,6 +739,7 @@ pub fn execute_plan(plan: IRPlan) -> PolarsResult { result.to_polars() } +#[cfg(feature = "lazy")] /// Execute a Polars LazyFrame on the GPU using polars' `_collect_post_opt` callback. /// /// This integrates with polars' physical-plan pipeline: after the optimizer runs, diff --git a/cudf-polars/src/lib.rs b/cudf-polars/src/lib.rs index 7b04361..9970c5e 100644 --- a/cudf-polars/src/lib.rs +++ b/cudf-polars/src/lib.rs @@ -7,6 +7,7 @@ pub mod expr; pub mod gpu_frame; pub mod types; +#[cfg(feature = "lazy")] pub use engine::collect_gpu; pub use engine::execute_plan; pub use gpu_frame::GpuDataFrame; From ba1c7b071c5a4f9b3790aabf907d6c89df9119a2 Mon Sep 17 00:00:00 2001 From: 0xDaizz <90135051+0xDaizz@users.noreply.github.com> Date: Sun, 5 Apr 2026 04:16:06 +0900 Subject: [PATCH 7/7] =?UTF-8?q?feat:=20v0.3.0=20=E2=80=94=20polars=20Engin?= =?UTF-8?q?e::Gpu=20integration,=20window=20functions,=20temporal=20types,?= =?UTF-8?q?=20performance=20optimizations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.toml | 2 +- README.md | 16 +++++++++-- cudf-polars/src/engine.rs | 43 ++++++++++++++++------------ cudf-polars/src/expr.rs | 54 ++++++++++++++++++++++++++---------- cudf-polars/src/gpu_frame.rs | 6 ++++ 5 files changed, 84 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 330f45a..70cf2f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["cudf-sys", "cudf-cxx", "cudf", "cudf-polars"] resolver = "2" [workspace.package] -version = "0.2.1" +version = "0.3.0" edition = "2024" rust-version = "1.85" license = "Apache-2.0 OR MIT" diff --git a/README.md b/README.md index 5061255..711e3a9 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,16 @@ Unofficial Rust bindings for NVIDIA's [libcudf](https://github.com/rapidsai/cudf > **This project is unofficial and not affiliated with NVIDIA or RAPIDS.** +## What's New in v0.3.0 + +- **Polars integration via `_collect_post_opt`**: `collect_gpu()` now uses polars' native post-optimization callback. Zero changes to polars fork required for standalone use. +- **`Engine::Gpu` dispatch**: polars-fork integration enables `lf.collect_with_engine(Engine::Gpu)` -- cudf-polars as a native GPU backend. +- **New expressions**: `Not` (Bool logical / Int bitwise), `IsIn` (with null propagation), GroupBy `Quantile`. +- **Window functions**: `AExpr::Over` with `GroupsToRows` mapping -- groupby then scatter broadcast (O(n)). +- **Temporal types**: Date, Datetime (naive), Duration mapped to cudf timestamp/duration types. +- **Performance**: HStack/HConcat zero-copy via `into_parts()`, GPU-native scalar broadcast (`sequence_*`), HashMap name lookup O(1). +- **Safety**: `try_data_type()` fallible API, Arrow FFI debug assertions, First/Last empty guard. + ## Features - **Near-zero unsafe public API** -- all `unsafe` is confined to the internal FFI layer (sole exception: `DLPackTensor::from_raw_ptr`) @@ -61,7 +71,7 @@ export CUDA_PATH=/usr/local/cuda ```toml [dependencies] -cudf = "0.2" +cudf = "0.3" ``` ### 4. Build @@ -211,8 +221,8 @@ Arrow C Data Interface, Arrow IPC, DLPack tensor exchange, pack/unpack/contiguou - **GroupBy `maintain_order`**: Approximated by key-column sort, not true input-order preservation. - **Std/Var ddof**: Default standalone reduction uses ddof=1. Full ddof support via `reduce_var_with_ddof` / `reduce_std_with_ddof`. - **Polars version**: cudf-polars is compatible with Polars 0.53.0. -- **Unsupported types**: Date, Datetime, Duration, Categorical, List, Struct are not yet mapped (returns explicit error). -- **Unsupported expressions**: Window functions (`.over()`), `IsIn`, expression-level Sort/Filter/Slice. +- **Unsupported types**: Categorical, List, Struct are not yet mapped (returns explicit error). +- **Unsupported expressions**: expression-level Sort/Filter/Slice. - **Unsupported IR nodes**: `Cache`, `MapFunction`, `ExtContext`, `Sink`. - **Multi-file Parquet**: Only reads the first file in multi-file scans. diff --git a/cudf-polars/src/engine.rs b/cudf-polars/src/engine.rs index 5926026..bb67e64 100644 --- a/cudf-polars/src/engine.rs +++ b/cudf-polars/src/engine.rs @@ -18,7 +18,7 @@ use crate::expr; use crate::gpu_frame::GpuDataFrame; /// Execute an IR node recursively, producing a GPU-resident data frame. -pub fn execute_node( +pub(crate) fn execute_node( node: Node, lp_arena: &Arena, expr_arena: &Arena, @@ -89,16 +89,26 @@ pub fn execute_node( let table = execute_node(input_node, lp_arena, expr_arena)?; // HStack adds new columns to the existing frame. - // Use Option to allow zero-copy reordering without dummy GPU allocations. - let existing_width = table.width(); + // Evaluate new expressions first (before consuming table for zero-copy decomposition) + let mut new_cols = Vec::with_capacity(exprs.len()); + let mut new_names = Vec::with_capacity(exprs.len()); + for e in &exprs { + let col = expr::eval_expr(e.node(), expr_arena, &table)?; + new_cols.push(col); + new_names.push(e.output_name().to_string()); + } + + // Decompose table into columns (zero-copy) instead of deep-copying each + let (existing_cols, existing_names) = table.into_parts()?; + let existing_width = existing_cols.len(); let mut all_columns: Vec> = - Vec::with_capacity(existing_width + exprs.len()); - let mut all_names = Vec::with_capacity(existing_width + exprs.len()); + Vec::with_capacity(existing_width + new_cols.len()); + let mut all_names = Vec::with_capacity(existing_width + new_names.len()); - for i in 0..existing_width { - all_columns.push(Some(table.column(i)?)); - all_names.push(table.names()[i].clone()); + for col in existing_cols { + all_columns.push(Some(col)); } + all_names.extend(existing_names); // Build name→position index for O(1) lookup instead of O(n) linear scan let mut name_index: HashMap = all_names @@ -107,10 +117,8 @@ pub fn execute_node( .map(|(i, n)| (n.clone(), i)) .collect(); - for e in &exprs { - let col = expr::eval_expr(e.node(), expr_arena, &table)?; - let name = e.output_name().to_string(); - + // Merge new columns (replace or append) + for (col, name) in new_cols.into_iter().zip(new_names) { if let Some(&pos) = name_index.get(&name) { all_columns[pos] = Some(col); } else { @@ -583,14 +591,13 @@ pub fn execute_node( polars_bail!(ComputeError: "GPU HConcat requires all inputs to have the same height, got {:?}", heights); } - // Collect all columns from all tables + // Decompose all tables into columns (zero-copy) instead of deep-copying each let mut all_columns = Vec::new(); let mut all_names = Vec::new(); - for t in &tables { - for i in 0..t.width() { - all_columns.push(t.column(i)?); - all_names.push(t.names()[i].clone()); - } + for t in tables { + let (cols, names) = t.into_parts()?; + all_columns.extend(cols); + all_names.extend(names); } let combined = GpuDataFrame::from_columns(all_columns, all_names)?; diff --git a/cudf-polars/src/expr.rs b/cudf-polars/src/expr.rs index 5064f14..bc0c36a 100644 --- a/cudf-polars/src/expr.rs +++ b/cudf-polars/src/expr.rs @@ -261,17 +261,16 @@ fn eval_window_groups_to_rows( let gathered = gpu_result(agg_as_table.gather(&join_result.right_indices))?; let agg_col = gpu_result(gathered.column(0))?; - // 8. Restore original row order: libcudf left_join does NOT guarantee output is - // ordered by left_indices. Sort by left_indices (ascending) so the agg column - // aligns with the original DataFrame row order. - let value_table = gpu_result(GpuTable::new(vec![agg_col]))?; - let key_table = gpu_result(GpuTable::new(vec![join_result.left_indices]))?; - let sorted = gpu_result(value_table.sort_by_key( - &key_table, - &[cudf::sorting::SortOrder::Ascending], - &[cudf::sorting::NullOrder::After], - ))?; - let result_col = gpu_result(sorted.column(0))?; + // 8. Restore original row order using scatter (O(n)) instead of sort (O(n log n)). + // left_indices[i] = the original row position for join result row i. + // We scatter the gathered agg values into their correct positions. + let original_height = df.height(); + let agg_dtype = agg_col.data_type(); + let scatter_source = gpu_result(GpuTable::new(vec![agg_col]))?; + let target_col = null_column_for_type(agg_dtype, original_height)?; + let target_table = gpu_result(GpuTable::new(vec![target_col]))?; + let scattered = gpu_result(scatter_source.scatter(&join_result.left_indices, &target_table))?; + let result_col = gpu_result(scattered.column(0))?; Ok(result_col) } @@ -552,6 +551,8 @@ fn eval_agg_expr( let sliced = gpu_result(single_row_table.slice(0, 1))?; let repeated = gpu_result(sliced.repeat(height))?; let cols = gpu_result(repeated.into_columns())?; + // SAFETY: `repeated` is a 1-column table (from single-column `sliced`), + // so `into_columns()` returns exactly 1 element. `.next()` is always Some. Ok(cols.into_iter().next().unwrap()) } IRAggExpr::Last(input) => { @@ -564,6 +565,8 @@ fn eval_agg_expr( let sliced = gpu_result(single_row_table.slice(last_idx, last_idx + 1))?; let repeated = gpu_result(sliced.repeat(height))?; let cols = gpu_result(repeated.into_columns())?; + // SAFETY: `repeated` is a 1-column table (from single-column `sliced`), + // so `into_columns()` returns exactly 1 element. `.next()` is always Some. Ok(cols.into_iter().next().unwrap()) } other => { @@ -911,7 +914,16 @@ fn eval_boolean_function( } let col = eval_expr(inputs[0].node(), expr_arena, df)?; let values = eval_expr(inputs[1].node(), expr_arena, df)?; - // haystack=values (the set), needles=col (each row to check) + // cudf::Column::contains(haystack=self, needles=arg): for each element in `col`, + // checks if it exists in `values`. This matches Polars' `col.is_in(values)` semantics. + + // Empty values set → nothing can be "in", return all false + if values.len() == 0 { + return gpu_result(GpuColumn::from_optional_bool( + &vec![Some(false); col.len()], + )); + } + let result = gpu_result(values.contains(&col))?; if !nulls_equal { // When nulls_equal=false (default), null in col should produce null in output @@ -925,9 +937,21 @@ fn eval_boolean_function( // Where col is valid keep result, where col is null use null_col gpu_result(result.copy_if_else(&null_col, &col_valid)) } else { - // nulls_equal=true: null IS IN {null} = true - // cudf returns false for null, keep basic behavior for now - Ok(result) + // nulls_equal=true: null IS IN {null} should be true + // cudf's contains returns false for null needles, fix: + // where col is null AND values contains null → true + let values_has_null = values.null_count() > 0; + if values_has_null { + let col_is_null = gpu_result(col.is_null())?; + // Build a column of all true, same length as col + let true_col = gpu_result(GpuColumn::from_optional_bool( + &vec![Some(true); col.len()], + ))?; + // Where col is null, override result to true + gpu_result(true_col.copy_if_else(&result, &col_is_null)) + } else { + Ok(result) + } } } _ => polars_bail!(ComputeError: "GPU engine: unsupported boolean function {:?}", bf), diff --git a/cudf-polars/src/gpu_frame.rs b/cudf-polars/src/gpu_frame.rs index 57b84cb..3c2a135 100644 --- a/cudf-polars/src/gpu_frame.rs +++ b/cudf-polars/src/gpu_frame.rs @@ -239,6 +239,12 @@ impl GpuDataFrame { &self.table } + /// Decompose into (columns, names), consuming self. Zero-copy. + pub fn into_parts(self) -> PolarsResult<(Vec, Vec)> { + let cols = gpu_result(self.table.into_columns())?; + Ok((cols, self.names)) + } + /// Convert back to a Polars DataFrame. pub fn to_polars(self) -> PolarsResult { convert::gpu_to_dataframe(self.table, &self.names)