[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback#12151
[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback#12151brijrajk wants to merge 4 commits into
Conversation
4a56662 to
9bf19dc
Compare
|
Run Gluten Clickhouse CI on x86 |
|
Could a maintainer please remove the CORE label? All three changed files are Velox-backend-specific (backends-velox/ and gluten-ut/spark40/) — no common core code is touched. VELOX label only is correct. Thanks! |
|
Gentle ping for a maintainer review. The Also re-raising: could a maintainer remove the CORE label? The three changed files are all Velox-backend-specific ( |
@brijrajk, thanks for the PR. Could you rebase the code to see if the CI failures go away? |
9bf19dc to
009a9a8
Compare
|
Done — rebased onto current main and force-pushed. Fresh CI triggered. |
009a9a8 to
3148dbe
Compare
| override def apply(plan: SparkPlan): SparkPlan = { | ||
| if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) { | ||
| return plan | ||
| } | ||
| plan match { |
| val df = spark.sql(sqlString) | ||
| // Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217) | ||
| df.collect | ||
| // All 200003 rows match the bloom filter built from the same data. | ||
| assert(df.count() == 200003L) |
|
@brijrajk, could you first check if Copilot's comments make sense? |
|
Thanks for flagging this, @philo-he! Both of Copilot's comments were valid: 1. Patcher active when native bloom filter is disabled When Added a second guard: 2. Combined into |
|
@brijrajk, thanks for the update. Could you check if my following understanding is correct? Besides the |
|
@philo-he You are absolutely right. We confirmed it with a test case. How threshold and cost work
Test case confirming the failure testGluten(
"Test bloom_filter_agg whole-stage fallback when both stages fall back",
Issue12013) {
...
if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
// threshold=1: Stage 0's inherent transition cost of 1 meets the threshold, so
// ExpandFallbackPolicy promotes Stage 0 to a whole-stage fallback as well.
// Stage 0 runs as Spark and produces Spark-format bytes. Stage 1 also falls back.
// The patcher must NOT rewrite BloomFilterMightContain -> VeloxBloomFilterMightContain
// in this case.
withSQLConf(
GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
SQLConf.ANSI_ENABLED.key -> "false"
) {
val df = spark.sql(sqlString)
assert(df.collect().length == 200003L)
}
}
}Output
Proposed fix The root cause is that Do you see any concerns with this approach, or is there a cleaner way you would handle it? |
|
@philo-he Just a gentle ping — we've already added a failing test that reproduces your scenario (see above). Does the proposed structural fix approach look good to you? Ready to implement it as soon as you confirm. |
25c7fd9 to
2727774
Compare
| * This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts | ||
| * when the plan is already wrapped in a `FallbackNode`. | ||
| */ | ||
| case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] { |
There was a problem hiding this comment.
I don't recall why BloomFilterMightContainJointRewriteRule was made a physical rule, but can you try turning it to a logical rule anyway? So such a patcher rule can be avoided?
There was a problem hiding this comment.
Done — BloomFilterMightContainJointRewriteRule is now a Rule[LogicalPlan] registered via injectOptimizerRule, modelled after CollectRewriteRule. The patcher is gone. Running as an optimizer rule ensures both substitutions (BloomFilterAggregate → VeloxBloomFilterAggregate and BloomFilterMightContain → VeloxBloomFilterMightContain) are captured in the originalPlan snapshot before ExpandFallbackPolicy takes it, so the byte format stays consistent regardless of which stages fall back. This also fixes the threshold=1 case where Stage 0 itself falls back (the patcher would incorrectly rewrite the filter side while Stage 0 was producing Spark-format bytes).
2727774 to
cac891f
Compare
3e210d8 to
ee314fe
Compare
|
Run Gluten Clickhouse CI on x86 |
philo-he
left a comment
There was a problem hiding this comment.
@brijrajk, Thank you so much for your work. LGTM. One extra minor comment. Please also check whether the CI failures are related.
cc @zhztheplayer
| // plan still uses VeloxBloomFilterMightContain so the JVM filter reads Velox-format bytes. | ||
| testWithMinSparkVersion( | ||
| "GLUTEN-12013: bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes", | ||
| "3.3") { |
There was a problem hiding this comment.
Since 3.3 is already Gluten's minimum supported version, it might be unnecessary to use "testWithMinSparkVersion". Ditto for other tests. Thanks.
There was a problem hiding this comment.
Done, replaced all three calls with plain . Re-triggered CI to check whether the spark34/spark40/spark41 failures are related to this change or pre-existing flaky tests.
|
Run Gluten Clickhouse CI on x86 |
bfe384f to
7cc6faf
Compare
|
Run Gluten Clickhouse CI on x86 |
|
@brijrajk would you help check whether CI failures are related? |
zhztheplayer
left a comment
There was a problem hiding this comment.
I ran a local test and there are unexpected fallbacks on bloom filter operators with this PR.
@brijrajk can you run some local end-to-end tests to check?
brijrajk
left a comment
There was a problem hiding this comment.
Thanks for running the local test, @zhztheplayer.
The unexpected fallbacks were caused by a second bug in the original patch: BloomFilterMightContainJointRewriteRule was rewriting every BloomFilterAggregate it encountered — including standalone usages such as DataFrame.stat.bloomFilter(). That API collects the aggregate output bytes and feeds them directly to BloomFilter.readFrom(), which expects Spark-native format. When Velox-format bytes were produced instead, the session fell back with java.io.IOException: Unexpected Bloom filter version number (visible as the GlutenDataFrameStatSuite - Bloom filter CI failure).
Fix (pushed): the rule now only rewrites BloomFilterAggregate when it appears inside the ScalarSubquery of a BloomFilterMightContain. Standalone aggregates are left untouched.
I ran the following suites locally (Spark 4.0, Velox backend) against the updated patch:
| Suite | Tests | Result |
|---|---|---|
GlutenDataFrameStatSuite |
25/25 | passed (was failing) |
GlutenBloomFilterFallbackSuite |
4/4 | passed |
GlutenBloomFilterAggregateQuerySuite |
14/14 | passed |
GlutenInjectRuntimeFilterSuite |
13/13 | passed |
A new regression test (GLUTEN-12013: DataFrame.stat.bloomFilter() produces Spark-readable bytes) has also been added to GlutenBloomFilterFallbackSuite to guard against this in future.
|
Run Gluten Clickhouse CI on x86 |
8639584 to
7323e4e
Compare
|
Run Gluten Clickhouse CI on x86 |
|
@zhztheplayer The branch has been rebased onto the latest upstream The Our bloom filter-specific test suites all pass locally (Spark 4.0, Velox backend):
The root fix (moving |
7323e4e to
299f4f8
Compare
|
Run Gluten Clickhouse CI on x86 |
|
@brijrajk thanks for the update. I've rerun the CI. |
…QE fallback `BloomFilterMightContainJointRewriteRule` previously rewrote every `BloomFilterAggregate` it encountered, including standalone usages such as `DataFrame.stat.bloomFilter()`. That API collects the aggregate output bytes and passes them directly to `BloomFilter.readFrom()`, which expects Spark-native format; receiving Velox-format bytes caused `java.io.IOException: Unexpected Bloom filter version number` (surfaced as a CI failure in `GlutenDataFrameStatSuite - Bloom filter`). Fix: only rewrite `BloomFilterAggregate` when it appears inside the `ScalarSubquery` of a `BloomFilterMightContain`. Standalone aggregates are left untouched so that collected bytes remain in Spark-native format. Add a regression test (`GlutenBloomFilterFallbackSuite`) to guard against reintroducing this regression. Local test results (Spark 4.0, Velox backend): - GlutenDataFrameStatSuite : 25/25 passed (was failing) - GlutenBloomFilterFallbackSuite : 4/4 passed - GlutenBloomFilterAggregateQuerySuite: 14/14 passed - GlutenInjectRuntimeFilterSuite : 13/13 passed
299f4f8 to
9d096a3
Compare
|
Run Gluten Clickhouse CI on x86 |
CI failure explanation —
|
|
Temporarily cherry-picked the q19 golden file fix from #12374 onto this branch (commit If CI passes with this commit, it confirms that the only blocker was the pre-existing q19 plan stability issue and the bloom filter fix itself is clean. |
|
Run Gluten Clickhouse CI on x86 |
The ExprId normalizer in GlutenPlanStabilitySuite uses regex `#\d+` which inadvertently matches TPC-H string literals such as Brand#11, Brand#12, Brand#13 (p_brand values in q19's filter). Over the 264 commits since the golden file was added in apache#11805, new optimizer rules shifted the ExprId counter so Brand#12 now normalizes to Brand#6 and _pre_1#14 to _pre_1#13, causing a spurious plan mismatch. Regenerated by running GlutenTPCHPlanStabilitySuite with SPARK_GENERATE_GOLDEN_FILES=1. Only q19/explain.txt changes; simplified.txt and all other queries are unaffected. Verified: q19 fails on main without this fix (21/22); passes with it (22/22). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…lters In Spark 4.1, the optimizer processes BloomFilterMightContainJointRewriteRule after InjectRuntimeFilter, so the rule sees DPP-injected bloom filters (which use xxhash64(col, seed) as their value expression). Rewriting those to VeloxBloomFilterMightContain caused FilterExecTransformer to fail Velox validation and fall back to JVM, changing the physical plan structure and breaking 19 TPC-DS plan-stability golden files across v1.4, v2.7, and modified suites (q2, q10, q16, q24a, q24b, q32, q37, q40, q59, q69, q80, q82, q85, q92, q94, q95, q10a, q64, q80a). Fix: guard the first arm of transformAllExpressions with `v: Attribute`. DPP bloom filters always hash the join key — xxhash64(col, seed) — so their value is never a bare Attribute and the guard skips them cleanly. User-facing bloom filters (might_contain(subquery, col)) pass a plain column reference and continue to be rewritten correctly. Also guard the catch-all arm against ScalarSubquery to be consistent. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
c06593c to
aca2904
Compare
|
Run Gluten Clickhouse CI on x86 |
Scalastyle nonascii.message rule rejects Unicode em-dash (U+2014). Replace all occurrences in docstring and inline comments with ASCII --. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Run Gluten Clickhouse CI on x86 |
What changes are proposed in this pull request?
Fixes #12013
Root cause
When
ExpandFallbackPolicytriggers a whole-stage AQE fallback it reverts to the plan captured beforeHeuristicTransformruns (i.e. before all pre-transform rewrites).BloomFilterMightContainJointRewriteRulewas registered as a pre-transformRule[SparkPlan], so its substitutions were silently undone in the fallback plan.If Stage 0 (
bloom_filter_aggsubquery) had already executed natively it produced Velox-format bloom filter bytes. The vanillaBloomFilterMightContainin the fallen-back filter stage then calledBloomFilterImpl.readFrom()on those bytes, throwing:A second issue caused
BloomFilterMightContainJointRewriteRuleto rewrite everyBloomFilterAggregateit encountered — including the standaloneDataFrame.stat.bloomFilter()path. That API collects the aggregate output bytes and passes them directly toBloomFilter.readFrom(), which expects Spark-native format; receiving Velox-format bytes surfaced asGlutenDataFrameStatSuite - Bloom filterfailing in CI.A third issue emerged in Spark 4.1 CI: in Spark 4.1, Gluten's
injectOptimizerRuleruns afterInjectRuntimeFilter, so our optimizer rule sees DPP-injected bloom filters (BloomFilterMightContain(ScalarSubquery(...), xxhash64(col, seed))). The previous catch-all pattern rewrote them toVeloxBloomFilterMightContain, whichFilterExecTransformerrejects as a non-native Velox expression — causing the filter stage and its subquery aggregate to fall back to JVMObjectHashAggregate, changing the physical plan structure and breaking 19 TPC-DS plan-stability golden files in Spark 4.1.Fix
Move
BloomFilterMightContainJointRewriteRulefrominjectPreTransform(Rule[SparkPlan]) toinjectOptimizerRule(Rule[LogicalPlan]), modelled afterCollectRewriteRule. Running as an optimizer rule ensures both substitutions are baked into theoriginalPlansnapshot beforeExpandFallbackPolicytakes it. Both sides of the bloom-filter pair therefore always produce and consume the same byte format, regardless of which stages fall back and in what order.The rule only rewrites
BloomFilterAggregatewhen it appears inside theScalarSubqueryof aBloomFilterMightContain. Standalone usages such asDataFrame.stat.bloomFilter()are intentionally left untouched so that collected bytes remain in Spark-native format.DPP-injected bloom filters are also deliberately excluded: the rule only matches
BloomFilterMightContain(subq: ScalarSubquery, v: Attribute). DPP filters (fromInjectRuntimeFilter) always hash the join key —xxhash64(col, seed)— before passing it toBloomFilterMightContain, so their value expression is never a bareAttributeand the pattern never matches.Files changed
BloomFilterMightContainJointRewriteRule.scala— Rewritten asRule[LogicalPlan]; rewritesBloomFilterAggregateonly when inside aScalarSubqueryofBloomFilterMightContainwhose value is a plain column reference (Attribute)VeloxRuleApi.scala— Moves registration frominjectPreTransformtoinjectOptimizerRuleGlutenBloomFilterFallbackSuite.scala(new,gluten-ut/test) — Four regression tests covering all fallback scenariosHow was this patch tested?
Four regression tests were added to
GlutenBloomFilterFallbackSuiteingluten-ut/test, guarded withrequireBloomFilterAggMightContainJointFallback().Test 1 — only filter stage falls back (threshold=2)
COLUMNAR_FILTER_ENABLED=falseforcesFilterExecto fall back (transition cost=2 meets threshold)bloom_filter_aggsubquery (cost=1) continues to run natively and emits Velox-format bytesvelox_might_containappears in the optimized planTest 2 — both stages fall back (threshold=1)
COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD=1causes Stage 0 (bloom_filter_agg) to also fall backTest 3 —
DataFrame.stat.bloomFilter()produces Spark-readable bytesdf.stat.bloomFilter("col", 1000L, 0.01)directly and asserts noIOExceptionandmightContainLongreturns the expected resultGlutenDataFrameStatSuite - Bloom filterCI failureTest 4 — native bloom filter disabled (
enableNativeBloomFilter=false)velox_might_containAll tests pass, verified locally against the
gluten-ut/spark40module:GlutenDataFrameStatSuiteGlutenBloomFilterFallbackSuiteGlutenBloomFilterAggregateQuerySuiteGlutenInjectRuntimeFilterSuiteWas this patch authored or co-authored using generative AI tooling?
Yes. Claude Code (claude-sonnet-4-6) was used as an AI assistant during development.