Skip to content

Commit 8436c7c

Browse files
committed
[SPARK-57512][SQL] Make RuntimeReplaceable survival opt-in via eagerReplace
Introduce `RuntimeReplaceable.eagerReplace` (default `true`) so existing expressions keep being rewritten eagerly by `ReplaceExpressions`, while an expression can opt into surviving into the physical plan by overriding it to `false`. This enables incremental migration of expressions. `ReplaceExpressions` rewrites eagerly when `eagerReplace` is true or when the replacement cannot survive (non-deterministic or unevaluable); otherwise the node survives and is materialized into its replacement by `MaterializeRuntimeReplaceable` right before codegen. Add `MaterializeRuntimeReplaceableSuite` proving an optimizer rule can insert a surviving `RuntimeReplaceable` that executes correctly. Co-authored-by: Isaac
1 parent a036ed7 commit 8436c7c

11 files changed

Lines changed: 138 additions & 41 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,9 @@ trait RuntimeReplaceable extends Expression {
457457
// The actual evaluation is delegated to `replacement`, so determinism must reflect `replacement`,
458458
// not this expression's `children` (which are the original arguments). For example, the children
459459
// of `Uniform` are literal bounds and a seed (all deterministic), while its `replacement` is a
460-
// non-deterministic `Rand`. This matters now that a deterministic `RuntimeReplaceable` may
461-
// survive into the physical plan: the survival decision relies on an accurate determinism signal.
460+
// non-deterministic `Rand`. This matters once a `RuntimeReplaceable` may survive into the
461+
// physical plan (see `eagerReplace`): the survival decision relies on an accurate determinism
462+
// signal.
462463
override lazy val deterministic: Boolean = replacement.deterministic
463464
// Foldability is also derived from `replacement` rather than this expression's `children`. Note
464465
// that this can yield a foldable expression that still has references (e.g. `collation(c1)`,
@@ -471,12 +472,23 @@ trait RuntimeReplaceable extends Expression {
471472
// are semantically equal.
472473
override lazy val canonicalized: Expression = replacement.canonicalized
473474

475+
// Whether `ReplaceExpressions` should rewrite this expression into its `replacement` eagerly, in
476+
// the logical optimizer. This is `true` by default, which preserves the historical behavior where
477+
// a `RuntimeReplaceable` never reaches the physical plan. An expression can override this to
478+
// `false` to survive into the physical plan (e.g. so a native engine can match the high-level
479+
// expression directly); such a survivor is materialized into its `replacement` right before
480+
// codegen by `MaterializeRuntimeReplaceable`. Note that an expression that opts out can still be
481+
// rewritten eagerly if its `replacement` cannot survive (non-deterministic or unevaluable); see
482+
// `ReplaceExpressions`.
483+
def eagerReplace: Boolean = true
484+
474485
// `RuntimeReplaceable` expressions are normally rewritten into their `replacement` by the
475-
// `ReplaceExpressions` rule before execution. However, a `RuntimeReplaceable` may also be
476-
// produced *after* `ReplaceExpressions` has run (e.g. by an optimizer rule). To keep such an
477-
// expression evaluable without depending on the rewrite, both `eval` and `doGenCode` delegate
478-
// to `replacement`. As `replacement` is derived from this expression's children, it is bound
479-
// and code-generated together with them, so the delegation observes the same input row.
486+
// `ReplaceExpressions` rule before execution. However, an expression with `eagerReplace = false`
487+
// survives into the physical plan, and a `RuntimeReplaceable` may also be produced *after*
488+
// `ReplaceExpressions` has run (e.g. by an optimizer rule). To keep such an expression evaluable
489+
// without depending on the rewrite, both `eval` and `doGenCode` delegate to `replacement`. As
490+
// `replacement` is derived from this expression's children, it is bound and code-generated
491+
// together with them, so the delegation observes the same input row.
480492
final override def eval(input: InternalRow = null): Any = replacement.eval(input)
481493

482494
final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,18 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
5858

5959
case r: RuntimeReplaceable =>
6060
val replaced = replace(r.replacement)
61-
// Only a deterministic, fully-evaluable replacement may survive into the physical plan, to be
62-
// matched by a native engine or materialized just before codegen. The other cases must be
63-
// rewritten early instead:
61+
// By default (`eagerReplace = true`) a `RuntimeReplaceable` is rewritten here, so it never
62+
// reaches the physical plan. An expression with `eagerReplace = false` is instead allowed to
63+
// survive into the physical plan (to be matched by a native engine or materialized just
64+
// before codegen). Even then, a survivor must be rewritten early if its replacement cannot
65+
// survive:
6466
// - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
6567
// per-partition state that must be initialized before eval. That state cannot be reliably
6668
// initialized through the `lazy val replacement`, which tree transforms may re-create.
6769
// - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
6870
// logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
69-
// A foldable replacement (e.g. `collation(c1)`) is allowed to survive here; `ConstantFolding`
70-
// later materializes it into a literal.
71-
if (replaced.deterministic && !replaced.exists(_.isInstanceOf[Unevaluable])) r else replaced
71+
val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])
72+
if (r.eagerReplace || cannotSurvive) replaced else r
7273

7374
case _ => e.mapChildren(replace)
7475
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.SparkException
2121
import org.apache.spark.sql.AnalysisException
2222
import org.apache.spark.sql.catalyst.dsl.expressions._
2323
import org.apache.spark.sql.catalyst.dsl.plans._
24-
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, ArrayCompact, AttributeReference, CreateArray, CreateStruct, Expression, IntegerLiteral, Literal, MapFromEntries, Multiply, NamedExpression, NullIf, Remainder, RuntimeReplaceable}
24+
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, ArrayCompact, AttributeReference, CreateArray, CreateStruct, IntegerLiteral, Literal, MapFromEntries, Multiply, NamedExpression, NullIf, Remainder, RuntimeReplaceable}
2525
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
2626
import org.apache.spark.sql.catalyst.plans.PlanTest
2727
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project}
@@ -347,20 +347,11 @@ class OptimizerSuite extends PlanTest {
347347
val plan = Project(Alias(nullIf, "out")() :: Nil, OneRowRelation()).analyze
348348
val optimized = optimizer.execute(plan)
349349

350-
// NullIf is now deterministic and evaluable, so it survives ReplaceExpressions and is only
351-
// unfolded at the physical layer (MaterializeRuntimeReplaceable). Materialize it here to
352-
// inspect the form that reaches execution.
353-
def materialize(e: Expression): Expression = e match {
354-
case r: RuntimeReplaceable => materialize(r.replacement)
355-
case _ => e.mapChildren(materialize)
356-
}
357-
val materialized = optimized.expressions.map(materialize)
358-
359-
assert(materialized.exists(_.exists {
350+
assert(optimized.expressions.exists(_.exists {
360351
case Literal(null, BooleanType) => true
361352
case _ => false
362353
}))
363-
assert(materialized.forall(!_.exists(_.isInstanceOf[RuntimeReplaceable])))
354+
assert(optimized.expressions.forall(!_.exists(_.isInstanceOf[RuntimeReplaceable])))
364355
}
365356
}
366357
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [collation(g#0) AS collation(g)#0]
1+
Project [SYSTEM.BUILTIN.UTF8_BINARY AS collation(g)#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [schema_of_csv(1|abc, (sep,|)) AS schema_of_csv(1|abc)#0]
1+
Project [invoke(SchemaOfCsvEvaluator(Map(sep -> |)).evaluate(1|abc)) AS schema_of_csv(1|abc)#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [schema_of_json([{"col":01}]) AS schema_of_json([{"col":01}])#0]
1+
Project [invoke(SchemaOfJsonEvaluator(Map()).evaluate([{"col":01}])) AS schema_of_json([{"col":01}])#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [schema_of_json([{"col":01}], (allowNumericLeadingZeros,true)) AS schema_of_json([{"col":01}])#0]
1+
Project [invoke(SchemaOfJsonEvaluator(Map(allowNumericLeadingZeros -> true)).evaluate([{"col":01}])) AS schema_of_json([{"col":01}])#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [tryeval(to_binary(g#0, Some(format), true)) AS try_to_binary(g, format)#0]
1+
Project [tryeval(null) AS try_to_binary(g, format)#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]

sql/core/src/main/scala/org/apache/spark/sql/execution/MaterializeRuntimeReplaceable.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ import org.apache.spark.sql.catalyst.rules.Rule
2222
import org.apache.spark.sql.catalyst.trees.TreePattern.RUNTIME_REPLACEABLE
2323

2424
/**
25-
* Prototype: materializes any [[RuntimeReplaceable]] that survived the logical optimizer into its
26-
* `replacement`, on the physical plan, right before whole-stage codegen.
25+
* Materializes any [[RuntimeReplaceable]] that survived the logical optimizer into its
26+
* `replacement`, on the physical plan.
2727
*
28-
* In this prototype `RuntimeReplaceable` is intentionally left in the plan by the optimizer (scalar
29-
* `ReplaceExpressions` is disabled) so that a native engine (Photon) could match the semantic
30-
* expression directly. This rule then materializes the replacement for the Spark execution path, so
31-
* Spark codegen/interpreted evaluation behaves exactly as today. Placed after the columnar/native
32-
* conversion and before `CollapseCodegenStages`, so a native engine sees the origin while Spark
33-
* sees the replacement.
28+
* A `RuntimeReplaceable` with `eagerReplace = false` is intentionally kept in the plan by the
29+
* optimizer (see `ReplaceExpressions`) so that a native engine can match the high-level expression
30+
* directly. This rule then materializes the replacement for the Spark execution path, so Spark
31+
* codegen/interpreted evaluation behaves exactly as today. It is placed after the columnar/native
32+
* conversion and before `CollapseCodegenStages`, so a native engine sees the original expression
33+
* while Spark whole-stage codegen never sees a `RuntimeReplaceable`.
3434
*/
3535
object MaterializeRuntimeReplaceable extends Rule[SparkPlan] {
3636
override def apply(plan: SparkPlan): SparkPlan = plan.transformUpWithSubqueries {

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -775,9 +775,9 @@ object QueryExecution {
775775
DisableUnnecessaryBucketedScan,
776776
ApplyColumnarRulesAndInsertTransitions(
777777
sparkSession.sessionState.columnarRules, outputsColumnar = false),
778-
// Prototype: materialize any RuntimeReplaceable that survived the optimizer into its
779-
// replacement for the Spark execution path. After columnar/native conversion (so a native
780-
// engine sees the origin), before codegen (so Spark codegen never sees a RuntimeReplaceable).
778+
// Materialize any RuntimeReplaceable that survived the optimizer into its replacement for the
779+
// Spark execution path. After columnar/native conversion (so a native engine sees the original
780+
// expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
781781
MaterializeRuntimeReplaceable,
782782
CollapseCodegenStages()) ++
783783
(if (subquery) {

0 commit comments

Comments
 (0)