
Commit 96ebbaa

JkSelf authored and dongjoon-hyun committed
[SPARK-56485][SQL] Fix RowCount Estimation in CBO to avoid unintended BroadcastHashJoin
### What changes were proposed in this pull request?

When running TPC-DS Q4 on a 1TB TPC-DS dataset with CBO enabled and `spark.sql.autoBroadcastJoinThreshold` set to `10MB`, the query fails with a SparkException:

```
Py4JJavaError: An error occurred while calling o596.collectToPython.
: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8.0 GiB: 10.7 GiB
	at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:850)
	at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
	at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
	at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
	at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:66)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:230)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:225)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

The error indicates that Spark attempted to broadcast a table far larger than the 8 GiB hard limit (actual size ~10.7 GiB). Notably, the query runs successfully when CBO is disabled.

The issue stems from a bug in Spark's CBO filter estimation. When processing equality filters (e.g., `d_year = 2001`), the CBO incorrectly estimates the rowCount as 0. Because the estimated row count is 0, the optimizer concludes that the join result will be very small and chooses a BroadcastHashJoin.
However, the actual data size is roughly 10.7 GiB, which exceeds Spark's hard limit for broadcasting.

The root of this miscalculation is in [FilterEstimation.scala](https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L347). Under normal circumstances, Spark should use column statistics to estimate selectivity. In this case, although column information is present, most of the statistical fields (min, max, distinct count, etc.) are null/None, except for versioning information.

```
Filter (isnotnull(d_year#7208) AND (d_year#7208 = 2001)), Statistics(sizeInBytes=1.0 B, rowCount=0)
+- RelationV2[d_date_sk#7202, d_date_id#7203, d_date#7204, d_month_seq#7205, d_week_seq#7206, d_quarter_seq#7207, d_year#7208, d_dow#7209, d_moy#7210, d_dom#7211, d_qoy#7212, d_fy_year#7213, d_fy_quarter_seq#7214, d_fy_week_seq#7215, d_day_name#7216, d_quarter_name#7217, d_holiday#7218, d_weekend#7219, d_following_holiday#7220, d_first_dom#7221, d_last_dom#7222, d_same_day_ly#7223, d_same_day_lq#7224, d_current_day#7225, ... 4 more fields] spark_catalog.wxd_icebergtpcdsdb1000g.date_dim, Statistics(sizeInBytes=20.1 MiB, rowCount=7.30E+4)
```

As a result, Spark fails to enter the correct evaluation logic in [lines 310-313](https://github.com/apache/spark/blob/15ffa544ca53cd9f8a25baaf11fa0171dac7c85f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala#L310-L313) and defaults to an incorrect estimation.
```
// Example of the empty stats being passed:
d_year#20690 -> ColumnStat(None,None,None,None,None,None,None,2)
```

### Why are the changes needed?

Fixes the broadcast OOM failure caused by the incorrect zero row-count estimate.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55350 from JkSelf/fix-equal-filter-estimation.

Authored-by: Ke Jia <ke.jia@ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
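The corrected behavior can be illustrated outside Spark: when min/max are absent, an equality predicate should fall back to `1/ndv` selectivity (or "unknown") rather than conclude the range excludes the literal. The following is a minimal standalone sketch; `ColStat` and `equalitySelectivity` are simplified stand-ins invented for illustration, not Spark's actual `ColumnStat`/`FilterEstimation` API:

```scala
// Simplified stand-in for Spark's ColumnStat: every field is optional.
case class ColStat(
    distinctCount: Option[Long] = None,
    min: Option[Int] = None,
    max: Option[Int] = None)

// Selectivity of `col = literal`, mirroring the patched logic:
// - with min/max present: 0.0 if the literal is out of range, else 1/ndv
// - with min/max missing: the range is unknown, so still fall back to 1/ndv
//   (the buggy path effectively treated this case as "not in range" -> 0 rows)
// - with no ndv either: None, so the caller can apply a default selectivity
def equalitySelectivity(stat: ColStat, literal: Int): Option[Double] = {
  val inRange = (stat.min, stat.max) match {
    case (Some(lo), Some(hi)) => literal >= lo && literal <= hi
    case _ => true // min/max unknown: the literal may be in range
  }
  if (!inRange) Some(0.0)
  else stat.distinctCount.filter(_ > 0).map(ndv => 1.0 / ndv.toDouble)
}
```

For example, `equalitySelectivity(ColStat(distinctCount = Some(10L)), 2)` yields `Some(0.1)`, so a 73,000-row `date_dim` no longer collapses to an estimated 0 rows just because min/max were never collected.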
1 parent 5d491f6 commit 96ebbaa

2 files changed

Lines changed: 82 additions & 29 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala

Lines changed: 37 additions & 29 deletions
```diff
@@ -325,39 +325,44 @@ case class FilterEstimation(plan: Filter) extends Logging {
     }
     val colStat = colStatsMap(attr)

-    // decide if the value is in [min, max] of the column.
-    // We currently don't store min/max for binary/string type.
-    // Hence, we assume it is in boundary for binary/string type.
-    val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType)
-    if (statsInterval.contains(literal)) {
-      if (update) {
-        // We update ColumnStat structure after apply this equality predicate:
-        // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal
-        // value.
-        val newStats = attr.dataType match {
-          case StringType | BinaryType =>
-            colStat.copy(distinctCount = Some(1), nullCount = Some(0))
-          case _ =>
-            colStat.copy(distinctCount = Some(1), min = Some(literal.value),
-              max = Some(literal.value), nullCount = Some(0))
-        }
-        colStatsMap.update(attr, newStats)
-      }
+    // Decide if the value is in [min, max] of the column.
+    // We currently don't store min/max for binary/string type. For other types, if min/max are
+    // missing, treat the range as unknown (instead of "all nulls") and fall back to NDV/histogram.
+    val valueInRange = attr.dataType match {
+      case StringType | BinaryType =>
+        true
+      case _ if !colStat.hasMinMaxStats =>
+        true
+      case _ =>
+        ValueInterval(colStat.min, colStat.max, attr.dataType).contains(literal)
+    }

-      if (colStat.histogram.isEmpty) {
-        if (!colStat.distinctCount.isEmpty) {
-          // returns 1/ndv if there is no histogram
-          Some(1.0 / colStat.distinctCount.get.toDouble)
-        } else {
-          None
-        }
-      } else {
+    if (!valueInRange) return Some(0.0)
+
+    val percent = if (colStat.histogram.isDefined) {
+      if (colStat.hasMinMaxStats) {
         Some(computeEqualityPossibilityByHistogram(literal, colStat))
+      } else {
+        None
       }
+    } else {
+      colStat.distinctCount.filter(_ > 0).map(ndv => 1.0 / ndv.toDouble)
+    }

-    } else { // not in interval
-      Some(0.0)
+    if (update && percent.isDefined) {
+      // We update ColumnStat structure after apply this equality predicate:
+      // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal value.
+      val newStats = attr.dataType match {
+        case StringType | BinaryType =>
+          colStat.copy(distinctCount = Some(1), nullCount = Some(0))
+        case _ =>
+          colStat.copy(distinctCount = Some(1), min = Some(literal.value),
+            max = Some(literal.value), nullCount = Some(0))
+      }
+      colStatsMap.update(attr, newStats)
     }
+
+    percent
   }

   /**
@@ -409,9 +414,12 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        if (ndv.toDouble == 0 || colStat.min.isEmpty || colStat.max.isEmpty) {
+        if (ndv.toDouble == 0) {
           return Some(0.0)
         }
+        if (colStat.min.isEmpty || colStat.max.isEmpty) {
+          return None
+        }

         val statsInterval =
           ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
```
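The `evaluateInSet` hunk above applies the same principle: `ndv == 0` still means "no matching rows" (`Some(0.0)`), but missing min/max now means "unknown" (`None`) rather than zero. A minimal sketch of that distinction; `InStat` and `inSetSelectivity` are hypothetical names for illustration, not Spark's API:

```scala
// Simplified column stats for an IN-set estimate.
case class InStat(ndv: Long, min: Option[Int], max: Option[Int])

def inSetSelectivity(stat: InStat, hSet: Set[Int]): Option[Double] = {
  if (stat.ndv == 0) return Some(0.0) // no distinct values: nothing can match
  (stat.min, stat.max) match {
    case (Some(lo), Some(hi)) =>
      // Keep only set members inside [min, max], then estimate |validSet| / ndv.
      val valid = hSet.count(v => v >= lo && v <= hi)
      Some(math.min(valid.toDouble / stat.ndv.toDouble, 1.0))
    case _ =>
      // Before the patch this path also returned Some(0.0); now the range is
      // unknown, so report "no estimate" and let the caller use a default.
      None
  }
}
```

With `InStat(10, Some(1), Some(10))` and the set `{3, 4, 5}`, this gives `Some(0.3)`; with min/max missing it gives `None` instead of wrongly pruning all rows.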

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala

Lines changed: 45 additions & 0 deletions
```diff
@@ -211,6 +211,36 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       expectedRowCount = 1)
   }

+  test("cint = 2 - missing min/max but has NDV") {
+    val attrIntNoMinMax = AttributeReference("cint_no_min_max", IntegerType)()
+    val colStatIntNoMinMax = ColumnStat(distinctCount = Some(10), min = None, max = None,
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+    val childPlan = StatsTestPlan(
+      outputList = Seq(attrIntNoMinMax),
+      rowCount = 10L,
+      attributeStats = AttributeMap(Seq(attrIntNoMinMax -> colStatIntNoMinMax))
+    )
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntNoMinMax, Literal(2)), childPlan),
+      Seq(attrIntNoMinMax -> colStatIntNoMinMax.copy(distinctCount = Some(1),
+        min = Some(2), max = Some(2), nullCount = Some(0))),
+      expectedRowCount = 1)
+  }
+
+  test("cint = 2 - missing all column stats") {
+    val attrIntNoStats = AttributeReference("cint_no_stats", IntegerType)()
+    val colStatIntNoStats = ColumnStat()
+    val childPlan = StatsTestPlan(
+      outputList = Seq(attrIntNoStats),
+      rowCount = 10L,
+      attributeStats = AttributeMap(Seq(attrIntNoStats -> colStatIntNoStats))
+    )
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntNoStats, Literal(2)), childPlan),
+      Seq(attrIntNoStats -> colStatIntNoStats),
+      expectedRowCount = 10)
+  }
+
   test("cint <=> 2") {
     validateEstimatedStats(
       Filter(EqualNullSafe(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),
@@ -387,6 +417,21 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       expectedRowCount = 3)
   }

+  test("cint IN (3, 4, 5) - missing min/max but has NDV") {
+    val attrIntNoMinMax = AttributeReference("cint_no_min_max", IntegerType)()
+    val colStatIntNoMinMax = ColumnStat(distinctCount = Some(10), min = None, max = None,
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+    val childPlan = StatsTestPlan(
+      outputList = Seq(attrIntNoMinMax),
+      rowCount = 10L,
+      attributeStats = AttributeMap(Seq(attrIntNoMinMax -> colStatIntNoMinMax))
+    )
+    validateEstimatedStats(
+      Filter(InSet(attrIntNoMinMax, Set(3, 4, 5)), childPlan),
+      Seq(attrIntNoMinMax -> colStatIntNoMinMax),
+      expectedRowCount = 10)
+  }
+
   test("evaluateInSet with all zeros") {
     validateEstimatedStats(
       Filter(InSet(attrString, Set(3, 4, 5)),
```
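The expected row counts in these tests follow from `estimated rows = ceil(child rows × selectivity)`, with the child's row count kept when no estimate is available, which is consistent with the no-stats and no-min/max IN tests still expecting 10 rows. A hedged arithmetic sketch; `estimatedRows` is an illustrative helper, not Spark's API:

```scala
// estimated rows = ceil(childRowCount * selectivity); an unknown selectivity
// (None) leaves the child's row count unchanged, matching the new tests where
// a 10-row child with no usable stats is still estimated at 10 rows.
def estimatedRows(childRowCount: Long, selectivity: Option[Double]): Long =
  selectivity match {
    case Some(s) => math.ceil(childRowCount * s).toLong
    case None    => childRowCount
  }
```

So the "missing min/max but has NDV" equality test gets `ceil(10 * 1/10) = 1` row, while the 0-row estimate that triggered the unintended broadcast can no longer arise from merely absent statistics.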
