Skip to content

Commit c69a7c2

Browse files
authored
[VL] Add cross-config / cross-build-cycle invariant tests for ColumnarCachedBatchSerializer (#12124)
Extend ColumnarCachedBatchE2ESuite with three lifecycle invariant tests that exercise the cached-batch wire format across SQLConf transitions: 1. cross-config: build with stats=true, read with stats=false -- wire format is build-time-decided; v2-with-stats payload must survive a reader-time downgrade and prune must still engage. 2. cross-config (reverse): build with stats=false, read with stats=true -- legacy v1 payload (stats=null) at build time; reader must NOT fabricate stats and must fall back to full scan. 3. cross-build-cycle: same logical query rebuilt twice with different stats settings (round 1 stats=true, round 2 stats=false). Round 2 must re-honor stats=false; the serializer must not reuse stale gate state from round 1. Each test asserts (R-correct) result row count, (R-path) the cached batches are served by ColumnarCachedBatchSerializer (not vanilla DefaultCachedBatchSerializer), and (R-prune, when expectPrune=true) that InMemoryTableScanExec.numOutputRows reflects partition pruning. The shared assertion logic is factored into a private helper `assertGlutenCachedPlanAndPrune(df, expectPrune)`. Its scaladoc documents an intentional asymmetry: the reverse "no prune" direction is not observable via numOutputRows on the Gluten native path (the same baseline test "numOutputRows reflects post-filter row count" already notes that outRows may legitimately be 0 even with full pruning, because the surviving row is delivered through the native scan metrics path). The expectPrune=false branch therefore intentionally performs path-only verification. All 15 suite tests pass locally (Spark 3.3, Velox backend). Generated-by: Claude claude-opus-4.7
1 parent 5286b8c commit c69a7c2

1 file changed

Lines changed: 103 additions & 0 deletions

File tree

backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchE2ESuite.scala

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,26 @@ class ColumnarCachedBatchE2ESuite
7474
.cache()
7575
}
7676

77+
/**
 * Asserts that the cached scan in `df`'s executed plan is served by
 * `ColumnarCachedBatchSerializer` and, when requested, that `numOutputRows` stays
 * within the pruning bound.
 *
 * The caller must already have triggered execution of `df` so that `numOutputRows`
 * is populated.
 *
 * @param df          query over a cached relation, already executed by the caller
 * @param expectPrune when true, additionally asserts `numOutputRows <= (N / P) * 2`.
 *                    When false the check is path-only: `numOutputRows` is unreliable
 *                    for "no prune" on the Gluten native path (surviving rows bypass
 *                    the IMS counter, see baseline "numOutputRows reflects post-filter
 *                    row count" test).
 */
private def assertGlutenCachedPlanAndPrune(df: DataFrame, expectPrune: Boolean): Unit = {
  val plan = df.queryExecution.executedPlan
  // Match on the Option instead of `.get.asInstanceOf` so that a plan without a cached
  // scan fails the test with the plan printed rather than a bare NoSuchElementException.
  val ims = find(plan) {
    case _: InMemoryTableScanExec => true
    case _ => false
  } match {
    case Some(scan: InMemoryTableScanExec) => scan
    case _ => fail(s"no InMemoryTableScanExec found in executed plan:\n$plan")
  }
  val serName = ims.relation.cacheBuilder.serializer.getClass.getSimpleName
  assert(serName == "ColumnarCachedBatchSerializer", s"got $serName")
  if (expectPrune) {
    val outRows = ims.metrics("numOutputRows").value
    // NOTE(review): N and P are suite-level constants defined outside this block;
    // presumably total rows and partition count -- confirm against their definitions.
    val upperBound = (N / P) * 2
    assert(outRows <= upperBound, s"numOutputRows=$outRows > $upperBound (N=$N, P=$P)")
  }
}
96+
7797
test("e2e cache + equality filter: no crash + correct result") {
7898
val cached = cacheRange()
7999
try {
@@ -385,4 +405,87 @@ class ColumnarCachedBatchE2ESuite
385405
}
386406
}
387407
}
408+
409+
// Cross-config: build with stats enabled, read with stats disabled.
// Wire format is build-time-decided, so reader-time SQLConf must not affect prune.
test("cross-config: build with stats enabled, read with stats disabled") {
  val statsKey = GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key
  // Recorded as soon as cacheRange() returns so the finally block can release the
  // cache even if materializing it (df.count()) throws.
  var cached: Option[DataFrame] = None
  try {
    // Build phase: materialize the cache with stats enabled.
    withSQLConf(statsKey -> "true") {
      val df = cacheRange()
      cached = Some(df)
      df.count()
    }
    val base = cached.getOrElse(fail("build phase did not produce a cached DataFrame"))

    // Read phase: run the filtered query with stats disabled.
    var observed: Option[(DataFrame, Long)] = None
    withSQLConf(statsKey -> "false") {
      val filtered = base.filter(col("k") === pivot)
      observed = Some(filtered -> filtered.count())
    }
    val (filtered, result) = observed.getOrElse(fail("read phase did not run"))

    assert(result == 1L, s"got $result")
    assertGlutenCachedPlanAndPrune(filtered, expectPrune = true)
  } finally {
    cached.foreach(_.unpersist())
  }
}
433+
434+
// Reverse: legacy v1 payload at build (stats=null), reader cannot fabricate
// stats. Distinct from the same-config legacy test: this forces cross-config.
test("cross-config: build with stats disabled, read with stats enabled") {
  val statsKey = GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key
  // Recorded as soon as cacheRange() returns so the finally block can release the
  // cache even if materializing it (df.count()) throws.
  var cached: Option[DataFrame] = None
  try {
    // Build phase: materialize the cache with stats disabled.
    withSQLConf(statsKey -> "false") {
      val df = cacheRange()
      cached = Some(df)
      df.count()
    }
    val base = cached.getOrElse(fail("build phase did not produce a cached DataFrame"))

    // Read phase: run the filtered query with stats enabled.
    var observed: Option[(DataFrame, Long)] = None
    withSQLConf(statsKey -> "true") {
      val filtered = base.filter(col("k") === pivot)
      observed = Some(filtered -> filtered.count())
    }
    val (filtered, result) = observed.getOrElse(fail("read phase did not run"))

    assert(result == 1L, s"got $result")
    // Path-only verification: no prune assertion is made for this direction.
    assertGlutenCachedPlanAndPrune(filtered, expectPrune = false)
  } finally {
    cached.foreach(_.unpersist())
  }
}
458+
459+
// Round 2 must re-honor the new SQLConf, not reuse stale gate decision /
// payload from round 1.
test("cross-build-cycle: unpersist + toggle stats config + rebuild same query") {
  val statsKey = GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key
  // (round number, stats flag in effect for build and read, whether prune is asserted)
  val rounds = Seq((1, "true", true), (2, "false", false))

  rounds.foreach {
    case (round, statsEnabled, expectPrune) =>
      withSQLConf(statsKey -> statsEnabled) {
        val rebuilt = cacheRange()
        try {
          // Materialize the cache before running the filtered query.
          rebuilt.count()
          val matching = rebuilt.filter(col("k") === pivot)
          val hits = matching.count()
          assert(hits == 1L, s"round $round: got $hits")
          assertGlutenCachedPlanAndPrune(matching, expectPrune = expectPrune)
        } finally {
          // Blocking unpersist, issued before the next round rebuilds the same query.
          rebuilt.unpersist(blocking = true)
        }
      }
  }
}
388491
}

0 commit comments

Comments
 (0)