@@ -50,6 +50,18 @@ private[codegen] object CometBatchKernelCodegenOutput {
5050 * `ListVector` / `StructVector` / `MapVector`, the parent's `allocateNew` reallocates child
5151 * buffers at default size, so a leaf hint would be lost.
5252 *
53+ * TODO(nested-varwidth-sizing): thread the byte estimate into nested var-width children via the
54+ * Arrow `Field` tree so a `Map<String, Array<String>>` UDF return doesn't realloc its key /
55+ * value data buffers per row. Arrow Java's child-vector hints are allocator-level rather than
56+ * per-child, so this needs a small loop or a heuristic that overshoots root size into
57+ * known-leaf children.
58+ *
59+ * TODO(cached-write-buffer-addrs): mirror the input emitter's `_valueAddr` / `_offsetAddr`
60+ * caching on the write side. Cache the data and offset buffer addresses once at the start of
61+ * `process` and emit `Platform.putByte` / `Platform.copyMemory` writes for VarChar / VarBinary
62+ * / Decimal scalar outputs, bypassing `setSafe`'s realloc check. Requires pre-allocated buffers
63+ * (the existing `estimatedBytes` plus the nested-sizing TODO above).
64+ *
5365 * Closes the vector on any failure between construction and return so a partially-initialized
5466 * tree does not leak buffers back to the allocator.
5567 */
@@ -173,6 +185,13 @@ private[codegen] object CometBatchKernelCodegenOutput {
173185 // existing byte[] directly to `VarCharVector.setSafe(int, byte[], int, int)` via the
174186 // encoded offset and skip the redundant `getBytes()` allocation. Off-heap passthrough
175187 // (rare on output side) falls back to `getBytes()`.
188+ //
189+ // TODO(utf8-unsafe-write): the output-side equivalent of the input emitter's
190+ // `UTF8String.fromAddress` zero-copy read would cache the data buffer address once per
191+ // batch and write via `Platform.copyMemory` + manual offset/validity buffer updates,
192+ // bypassing `setSafe`'s realloc check. Coupled with `cached-write-buffer-addrs` and a
193+ // pre-allocated buffer (root-only `estimatedBytes` today). Not done because perf payoff
194+ // is unmeasured against this PR's workloads.
176195 val bBase = ctx.freshName(" utfBase" )
177196 val bLen = ctx.freshName(" utfLen" )
178197 val bArr = ctx.freshName(" utfArr" )
@@ -192,7 +211,7 @@ private[codegen] object CometBatchKernelCodegenOutput {
192211 case BinaryType =>
193212 // Spark's BinaryType value is already a `byte[]`.
194213 OutputEmit (" " , s " $targetVec.setSafe( $idx, $source, 0, $source.length); " )
195- case ArrayType (elementType, _ ) =>
214+ case ArrayType (elementType, containsNull ) =>
196215 // Complex-type output: recursive per-row write.
197216 // Spark's `doGenCode` for ArrayType-returning expressions produces an `ArrayData` value
198217 // (usually `GenericArrayData` / `UnsafeArrayData`). We iterate its elements, write each
@@ -202,6 +221,10 @@ private[codegen] object CometBatchKernelCodegenOutput {
202221 // Array, Array of Struct) work by the same recursion. `targetVec` is a `ListVector` at
203222 // the call site (either `output` at root or a hoisted child cast); we only need to cast
204223 // its data vector, and that cast goes into setup.
224+ //
225+ // Optimization: NullableElementElision. When `containsNull == false`, the element
226+ // `isNullAt` guard is dead by Spark's own type-system contract, so we drop it at source
227+ // level rather than relying on JIT folding.
205228 val childVar = ctx.freshName(" outListChild" )
206229 val childClass = outputVectorClass(elementType)
207230 val arrVar = ctx.freshName(" arr" )
@@ -213,16 +236,21 @@ private[codegen] object CometBatchKernelCodegenOutput {
213236 val setup =
214237 (s " $childClass $childVar = ( $childClass) $targetVec.getDataVector(); " +:
215238 Seq (inner.setup).filter(_.nonEmpty)).mkString(" \n " )
239+ val elementWrite = if (containsNull) {
240+ s """ if ( $arrVar.isNullAt( $jVar)) {
241+ | $childVar.setNull( $childIdx + $jVar);
242+ | } else {
243+ | ${inner.perRow}
244+ | } """ .stripMargin
245+ } else {
246+ inner.perRow
247+ }
216248 val perRow =
217249 s """ org.apache.spark.sql.catalyst.util.ArrayData $arrVar = $source;
218250 |int $nVar = $arrVar.numElements();
219251 |int $childIdx = $targetVec.startNewValue( $idx);
220252 |for (int $jVar = 0; $jVar < $nVar; $jVar++) {
221- | if ( $arrVar.isNullAt( $jVar)) {
222- | $childVar.setNull( $childIdx + $jVar);
223- | } else {
224- | ${inner.perRow}
225- | }
253+ | $elementWrite
226254 |}
227255 | $targetVec.endValue( $idx, $nVar); """ .stripMargin
228256 OutputEmit (setup, perRow)
@@ -304,6 +332,15 @@ private[codegen] object CometBatchKernelCodegenOutput {
304332 s " $keyClass $keyVar = ( $keyClass) $entriesVar.getChildByOrdinal(0); " ,
305333 s " $valClass $valVar = ( $valClass) $entriesVar.getChildByOrdinal(1); " ) ++
306334 Seq (keyEmit.setup, valEmit.setup).filter(_.nonEmpty)).mkString(" \n " )
335+ val valueWrite = if (mt.valueContainsNull) {
336+ s """ if ( $valArr.isNullAt( $jVar)) {
337+ | $valVar.setNull( $childIdx + $jVar);
338+ | } else {
339+ | ${valEmit.perRow}
340+ | } """ .stripMargin
341+ } else {
342+ valEmit.perRow
343+ }
307344 val perRow =
308345 s """ org.apache.spark.sql.catalyst.util.MapData $mapSrc = $source;
309346 |org.apache.spark.sql.catalyst.util.ArrayData $keyArr = $mapSrc.keyArray();
@@ -313,11 +350,7 @@ private[codegen] object CometBatchKernelCodegenOutput {
313350 |for (int $jVar = 0; $jVar < $nVar; $jVar++) {
314351 | $entriesVar.setIndexDefined( $childIdx + $jVar);
315352 | ${keyEmit.perRow}
316- | if ( $valArr.isNullAt( $jVar)) {
317- | $valVar.setNull( $childIdx + $jVar);
318- | } else {
319- | ${valEmit.perRow}
320- | }
353+ | $valueWrite
321354 |}
322355 | $targetVec.endValue( $idx, $nVar); """ .stripMargin
323356 OutputEmit (setup, perRow)
0 commit comments