-
Notifications
You must be signed in to change notification settings - Fork 331
fix: rebalance deep AND/OR chains to avoid protobuf recursion limit #4531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,7 @@ import java.time.{Duration, Period} | |
| import scala.util.Random | ||
|
|
||
| import org.apache.hadoop.fs.Path | ||
| import org.apache.spark.sql.{CometTestBase, DataFrame, Row} | ||
| import org.apache.spark.sql.{Column, CometTestBase, DataFrame, Row} | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, StructsToJson, TruncDate, TruncTimestamp} | ||
| import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps | ||
| import org.apache.spark.sql.comet.CometProjectExec | ||
|
|
@@ -3096,4 +3096,39 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | |
| } | ||
| } | ||
|
|
||
| test("deep AND/OR predicate chains do not overflow the protobuf recursion limit") { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The operands here are never null, so the associativity-with-nulls case is not exercised. Would it be worth adding a nullable predicate into one of the chains to lock that in? A deep OR in a WHERE clause might also be worth a case, since that is a common trigger and stays intact rather than being split.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added both: the chains now mix in a nullable column so the rebalanced tree is exercised under three-valued logic, and there's a deep |
||
| // A left-deep chain of N associative boolean operands serializes to a proto nested N | ||
| // levels deep. With N > protobuf's default recursion limit (100), the message overflows | ||
| // when the serialized plan is re-parsed (JVM Operator.parseFrom and the Rust prost | ||
| // decoder), failing an otherwise-supported query. Comet evaluates AND/OR vectorially with | ||
| // no short-circuit, so the chain is fully associative and safe to rebalance. | ||
| val n = 200 | ||
| // `_2` is nullable (every 7th row is null) so the rebalanced chain is exercised under SQL | ||
| // three-valued logic, not just true/false operands. | ||
| withParquetTable( | ||
| (0 until 100).map(i => (i, if (i % 7 == 0) None else Some(i.toLong))), | ||
| "tbl") { | ||
| // Build a chain that mixes the non-nullable `_1` with the nullable `_2` so null operands | ||
| // flow through the rebalanced tree. | ||
| def operand(i: Int): Column = | ||
| if (i % 2 == 0) col("_2") > lit(-i) else col("_1") > lit(-i) | ||
|
|
||
| // Project the chains as boolean columns rather than filtering: a top-level filter AND is | ||
| // split by Spark's splitConjunctivePredicates into many shallow pushed predicates, which | ||
| // would hide the deep-nesting. A projected expression survives intact. Distinct literals | ||
| // keep the optimizer from folding the chain; `>`/`<` (not `=`) keeps OptimizeIn from | ||
| // collapsing the OR chain into a single In. | ||
| val andChain = (1 to n).map(operand).reduce(_ && _) | ||
| checkSparkAnswerAndOperator(spark.table("tbl").select(andChain.as("a"))) | ||
|
|
||
| val orChain = (1 to n).map(i => col("_1") < lit(i) || col("_2") < lit(i)).reduce(_ || _) | ||
| checkSparkAnswerAndOperator(spark.table("tbl").select(orChain.as("o"))) | ||
|
|
||
| // A deep OR is a common real-world WHERE clause and, unlike a top-level AND, is NOT split | ||
| // by Spark -- it stays intact as a single deeply-nested predicate, so exercise that path | ||
| // directly. | ||
| checkSparkAnswerAndOperator(spark.table("tbl").where(orChain)) | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small thought: this recurses O(n) deep and the ++ accumulation is O(n^2). It is totally fine for the depths that hit this bug, but since the motivation is deep chains, would an explicit accumulator be worth it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Rewrote it with an explicit work stack and an accumulating buffer instead of recursion, so it's O(n) time and no longer recurses O(n) deep (which on these left-deep chains could itself overflow the stack). Left-to-right operand order is preserved.