Skip to content

Commit ec42809

Browse files
committed
cleanup round 2
1 parent e19683e commit ec42809

5 files changed

Lines changed: 161 additions & 172 deletions

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet
21+
22+
import org.apache.arrow.vector.ValueVector
23+
import org.apache.spark.sql.types.DataType
24+
25+
import org.apache.comet.udf.codegen.CometScalaUDFCodegen
26+
27+
/**
28+
* Shared assertions for the codegen-dispatcher test suites. Mix in alongside `CometTestBase`.
29+
*/
30+
trait CometCodegenAssertions {
31+
32+
/** Asserts the dispatcher actually ran during `f`, guarding against silent serde fallback. */
33+
protected def assertCodegenRan(f: => Unit): Unit = {
34+
CometScalaUDFCodegen.resetStats()
35+
f
36+
val after = CometScalaUDFCodegen.stats()
37+
assert(
38+
after.compileCount + after.cacheHitCount >= 1,
39+
s"expected codegen dispatcher activity, got $after")
40+
}
41+
42+
/**
43+
* Asserts the composed subtree fused into one kernel signature, not N (one per sub-expression).
44+
* Uses the JVM-wide signature set rather than `compileCount` because per-task `boundExpr`
45+
* isolation makes multi-partition queries trip `compileCount > 1` even when the bytecode is
46+
* shared.
47+
*/
48+
protected def assertOneKernelForSubtree(f: => Unit): Unit = {
49+
CometScalaUDFCodegen.resetStats()
50+
val sigsBefore = CometScalaUDFCodegen.snapshotCompiledSignatures()
51+
f
52+
val sigsAfter = CometScalaUDFCodegen.snapshotCompiledSignatures()
53+
val grew = sigsAfter.size - sigsBefore.size
54+
assert(
55+
grew <= 1,
56+
s"expected <= 1 new compiled-kernel signature for the composed subtree, grew by $grew; " +
57+
s"new=${sigsAfter -- sigsBefore}")
58+
val after = CometScalaUDFCodegen.stats()
59+
assert(
60+
after.compileCount + after.cacheHitCount >= 1,
61+
s"expected codegen dispatcher activity, got $after")
62+
}
63+
64+
/**
65+
* Asserts a kernel matching the given input Arrow vector classes and output type sits in the
66+
* JVM-wide signature set. Pair with `assertCodegenRan` since the set is append-only. Compares
67+
* by simple name because `common` shades `org.apache.arrow`.
68+
*/
69+
protected def assertKernelSignaturePresent(
70+
inputs: Seq[Class[_ <: ValueVector]],
71+
output: DataType): Unit = {
72+
val sigs = CometScalaUDFCodegen.snapshotCompiledSignatures()
73+
val expectedNames = inputs.map(_.getSimpleName).toIndexedSeq
74+
val present = sigs.exists { case (cached, dt) =>
75+
dt == output && cached.map(_.getSimpleName) == expectedNames
76+
}
77+
assert(
78+
present,
79+
s"expected kernel signature $expectedNames -> $output; " +
80+
s"cache had ${sigs.map { case (c, d) => (c.map(_.getSimpleName), d) }}")
81+
}
82+
}

spark/src/test/scala/org/apache/comet/CometCodegenFuzzSuite.scala

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types._
3333

3434
import org.apache.comet.DataTypeSupport.isComplexType
3535
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, ParquetGenerator, SchemaGenOptions}
36-
import org.apache.comet.udf.codegen.CometScalaUDFCodegen
3736

3837
/**
3938
* Randomized end-to-end tests for the Arrow-direct codegen dispatcher: schema-driven coverage of
@@ -42,7 +41,10 @@ import org.apache.comet.udf.codegen.CometScalaUDFCodegen
4241
* (not [[CometFuzzTestBase]]) because the base's `shuffle` x `nativeC2R` cross-product is
4342
* irrelevant for projection-only queries.
4443
*/
45-
class CometCodegenFuzzSuite extends CometTestBase with AdaptiveSparkPlanHelper {
44+
class CometCodegenFuzzSuite
45+
extends CometTestBase
46+
with AdaptiveSparkPlanHelper
47+
with CometCodegenAssertions {
4648

4749
/** Random schema with primitives plus shallow arrays and structs. No maps, no deep nesting. */
4850
private var mixedTypesFilename: String = _
@@ -120,20 +122,6 @@ class CometCodegenFuzzSuite extends CometTestBase with AdaptiveSparkPlanHelper {
120122
super.sparkConf
121123
.set(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key, "true")
122124

123-
/**
124-
* Resets dispatcher stats, runs `f`, then asserts the codegen path actually ran for at least
125-
* one batch. Without this, a silent serde fallback would let the fuzz pass trivially because
126-
* both Spark and whatever-Comet-ran-instead agree with Spark.
127-
*/
128-
private def assertCodegenRan(f: => Unit): Unit = {
129-
CometScalaUDFCodegen.resetStats()
130-
f
131-
val after = CometScalaUDFCodegen.stats()
132-
assert(
133-
after.compileCount + after.cacheHitCount >= 1,
134-
s"expected at least one codegen dispatcher invocation during this query, got $after")
135-
}
136-
137125
/**
138126
* Identity ScalaUDF for one of the 14 primitive types in
139127
* [[org.apache.comet.testing.SchemaGenOptions.defaultPrimitiveTypes]]. Returns the registered

spark/src/test/scala/org/apache/comet/CometCodegenHOFSuite.scala

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,29 @@ import org.apache.spark.SparkConf
2323
import org.apache.spark.sql.CometTestBase
2424
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2525

26-
import org.apache.comet.udf.codegen.CometScalaUDFCodegen
27-
2826
/**
2927
* Higher-order function regression coverage for the codegen dispatcher.
3028
*
3129
* Spark's HOFs (`ArrayTransform`, `ArrayFilter`, `ArrayAggregate`, `ArrayExists`, `ZipWith`,
3230
* `MapFilter`, etc.) all extend `CodegenFallback`. The dispatcher's `canHandle` admits them.
3331
* `CodegenFallback.doGenCode` emits a single `((Expression) references[N]).eval(row)` call site
34-
* per HOF; at runtime the kernel dispatches to `Expression.eval(InternalRow)`, which iterates the
35-
* array, mutates `NamedLambdaVariable.value`'s `AtomicReference` per element, and recursively
36-
* evaluates the lambda body. Lambda-body leaf reads resolve through the kernel's own typed Arrow
37-
* getters since the kernel '''is''' an `InternalRow`.
32+
* per HOF; the kernel dispatches to `Expression.eval(InternalRow)`, which iterates the array,
33+
* mutates `NamedLambdaVariable.value`'s `AtomicReference` per element, and recursively evaluates
34+
* the lambda body. Lambda-body leaf reads resolve through the kernel's typed Arrow getters since
35+
* the kernel '''is''' an `InternalRow`.
3836
*
3937
* Cost model: per-row interpreted-eval inside the HOF subtree. Surrounding native operators stay
4038
* native; surrounding non-HOF expressions stay codegen.
4139
*
4240
* Critical invariant: each Spark task gets its own `boundExpr` Java object. The dispatcher's
4341
* compile cache lives on the per-task instance, not the companion, so concurrent partitions
44-
* cannot race on a shared `NamedLambdaVariable.value`. Mirrors Spark's per-task closure-
45-
* deserialize model. The two-collects test below regresses this.
42+
* cannot race on a shared `NamedLambdaVariable.value`. The two-collects test below regresses
43+
* this.
4644
*/
47-
class CometCodegenHOFSuite extends CometTestBase with AdaptiveSparkPlanHelper {
45+
class CometCodegenHOFSuite
46+
extends CometTestBase
47+
with AdaptiveSparkPlanHelper
48+
with CometCodegenAssertions {
4849

4950
override protected def sparkConf: SparkConf =
5051
super.sparkConf
@@ -58,15 +59,6 @@ class CometCodegenHOFSuite extends CometTestBase with AdaptiveSparkPlanHelper {
5859
}
5960
}
6061

61-
private def assertCodegenRan(f: => Unit): Unit = {
62-
CometScalaUDFCodegen.resetStats()
63-
f
64-
val after = CometScalaUDFCodegen.stats()
65-
assert(
66-
after.compileCount + after.cacheHitCount >= 1,
67-
s"expected dispatcher activity, got $after")
68-
}
69-
7062
test("ArrayTransform inside identity ScalaUDF over Array<Int>") {
7163
// Regresses the simplest HOF shape: `idArr(transform(a, x -> x + 1))`. Tree contains one
7264
// CodegenFallback HOF; the kernel splices its interpreted-eval call site into the per-row

spark/src/test/scala/org/apache/comet/CometCodegenSourceSuite.scala

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -569,13 +569,6 @@ class CometCodegenSourceSuite extends AnyFunSuite {
569569
s"expected BigDecimal slow path for p>18 element; got:\n$src")
570570
}
571571

572-
// ============================================================================================
573-
// Nested-type tests. Each case verifies that a complex-within-complex shape emits a full
574-
// nested-class tree (outer + inner), wired together through the path-suffix naming
575-
// convention: `_e` for array element, `_f${fi}` for struct field fi. Scalar-element / scalar-
576-
// field leaves reuse the typed-getter templates already covered by the single-depth tests.
577-
// ============================================================================================
578-
579572
private def generate(expr: Expression, specs: IndexedSeq[ArrowColumnSpec]): String =
580573
CometBatchKernelCodegen.generateSource(expr, specs).body
581574

@@ -751,18 +744,16 @@ class CometCodegenSourceSuite extends AnyFunSuite {
751744
}
752745
}
753746

754-
// ============================================================================================
755-
// Null-guard emission for nested reference-typed getters. Spark's
756-
// `CodeGenerator.setArrayElement` only emits an `isNullAt` check before `update(i, getX(j))`
757-
// for primitive elements. For reference types (Decimal / String / Binary / Struct / Array /
758-
// Map) it relies on the source's `getX` to return null on null positions itself, matching
759-
// `ColumnarArray.getBinary` and friends. The emitter prepends `if (isNullAt(...)) return null;`
760-
// to those getters when the element/field is nullable.
761-
//
762-
// Runtime regressions for the leaf reference types live in `CometCodegenSuite`; complex-type
763-
// (Struct/Array/Map) coverage runs through HOFs in `CometCodegenHOFSuite`.
764-
// ============================================================================================
765-
747+
/**
748+
* Null-guard emission for nested reference-typed getters. Spark's
749+
* `CodeGenerator.setArrayElement` only emits an `isNullAt` check before `update(i, getX(j))`
750+
* for primitive elements. For reference types it relies on the source's `getX` to return null
751+
* on null positions itself, matching `ColumnarArray.getBinary`. The emitter prepends `if
752+
* (isNullAt(...)) return null;` when the element / field is nullable.
753+
*
754+
* Runtime regressions for the leaf reference types live in `CometCodegenSuite`; complex-type
755+
* (Struct/Array/Map) coverage runs through HOFs in `CometCodegenHOFSuite`.
756+
*/
766757
private val nullableIntStruct = StructColumnSpec(
767758
nullable = true,
768759
fields = Seq(

0 commit comments

Comments
 (0)