Skip to content

Commit e684fc9

Browse files
committed
[SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning
`DelegateExpression` is a transparent, named delegate over a `definition` expression. It lets a high-level function (e.g. `right(a, b)`) stay readable in the analyzed/optimized logical plan, and lets optimizer rules introduce such nodes (e.g. `multi_get_json_object`), without hand-written `eval`/`doGenCode` -- every behavior delegates to `definition`, a real child fully visible to the analyzer and optimizer. `name`/`inputs` are purely informational; nothing enforces that `definition` matches them, so the wrapper is never exposed to physical planning or external systems. `LowerDelegateExpression` strips it to `definition` in `QueryExecution.createSparkPlan` -- the single entry point to the planner, used by both the main query and AQE re-planning -- so the planner and every physical consumer (join-key extraction, V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed expression. Data source V2 pushdown runs earlier, in the logical optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`. `eval`/`doGenCode` still delegate, as a safety net. `RuntimeReplaceable` is left exactly as on master. `MultiGetJsonObject` is rebuilt on `DelegateExpression` (Invoke definition, dropping hand-written eval/codegen); `right` is migrated via a new `DelegateFunction` (an `ExpressionBuilder`) to demonstrate the authoring path. Co-authored-by: Isaac
1 parent 64197c9 commit e684fc9

16 files changed

Lines changed: 635 additions & 112 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,9 @@ class Analyzer(
583583
Seq(ResolveUpdateEventTimeWatermarkColumn) ++
584584
extendedResolutionRules ++
585585
Seq(NameStreamingSources) : _*),
586-
Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
586+
Batch("Remove analysis-only markers", Once,
587+
RemoveTempResolvedColumn,
588+
RemoveInputTypeMarkers),
587589
Batch("Post-Hoc Resolution", Once,
588590
Seq(ResolveCommandsWithIfExists) ++
589591
postHocResolutionRules: _*),
@@ -4391,6 +4393,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
43914393
}
43924394
}
43934395

4396+
/**
4397+
* Removes the analysis-only input-type markers ([[ImplicitCastInput]] / [[TypeCheckInput]]) that a
4398+
* [[DelegateFunction]] inserts to drive or check implicit cast. Once type coercion has run they have
4399+
* served their purpose, so we strip them at the end of analysis, leaving a clean `definition` in the
4400+
* [[DelegateExpression]]. Like [[RemoveTempResolvedColumn]], this just unwraps a marker to its
4401+
* child; it is not load-bearing -- a `DelegateExpression` is correct with or without the markers.
4402+
*/
4403+
object RemoveInputTypeMarkers extends Rule[LogicalPlan] {
4404+
override def apply(plan: LogicalPlan): LogicalPlan =
4405+
plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) {
4406+
case marker: ImplicitCastInput => marker.child
4407+
case marker: TypeCheckInput => marker.child
4408+
}
4409+
}
4410+
43944411
/**
43954412
* Cleans up unnecessary Aliases inside the plan. Basically we only need Alias as a top level
43964413
* expression in Project(project list) or Aggregate(aggregate expressions) or

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ object FunctionRegistry {
661661
expression[Substring]("substr", true),
662662
expression[Substring]("substring"),
663663
expression[Left]("left"),
664-
expression[Right]("right"),
664+
expressionBuilder("right", Right),
665665
expression[SubstringIndex]("substring_index"),
666666
expression[StringTranslate]("translate"),
667667
expression[StringTrim]("trim"),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,8 @@ class ResolverGuard(
558558
_: TryValidateUTF8 | _: StringReplace | _: Overlay | _: StringTranslate | _: FindInSet |
559559
_: String2TrimExpression | _: StringTrimBoth | _: StringInstr | _: SubstringIndex |
560560
_: StringLocate | _: StringLPad | _: BinaryPad | _: StringRPad | _: FormatString |
561-
_: InitCap | _: StringRepeat | _: StringSpace | _: Substring | _: Right | _: Left |
561+
_: InitCap | _: StringRepeat | _: StringSpace | _: Substring | _: DelegateExpression |
562+
_: Left |
562563
_: Length | _: BitLength | _: OctetLength | _: Levenshtein | _: SoundEx | _: Ascii |
563564
_: Chr | _: Base64 | _: UnBase64 | _: Decode | _: StringDecode | _: Encode | _: ToBinary |
564565
_: FormatNumber | _: Sentences | _: StringSplitSQL | _: SplitPart | _: Empty2Null |
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.sql.catalyst.InternalRow
21+
import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
22+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
23+
import org.apache.spark.sql.catalyst.trees.UnaryLike
24+
import org.apache.spark.sql.catalyst.trees.TreePattern.{DELEGATE_EXPRESSION, INPUT_TYPE_MARKER, TreePattern}
25+
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
26+
27+
/**
28+
* A transparent, named delegate over a `definition` expression -- a LOGICAL-phase construct.
29+
*
30+
* `DelegateExpression` lets a high-level function (e.g. `right(a, b)`) stay readable in the analyzed
31+
* and optimized logical plan, and lets optimizer rules introduce such nodes (e.g.
32+
* `multi_get_json_object`), without hand-written `eval`/`doGenCode`. Every behavior delegates to
33+
* `definition`, a real child fully visible to the analyzer and optimizer.
34+
*
35+
* `name`/`inputs` are purely informational (EXPLAIN/SQL): nothing enforces that `definition` matches
36+
* what they claim, so the wrapper is never exposed to physical planning or external systems.
37+
* `LowerDelegateExpression` strips it to `definition` in `QueryExecution.createSparkPlan` -- the
38+
* single entry point to the planner, used by both the main query and AQE re-planning -- so the
39+
* planner and every physical consumer (join-key extraction, data source pushdown, columnar rules,
40+
* codegen) sees the real executed expression. (Data source V2 pushdown runs earlier, in the logical
41+
* optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`.) The wrapper survives the
42+
* logical optimizer, so the optimized plan stays readable and optimizer rules can introduce these
43+
* nodes; `eval`/`doGenCode` still delegate, as a safety net if a delegate ever reaches execution.
44+
*
45+
* Note: because the strip runs before planning, a `DelegateExpression` created by a *physical* rule
46+
* (after `createSparkPlan`) is not stripped and may reach an external system un-lowered. That is
47+
* acceptable -- like any other expression the system does not recognize, it simply falls back, and
48+
* `eval`/`doGenCode` keep it correct within Spark. Analysis- and optimizer-inserted nodes (the
49+
* common case) are always stripped, so physical-rule insertion is the only uncovered path.
50+
*/
51+
case class DelegateExpression(
52+
name: String,
53+
inputs: Seq[Expression],
54+
definition: Expression)
55+
extends Expression with UnaryLike[Expression] {
56+
57+
override def child: Expression = definition
58+
override def dataType: DataType = definition.dataType
59+
override def nullable: Boolean = definition.nullable
60+
override def foldable: Boolean = definition.foldable
61+
override lazy val deterministic: Boolean = definition.deterministic
62+
override lazy val canonicalized: Expression = definition.canonicalized
63+
64+
override def eval(input: InternalRow): Any = definition.eval(input)
65+
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
66+
definition.genCode(ctx)
67+
68+
final override val nodePatterns: Seq[TreePattern] = Seq(DELEGATE_EXPRESSION)
69+
override protected def withNewChildInternal(newChild: Expression): DelegateExpression =
70+
copy(definition = newChild)
71+
72+
override def prettyName: String = name
73+
override def sql: String = s"$name(${inputs.map(_.sql).mkString(", ")})"
74+
override def toString: String = s"$name(${inputs.mkString(", ")})"
75+
}
76+
77+
/**
78+
* Analysis-only marker that requests an implicit cast of `child` to `expectedType`: it declares the
79+
* expected type so the standard `TypeCoercion` rule casts the child, then is removed at the end of
80+
* analysis by [[org.apache.spark.sql.catalyst.analysis.RemoveInputTypeMarkers]]. It never reaches
81+
* execution, hence [[Unevaluable]]. Modeled on [[org.apache.spark.sql.catalyst.analysis.TempResolvedColumn]].
82+
*/
83+
case class ImplicitCastInput(child: Expression, expectedType: AbstractDataType)
84+
extends UnaryExpression with Unevaluable with ImplicitCastInputTypes {
85+
override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
86+
override def dataType: DataType = child.dataType
87+
override def nullable: Boolean = child.nullable
88+
override lazy val canonicalized: Expression = child.canonicalized
89+
final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
90+
override protected def withNewChildInternal(newChild: Expression): ImplicitCastInput =
91+
copy(child = newChild)
92+
}
93+
94+
/**
95+
* Analysis-only marker that requires `child` to already match `expectedType` (no cast is inserted),
96+
* failing analysis otherwise. Removed at the end of analysis like [[ImplicitCastInput]].
97+
*/
98+
case class TypeCheckInput(child: Expression, expectedType: AbstractDataType)
99+
extends UnaryExpression with Unevaluable with ExpectsInputTypes {
100+
override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
101+
override def dataType: DataType = child.dataType
102+
override def nullable: Boolean = child.nullable
103+
override lazy val canonicalized: Expression = child.canonicalized
104+
final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
105+
override protected def withNewChildInternal(newChild: Expression): TypeCheckInput =
106+
copy(child = newChild)
107+
}
108+
109+
/**
110+
* The per-function object each built-in function defines (e.g. `object Right extends
111+
* DelegateFunction`). It is just an [[ExpressionBuilder]] -- registered with the ordinary
112+
* `expressionBuilder(...)`, with its `@ExpressionDescription` annotation read off the object as
113+
* usual -- specialized for the delegate pattern: replace the `InheritAnalysisRules` ceremony with
114+
* one `lower` method plus a couple of flags. `apply` is the direct-construction entry point.
115+
*
116+
* Input-type contract, covering all three cases (applied per argument):
117+
* - `inputTypes` empty (or `AnyDataType` for a position): accept any type (no check, no cast).
118+
* - `inputTypes` set, `implicitCast = true` (default): implicit-cast each arg to its type.
119+
* - `inputTypes` set, `implicitCast = false` : type-check each arg, no cast.
120+
*/
121+
trait DelegateFunction extends ExpressionBuilder {
122+
def name: String
123+
def inputTypes: Seq[AbstractDataType] = Nil
124+
def implicitCast: Boolean = true
125+
126+
/** Lower the function into the expression it delegates to. */
127+
def lower(args: Seq[Expression]): Expression
128+
129+
/**
130+
* ExpressionBuilder contract: invoked by the registry during function resolution. ONLY this
131+
* (analysis-time) path inserts the input-type markers, because the analyzer's `TypeCoercion`
132+
* casts them and `RemoveInputTypeMarkers` strips them afterwards.
133+
*/
134+
override def build(funcName: String, expressions: Seq[Expression]): Expression = {
135+
val args = expressions.zipWithIndex.map { case (e, i) =>
136+
val expected = if (i < inputTypes.length) inputTypes(i) else AnyDataType
137+
expected match {
138+
case AnyDataType => e
139+
case t if implicitCast => ImplicitCastInput(e, t)
140+
case t => TypeCheckInput(e, t)
141+
}
142+
}
143+
DelegateExpression(name, expressions, lower(args))
144+
}
145+
146+
/**
147+
* Direct construction for use anywhere, including optimizer rules. Unlike [[build]] this inserts
148+
* NO input-type markers -- there is no analyzer pass left to coerce or strip them -- so callers
149+
* must pass arguments that are already resolved and of the expected types, exactly as when
150+
* constructing any other expression (`Add`, `Substring`, ...) after analysis. The resolved
151+
* precondition is asserted so misuse fails loudly here rather than later.
152+
*/
153+
final def apply(inputs: Expression*): DelegateExpression = {
154+
require(inputs.forall(_.resolved),
155+
s"$name: arguments to DelegateFunction.apply must be resolved; use it after analysis " +
156+
"(e.g. in optimizer rules) with already-typed arguments, or register the function and " +
157+
"let the analyzer build it")
158+
DelegateExpression(name, inputs, lower(inputs))
159+
}
160+
161+
def unapply(e: Expression): Option[Seq[Expression]] = e match {
162+
case d: DelegateExpression if d.name == name => Some(d.inputs)
163+
case _ => None
164+
}
165+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 49 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -161,70 +161,62 @@ object GetJsonObject {
161161
}
162162

163163
/**
164-
* Extracts multiple simple named paths from a JSON string in one parse. This is an internal
165-
* expression used to share sibling [[GetJsonObject]] expressions; unsupported and
166-
* prefix-conflicting JSON paths remain as independent GetJsonObject expressions.
164+
* Builds the internal expression that extracts multiple simple named paths from a JSON string in one
165+
* parse, used to share sibling [[GetJsonObject]] expressions; unsupported and prefix-conflicting
166+
* paths remain as independent `GetJsonObject` expressions.
167+
*
168+
* It is inserted by `OptimizeCsvJsonExprs` (after analysis, so its inputs are resolved), and is the
169+
* optimizer-constructed showcase for [[DelegateExpression]]: instead of hand-written eval/doGenCode,
170+
* it builds a typed delegate directly -- the high-level call `multi_get_json_object(json, p1, .., pn)`
171+
* stays visible via `inputs`, while the `definition` delegates evaluation to
172+
* [[MultiGetJsonObjectEvaluator]] through an `Invoke`. No rewrite step: the delegate runs as-is.
167173
*/
168-
case class MultiGetJsonObject(
169-
json: Expression,
170-
fallbackPaths: Seq[String])
171-
extends UnaryExpression
172-
with ExpectsInputTypes {
173-
174-
// OptimizeCsvJsonExprs caps shared path depth to keep evaluator recursion stack-safe.
175-
require(fallbackPaths.nonEmpty)
176-
177-
override def child: Expression = json
178-
179-
override def inputTypes: Seq[AbstractDataType] =
180-
Seq(StringTypeWithCollation(supportsTrimCollation = true))
181-
182-
override lazy val dataType: DataType = StructType(fallbackPaths.indices.map { index =>
183-
StructField(s"_$index", StringType, nullable = true)
184-
})
185-
186-
override def nullable: Boolean = true
187-
188-
// This internal unary expression always returns null when its JSON child is null.
189-
override def nullIntolerant: Boolean = true
190-
191-
override def prettyName: String = "multi_get_json_object"
192-
193-
final override val nodePatterns: Seq[TreePattern] = Seq(GET_JSON_OBJECT)
194-
195-
@transient
196-
private lazy val namedPaths = fallbackPaths.map { path =>
197-
GetJsonObject.simpleNamedPath(UTF8String.fromString(path)).getOrElse {
198-
throw new IllegalArgumentException(s"Unsupported shared JSON path: $path")
174+
object MultiGetJsonObject {
175+
val name: String = "multi_get_json_object"
176+
177+
def apply(json: Expression, fallbackPaths: Seq[String]): DelegateExpression = {
178+
// OptimizeCsvJsonExprs caps shared path depth to keep evaluator recursion stack-safe.
179+
require(fallbackPaths.nonEmpty)
180+
val resultType = StructType(fallbackPaths.indices.map { index =>
181+
StructField(s"_$index", StringType, nullable = true)
182+
})
183+
val namedPaths = fallbackPaths.map { path =>
184+
GetJsonObject.simpleNamedPath(UTF8String.fromString(path)).getOrElse {
185+
throw new IllegalArgumentException(s"Unsupported shared JSON path: $path")
186+
}
199187
}
188+
val evaluator =
189+
MultiGetJsonObjectEvaluator(fallbackPaths.map(UTF8String.fromString), namedPaths)
190+
// `propagateNull = true` reproduces the old null-intolerant behavior: null json -> null result.
191+
val definition = Invoke(
192+
Literal.create(evaluator, ObjectType(classOf[MultiGetJsonObjectEvaluator])),
193+
"evaluate",
194+
resultType,
195+
Seq(json),
196+
Seq(json.dataType),
197+
returnNullable = true)
198+
// `inputs` keeps the high-level call visible: the json plus one string literal per path.
199+
val pathInputs = fallbackPaths.map(p => Literal(UTF8String.fromString(p), StringType))
200+
DelegateExpression(name, json +: pathInputs, definition)
200201
}
201202

202-
@transient
203-
private lazy val evaluator = MultiGetJsonObjectEvaluator(
204-
fallbackPaths.map(UTF8String.fromString),
205-
namedPaths)
206-
207-
override def eval(input: InternalRow): Any = {
208-
evaluator.evaluate(json.eval(input).asInstanceOf[UTF8String])
203+
/** Recovers `(json, fallbackPaths)` from a delegate produced by `apply`. */
204+
def unapply(e: Expression): Option[(Expression, Seq[String])] = e match {
205+
case d: DelegateExpression if d.name == name =>
206+
val paths = d.inputs.tail.map {
207+
case Literal(p: UTF8String, _: StringType) => p.toString
208+
case other => throw new IllegalStateException(s"Unexpected path input: $other")
209+
}
210+
Some((d.inputs.head, paths))
211+
case _ => None
209212
}
210213

211-
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
212-
val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
213-
val jsonEval = json.genCode(ctx)
214-
val resultType = CodeGenerator.javaType(dataType)
215-
ev.copy(code = code"""
216-
|${jsonEval.code}
217-
|boolean ${ev.isNull} = ${jsonEval.isNull};
218-
|$resultType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
219-
|if (!${ev.isNull}) {
220-
| ${ev.value} = ($resultType) $refEvaluator.evaluate(${jsonEval.value});
221-
| ${ev.isNull} = ${ev.value} == null;
222-
|}
223-
|""".stripMargin)
224-
}
214+
def isInstance(e: Expression): Boolean = unapply(e).isDefined
225215

226-
override protected def withNewChildInternal(newChild: Expression): MultiGetJsonObject =
227-
copy(json = newChild)
216+
def pathsOf(e: Expression): Seq[String] = unapply(e) match {
217+
case Some((_, paths)) => paths
218+
case None => throw new IllegalArgumentException(s"Not a multi_get_json_object: $e")
219+
}
228220
}
229221

230222
// scalastyle:off line.size.limit line.contains.tab

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2359,29 +2359,24 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
23592359
since = "2.3.0",
23602360
group = "string_funcs")
23612361
// scalastyle:on line.size.limit
2362-
case class Right(str: Expression, len: Expression) extends RuntimeReplaceable
2363-
with ImplicitCastInputTypes with BinaryLike[Expression] {
2364-
2365-
override lazy val replacement: Expression = If(
2366-
IsNull(str),
2367-
Literal(null, str.dataType),
2368-
If(
2369-
LessThanOrEqual(len, Literal(0)),
2370-
Literal(UTF8String.EMPTY_UTF8, str.dataType),
2371-
new Substring(str, UnaryMinus(len, failOnError = false))
2372-
)
2373-
)
2362+
object Right extends DelegateFunction {
2363+
override val name: String = "right"
23742364

23752365
override def inputTypes: Seq[AbstractDataType] =
2376-
Seq(
2377-
StringTypeWithCollation(supportsTrimCollation = true),
2378-
IntegerType
2379-
)
2380-
override def left: Expression = str
2381-
override def right: Expression = len
2382-
override protected def withNewChildrenInternal(
2383-
newLeft: Expression, newRight: Expression): Expression = {
2384-
copy(str = newLeft, len = newRight)
2366+
Seq(StringTypeWithCollation(supportsTrimCollation = true), IntegerType)
2367+
2368+
// NOTE: runs at parse time on unresolved args, so it must not read an input's `.dataType`.
2369+
// The `If` branch types are unified later by type coercion.
2370+
override def lower(args: Seq[Expression]): Expression = {
2371+
val str = args(0)
2372+
val len = args(1)
2373+
If(
2374+
IsNull(str),
2375+
Literal(null, StringType),
2376+
If(
2377+
LessThanOrEqual(len, Literal(0)),
2378+
Literal(UTF8String.EMPTY_UTF8, StringType),
2379+
new Substring(str, UnaryMinus(len, failOnError = false))))
23852380
}
23862381
}
23872382

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ object TreePattern extends Enumeration {
3939
val AVERAGE: Value = Value
4040
val BINARY_ARITHMETIC: Value = Value
4141
val BINARY_COMPARISON: Value = Value
42+
val DELEGATE_EXPRESSION: Value = Value
43+
val INPUT_TYPE_MARKER: Value = Value
4244
val CASE_WHEN: Value = Value
4345
val CAST: Value = Value
4446
val COALESCE: Value = Value

0 commit comments

Comments
 (0)