Skip to content

Commit 760cd94

Browse files
committed
perf: send scalar UDF arguments as length-1 vectors
Literal-folded children no longer get expanded to batch-row count before crossing JNI; ColumnarValue::Scalar is materialized at length 1, avoiding an O(rows) copy of values that never vary across the batch. Document the contract on CometUDF: scalar inputs arrive as length-1 vectors, vector inputs at the batch row count, and the result must match the longest input.
1 parent 2a43867 commit 760cd94

2 files changed

Lines changed: 15 additions & 5 deletions

File tree

common/src/main/scala/org/apache/comet/udf/CometUDF.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@ import org.apache.arrow.vector.ValueVector
2323

2424
/**
2525
* Scalar UDF invoked from native execution via JNI. Receives Arrow vectors as input and returns
26-
* an Arrow vector. The returned vector must have the same length as the input vectors.
26+
* an Arrow vector.
2727
*
28-
* Implementations must have a public no-arg constructor.
28+
* - Vector arguments arrive at the row count of the current batch.
29+
* - Scalar (literal-folded) arguments arrive as length-1 vectors and must be read at index 0.
30+
* - The returned vector's length must match the longest input: the batch row count when any vector argument is present, otherwise 1 (all arguments scalar).
31+
*
32+
* Implementations must have a public no-arg constructor and should be stateless: instances are
33+
* cached per executor thread for the lifetime of the JVM.
2934
*/
3035
trait CometUDF {
3136
def evaluate(inputs: Array[ValueVector]): ValueVector

native/spark-expr/src/jvm_udf/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,17 @@ impl PhysicalExpr for JvmScalarUdfExpr {
105105
}
106106

107107
fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
108-
// Step 1: evaluate child expressions to get Arrow arrays.
109-
let n = batch.num_rows();
108+
// Step 1: evaluate child expressions to get Arrow arrays. Scalar children
109+
// (e.g. literal patterns) are sent as length-1 vectors rather than expanded
110+
// to batch-row count, so the JVM bridge does not pay an O(rows) copy for
111+
// values that never vary across the batch.
110112
let arrays: Vec<ArrayRef> = self
111113
.args
112114
.iter()
113-
.map(|e| e.evaluate(batch).and_then(|cv| cv.into_array(n)))
115+
.map(|e| match e.evaluate(batch)? {
116+
ColumnarValue::Array(a) => Ok(a),
117+
ColumnarValue::Scalar(s) => s.to_array_of_size(1),
118+
})
114119
.collect::<DFResult<_>>()?;
115120

116121
// Step 2: allocate FFI structs on the Rust heap and collect their raw pointers.

0 commit comments

Comments
 (0)