@@ -31,10 +31,12 @@ row-at-a-time execution while keeping the implementation in Java/Scala.
3131
3232The framework consists of:
3333
34- - ** ` CometUDF ` ** — a trait your UDF class must implement, declaring its name, return type, optional input types,
35- and the vectorized ` evaluate ` method
36- - ** ` CometUdfRegistry ` ** — a registry that introspects your ` CometUDF ` class to record metadata for the serde layer
37- - ** ` CometUdfBridge ` ** — the JNI bridge that native execution uses to invoke your UDF (no user interaction needed)
34+ - ** ` CometUDF ` ** : a trait your UDF class must implement, declaring its name, return type, optional input
35+ types, and the vectorized ` evaluate ` method.
36+ - ** ` CometUdfRegistry ` ** : a registry that introspects your ` CometUDF ` class to record metadata for the serde
37+ layer.
38+ - ** ` CometUdfBridge ` ** : the JNI bridge that native execution uses to invoke your UDF (no user interaction
39+ needed).
3840
3941## Writing a CometUDF
4042
@@ -43,15 +45,15 @@ Implement the `org.apache.comet.udf.CometUDF` trait. Comet relocates Apache Arro
4345so your implementation must import Arrow types from the shaded package. This is the
4446same package that the published ` comet-spark ` JAR exposes on your classpath.
4547
48+ ### Java
49+
4650``` java
4751import org.apache.comet.shaded.arrow.vector.IntVector ;
4852import org.apache.comet.shaded.arrow.vector.BitVector ;
4953import org.apache.comet.shaded.arrow.vector.ValueVector ;
5054import org.apache.comet.udf.CometUDF ;
5155import org.apache.spark.sql.types.DataType ;
5256import org.apache.spark.sql.types.DataTypes ;
53- import scala.collection.JavaConverters ;
54- import java.util.Arrays ;
5557
5658public class IsPositiveUdf implements CometUDF {
5759
@@ -64,12 +66,6 @@ public class IsPositiveUdf implements CometUDF {
6466 @Override
6567 public boolean nullable () { return true ; }
6668
67- @Override
68- public scala.collection.Seq<DataType > inputTypes () {
69- return JavaConverters . asScalaBuffer(
70- Arrays . < DataType > asList(DataTypes . IntegerType )). toSeq();
71- }
72-
7369 @Override
7470 public ValueVector evaluate (ValueVector [] inputs ) {
7571 IntVector input = (IntVector ) inputs[0 ];
@@ -92,18 +88,54 @@ public class IsPositiveUdf implements CometUDF {
9288}
9389```
9490
91+ ### Scala
92+
93+ ``` scala
94+ import org .apache .comet .shaded .arrow .vector .{BitVector , IntVector , ValueVector }
95+ import org .apache .comet .CometArrowAllocator
96+ import org .apache .comet .udf .CometUDF
97+ import org .apache .spark .sql .types .{BooleanType , DataType , IntegerType }
98+
99+ class IsPositiveUdf extends CometUDF {
100+ override def name : String = " is_positive"
101+ override def returnType : DataType = BooleanType
102+ override def nullable : Boolean = true
103+
104+ // Optional: declare only if you plan to use registerColumnarOnly.
105+ override def inputTypes : Seq [DataType ] = Seq (IntegerType )
106+
107+ override def evaluate (inputs : Array [ValueVector ]): ValueVector = {
108+ val input = inputs(0 ).asInstanceOf [IntVector ]
109+ val rowCount = input.getValueCount
110+ val result = new BitVector (" result" , CometArrowAllocator )
111+ result.allocateNew(rowCount)
112+ var i = 0
113+ while (i < rowCount) {
114+ if (input.isNull(i)) result.setNull(i)
115+ else result.set(i, if (input.get(i) > 0 ) 1 else 0 )
116+ i += 1
117+ }
118+ result.setValueCount(rowCount)
119+ result
120+ }
121+ }
122+ ```
123+
95124Key requirements:
96125
97- - The class must have a ** public no-arg constructor**
98- - Arrow types must be imported from ` org.apache.comet.shaded.arrow.* ` (the relocated package)
99- - Input vectors arrive at the row count of the current batch
100- - Scalar (literal) arguments arrive as length-1 vectors — read at index 0
101- - The returned vector's length ** must match** the longest input vector
102- - Instances are cached per executor thread, so implementations should be ** stateless**
103- - ` inputTypes ` is required only for columnar-only registration (see below)
126+ - The class must have a ** public no-arg constructor** .
127+ - Arrow types must be imported from ` org.apache.comet.shaded.arrow.* ` (the relocated package).
128+ - Input vectors arrive at the row count of the current batch.
129+ - Scalar (literal) arguments arrive as length-1 vectors: read at index 0.
130+ - The returned vector's length ** must match** the longest input vector.
131+ - Instances are cached per executor thread, so implementations should be ** stateless** .
132+ - ` inputTypes ` is only required for columnar-only registration (see Option 3 below).
104133
105134## Registering a CometUDF
106135
136+ There are three ways to register a ` CometUDF ` with Comet, depending on whether you also want a
137+ row-based Spark fallback.
138+
107139### Option 1: Comet UDF only (existing Spark UDF)
108140
109141If you already have a Spark UDF registered, just tell Comet about the accelerated implementation:
@@ -126,10 +158,13 @@ import org.apache.comet.udf.CometUdfRegistry
126158CometUdfRegistry .register(spark, classOf [IsPositiveUdf ], (x : Int ) => x > 0 )
127159```
128160
161+ Convenience overloads exist for arities 1, 2, and 3. For higher arities, use Option 1 and call
162+ ` spark.udf.register ` separately.
163+
129164### Option 3: Columnar-only (no row-based equivalent)
130165
131166If you do not want to write a row-based fallback, Comet can synthesize a stub Spark UDF that
132- throws ` UnsupportedOperationException ` if invoked row-at-a-time. The CometUDF must declare
167+ throws ` UnsupportedOperationException ` if invoked row-at-a-time. The ` CometUDF ` must declare
133168` inputTypes ` so the stub has the correct arity.
134169
135170``` scala
@@ -140,7 +175,7 @@ CometUdfRegistry.registerColumnarOnly(spark, classOf[IsPositiveUdf])
140175
141176When Comet is enabled and the query is supported, the vectorized implementation runs natively.
142177If Comet falls back (e.g. an unsupported expression elsewhere in the plan), the stub is invoked
143- and the query fails with a clear error rather than silently slow row-at-a-time execution.
178+ and the query fails with a clear error rather than silently degrading to row-at-a-time execution.
144179
145180## How It Works
146181
@@ -158,16 +193,16 @@ and the query fails with a clear error rather than silently slow row-at-a-time e
158193
159194## Packaging and Deployment
160195
161- 1 . Package your ` CometUDF ` implementation in a JAR
162- 2 . Include it on the Spark classpath via ` --jars ` or ` spark.jars `
163- 3 . Register the UDF as shown above (in your application code or via a Spark session extension)
196+ 1 . Package your ` CometUDF ` implementation in a JAR.
197+ 2 . Include it on the Spark classpath via ` --jars ` or ` spark.jars ` .
198+ 3 . Register the UDF as shown above (in your application code or via a Spark session extension).
164199
165- The CometUDF class is resolved using the executor's context classloader, so user-supplied JARs added via
200+ The ` CometUDF ` class is resolved using the executor's context classloader, so user-supplied JARs added via
166201` spark.jars ` or ` --jars ` are automatically visible.
167202
168203## Limitations
169204
170- - Only scalar UDFs are supported (not aggregate or table UDFs)
171- - The UDF must be registered by name — anonymous lambdas without a name cannot be intercepted
172- - All input and output types must be representable as Arrow vectors
173- - Columnar-only registration currently supports arities 1 through 5
205+ - Only scalar UDFs are supported (not aggregate or table UDFs).
206+ - The UDF must be registered by name: anonymous lambdas without a name cannot be intercepted.
207+ - All input and output types must be representable as Arrow vectors.
208+ - ` registerColumnarOnly ` currently supports arities 1 through 5.
0 commit comments