@@ -35,28 +35,24 @@ import org.apache.comet.shims.CometExprTraitShim
3535 * Arrow input reads, expression evaluation, and Arrow output writes into one Janino-compiled
3636 * method per (expression, schema) pair.
3737 *
38- * The kernel is generic over Catalyst expressions. It does not know or assume that the bound tree
39- * came from a `ScalaUDF`; any bound `Expression` whose input and output types are in the
40- * supported surface compiles. Today the only consumer is the JVM UDF dispatcher in
41- * [[org.apache.comet.udf.codegen.CometScalaUDFCodegen ]], but a future consumer (e.g. Spark
38+ * The kernel is generic over Catalyst expressions; it does not know or assume that the bound tree
39+ * came from a `ScalaUDF`. Today's only consumer is
40+ * [[org.apache.comet.udf.codegen.CometScalaUDFCodegen ]], but a future consumer (Spark
4241 * `WholeStageCodegenExec` integration, a non-UDF batch evaluator) can drive this class directly.
4342 *
44- * Constraints today:
45- * - Single output vector per kernel; whole projections would need a multi-output extension.
46- * - Per-row scalar evaluation; aggregation, window, and generator expressions are out of scope
47- * and rejected by [[canHandle ]].
43+ * Constraints: single output vector per kernel (whole projections need a multi-output extension);
44+ * per-row scalar evaluation only (aggregation, window, generator rejected by [[canHandle ]]).
4845 *
4946 * Input- and output-side emission live in [[CometBatchKernelCodegenInput ]] and
50- * [[CometBatchKernelCodegenOutput ]]. This file is the orchestrator: the [[ArrowColumnSpec ]]
51- * vocabulary, [[canHandle ]] / [[allocateOutput ]] / [[compile ]] / [[generateSource ]] entry points,
52- * and the cross-cutting kernel-shape decisions (null-intolerant short-circuit, CSE variant).
47+ * [[CometBatchKernelCodegenOutput ]]. This file owns the [[ArrowColumnSpec ]] vocabulary, the
48+ * [[canHandle ]] / [[allocateOutput ]] / [[compile ]] / [[generateSource ]] entry points, and
49+ * cross-cutting kernel-shape decisions (null-intolerant short-circuit, CSE variant).
5350 *
5451 * The generated kernel '''is''' the `InternalRow` that Spark's `BoundReference.genCode` reads
55- * from. `ctx.INPUT_ROW = "row"` plus `InternalRow row = this;` inside `process` routes every
52+ * from: `ctx.INPUT_ROW = "row"` plus `InternalRow row = this;` inside `process` routes
5653 * `row.getUTF8String(ord)` to the kernel's own typed getter (final method, constant ordinal; JIT
57- * devirtualizes and folds the switch). `row` rather than `this` because Spark's
58- * `splitExpressions` uses INPUT_ROW as a helper-method parameter name and `this` is a reserved
59- * Java keyword.
54+ * devirtualizes and folds). `row` rather than `this` because Spark's `splitExpressions` passes
55+ * INPUT_ROW as a helper-method parameter name and `this` is a reserved Java keyword.
6056 */
6157object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
6258
@@ -103,22 +99,14 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
10399 }
104100
105101 /**
106- * Plan-time predicate: can the codegen dispatcher handle this bound expression end to end? If
107- * it returns `None`, the serde is free to emit the codegen proto. If it returns `Some(reason)`,
108- * the serde must fall back (usually via `withInfo(...) + None`) so Spark runs the expression
109- * rather than crashing in the Janino compile at execute time.
102+ * Plan-time predicate: can the codegen dispatcher handle this bound expression end to end?
103+ * `None` greenlights the serde to emit the codegen proto; `Some(reason)` forces a Spark
104+ * fallback (typically `withInfo(...) + None`) rather than crashing the Janino compile at
105+ * execute time.
110106 *
111- * Checks:
112- * - every `BoundReference`'s data type is in [[isSupportedDataType ]] (i.e. the kernel has a
113- * typed getter for it)
114- * - the overall `expr.dataType` is in [[isSupportedDataType ]] (i.e. `allocateOutput` and
115- * `emitWrite` know how to materialize it)
116- * - the expression is scalar (no `AggregateFunction`, no generators). These never reach a
117- * scalar serde, but we belt-and-suspenders anyway.
118- *
119- * Intermediate node types are '''not''' checked. Spark's `doGenCode` materializes intermediates
120- * in local variables; only the leaves (which read from the row) and the root (which writes to
121- * the output vector) touch Arrow.
107+ * Checks every `BoundReference`'s data type and the root `expr.dataType` against
108+ * [[isSupportedDataType ]], and rejects aggregates / generators. Intermediate nodes are not
109+ * checked: only leaves (row reads) and the root (output write) touch Arrow.
122110 */
123111 def canHandle (boundExpr : Expression ): Option [String ] = {
124112 if (! isSupportedDataType(boundExpr.dataType)) {
@@ -133,31 +121,18 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
133121 // `Unevaluable` but never touched by codegen (e.g. Spark 4.0's `ResolvedCollation`, which
134122 // lives in `Collate.collation` as a type marker; `Collate.genCode` delegates to its child).
135123 //
136- // Nondeterministic and stateful expressions are accepted: the dispatcher allocates one
137- // kernel instance per partition (per `CometScalaUDFCodegen.ensureKernel`) and calls
138- // `init(partitionIndex)` once on partition entry, so per-row state on `Rand`,
139- // `MonotonicallyIncreasingID`, etc. advances correctly across batches in the same
140- // partition and resets across partitions.
141- //
142- // `ExecSubqueryExpression` (e.g. `ScalarSubquery`, `InSubqueryExec`) is also accepted, and
143- // works correctly via a four-link invariant:
144- // 1. The surrounding Comet operator inherits `SparkPlan.waitForSubqueries`, which calls
145- // `updateResult()` on every `ExecSubqueryExpression` in its `expressions` before the
146- // operator's compute path ever reaches the JVM UDF bridge.
147- // 2. `ScalarSubquery.result` (and equivalents on other subquery expressions) is a plain
148- // mutable field on the case class. `@volatile` affects cross-thread visibility but
149- // not serializability: Java/Kryo serializers include it.
150- // 3. `SparkEnv.closureSerializer` captures the populated `result` value in the bytes
151- // that travel through `CometScalaUDFCodegen`'s arg-0 transport.
152- // 4. The dispatcher's cache key is those exact bytes (see
153- // `CometScalaUDFCodegen.CacheKey`). Different `result` values produce different
154- // bytes, hence different cache entries, hence a fresh compile per distinct subquery
155- // value. No cross-query staleness.
124+ // Nondeterministic / stateful expressions are accepted: per-partition kernel allocation
125+ // (`CometScalaUDFCodegen.ensureKernel`) plus a single `init(partitionIndex)` call at
126+ // partition entry give `Rand` / `MonotonicallyIncreasingID` / etc. correct state across
127+ // batches and a clean reset across partitions.
156128 //
157- // If any of those four links breaks (a different cache-key derivation that drops `result`;
158- // a Comet operator that bypasses `waitForSubqueries`; a transport that strips `@volatile`
159- // fields), subquery correctness regresses. Keep this invariant intact when refactoring the
160- // cache-key or transport layers.
129+ // `ExecSubqueryExpression` (`ScalarSubquery`, `InSubqueryExec`) is accepted via a chain:
130+ // the surrounding Comet operator's inherited `SparkPlan.waitForSubqueries` populates the
131+ // subquery's mutable `result` field before evaluation; the closure serializer captures that
132+ // populated value into the arg-0 bytes; the dispatcher keys its compile cache on those
133+ // exact bytes, so distinct subquery results produce distinct cache entries with no
134+ // cross-query staleness. Refactors to the cache-key derivation, the transport, or any
135+ // Comet operator that bypasses `waitForSubqueries` would break this; preserve it.
161136 boundExpr.find {
162137 case _ : org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction => true
163138 case _ : org.apache.spark.sql.catalyst.expressions.Generator => true
@@ -348,23 +323,15 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
348323 }
349324
350325 /**
351- * Per-row body for the default path.
352- *
353- * For expressions that implement the `NullIntolerant` marker trait (null in any input -> null
354- * output), emits a short-circuit that skips expression evaluation entirely when any input
355- * column is null in the current row. This saves the full `ev.code` cost for null rows, not just
356- * the output setNull call. Does not change behavior, only performance.
357- *
358- * For other expressions, the standard shape applies: evaluate the expression, then check
359- * `ev.isNull` to decide between `setNull` and a write. Null semantics are handled internally by
360- * Spark's generated `ev.code`.
326+ * Per-row body for the default path. For `NullIntolerant` expressions (null in any input ->
327+ * null output), prepends a short-circuit that skips expression evaluation entirely when any
328+ * input column is null this row, saving the full `ev.code` cost. Otherwise the standard shape:
329+ * run `ev.code`, then `setNull` or write based on `ev.isNull`.
361330 *
362- * `subExprsCode` is the CSE helper-invocation block (see the "Subexpression elimination"
363- * section of the object-level Scaladoc). It writes common subexpression results into class
364- * fields that `ev.code` reads, so it must run before `ev.code`. In the NullIntolerant short-
365- * circuit case it is placed inside the else branch, skipping CSE evaluation for null rows as
366- * well as main-body evaluation. In the default case it precedes `ev.code`. Empty string when
367- * CSE is disabled or the tree has no common subexpressions.
331+ * `subExprsCode` is the CSE helper-invocation block; it writes common subexpression results
332+ * into class fields that `ev.code` reads, so it must run before `ev.code`. Inside the
333+ * short-circuit it lives in the else branch, skipping CSE for null rows. Empty when CSE is
334+ * disabled or the tree has none.
368335 */
369336 private def defaultBody (
370337 boundExpr : Expression ,
@@ -512,21 +479,18 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim {
512479 }
513480
514481 /**
515- * Result of compiling a bound [[Expression ]] into a Janino kernel. The `factory` is the Spark
516- * [[GeneratedClass ]] produced by Janino and is safe to share across threads and partitions: it
517- * holds no mutable state. The `freshReferences` closure regenerates the references array each
518- * time a new kernel instance is allocated.
482+ * Result of compiling a bound [[Expression ]] into a Janino kernel. The Spark-generated
483+ * `factory` is stateless and safe to share across partitions; `freshReferences` regenerates the
484+ * references array per kernel allocation.
519485 *
520- * Why not cache a single `references` array: some expressions (notably [[ScalaUDF ]]) embed
521- * stateful Spark `ExpressionEncoder` serializers into `references` via `ctx.addReferenceObj`.
522- * Those serializers reuse an internal `UnsafeRow` / `byte[]` buffer per `.apply(...)` call and
523- * are not thread-safe. If two kernels on different partitions shared one serializer instance,
524- * they would race on that buffer and produce garbage. Re-running `genCode` per kernel
525- * allocation costs microseconds; Janino compile costs milliseconds. Cache the expensive piece,
526- * refresh the cheap one, stay correct.
486+ * The references array can't be cached because some expressions (notably [[ScalaUDF ]]) embed
487+ * stateful `ExpressionEncoder` serializers via `ctx.addReferenceObj` that reuse an internal
488+ * `UnsafeRow` / `byte[]` per `.apply(...)`. Sharing one serializer across partition kernels
489+ * would race on that buffer. Re-running `genCode` is microseconds; Janino compile is
490+ * milliseconds. Cache the expensive piece, refresh the cheap one.
527491 *
528- * Mirrors Spark `WholeStageCodegenExec`: compile once per plan, instantiate per partition, call
529- * `init(partitionIndex)` once , iterate.
492+ * Mirrors Spark `WholeStageCodegenExec`: compile once per plan, instantiate per partition,
493+ * `init(partitionIndex)`, iterate.
530494 */
531495 final case class CompiledKernel (factory : GeneratedClass , freshReferences : () => Array [Any ]) {
532496 def newInstance (): CometBatchKernel =
0 commit comments