[SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning#56575
[SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning#56575cloud-fan wants to merge 5 commits into
Conversation
8436c7c to
10bf8bc
Compare
|
cc @sunchao |
HyukjinKwon
left a comment
There was a problem hiding this comment.
1 blocking, 1 non-blocking, 0 nits. Solid, carefully-commented mechanism; the blocking item is that the materialization step isn't wired into the AQE path.
Design / architecture (1)
QueryExecution.scala:781:MaterializeRuntimeReplaceableis added only toQueryExecution.preparations, not to AQE's stage prep. Under AQE (default) a survivingRuntimeReplaceablereachesCollapseCodegenStagesun-materialized -- see inline.
Suggestions (1)
NormalizePlan.scala:94: full-unfold recursion duplicated withMaterializeRuntimeReplaceable.replace; consider a shared helper -- see inline.
Verification
Traced the survive-then-materialize transformation for result-equivalence: eval/doGenCode delegate to replacement; non-deterministic (Rand) and Unevaluable (With) replacements are gated to the eager path; foldable-but-referencing survivors are handled by literal-only FoldablePropagation + same-batch ConstantFolding; NormalizePlan fully unfolds. All result-bearing dimensions are equivalent or gated -- query results stay correct. The one gap is the AQE wiring (inline): AdaptiveSparkPlanExec is a LeafExecNode so the outer rule can't reach the managed subtree, and AQE's reOptimize re-inserts the survivor via OptimizeCsvJsonExprs, so under AQE the survivor reaches codegen (correct via delegation, but the stated invariant/metrics don't hold).
| // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for | ||
| // the Spark execution path. After columnar/native conversion (so a native engine sees the | ||
| // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable). | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
MaterializeRuntimeReplaceable is added here, to QueryExecution.preparations, but I don't see it in any of AQE's stage-prep lists (queryStagePreparationRules, queryStageOptimizerRules, postStageCreationRules in AdaptiveSparkPlanExec). Under AQE (default-on) that looks like a gap:
InsertAdaptiveSparkPlanruns first inpreparations, so the plan is wrapped inAdaptiveSparkPlanExec(aLeafExecNode) before this rule runs --transformUpWithSubqueriescan't reach the AQE-managed subtree.- AQE's
reOptimizere-runs the full optimizer (optimizer.execute), which includesOptimizeCsvJsonExprs, so the survivingMultiGetJsonObject(eagerReplace = false) is re-inserted on the re-optimized stage. - AQE's
postStageCreationRulesthen runsApplyColumnarRulesAndInsertTransitions->CollapseCodegenStageswith no materialization in between.
So under AQE a RuntimeReplaceable reaches whole-stage codegen un-materialized. Results stay correct because eval/doGenCode now delegate to replacement, but the invariant this PR relies on ("Spark whole-stage codegen never sees a RuntimeReplaceable", and "only the physical plan shows its Invoke replacement") wouldn't hold under AQE -- which affects EXPLAIN and the per-operator-metrics rationale. The peer prep rules (CollapseCodegenStages, ApplyColumnarRulesAndInsertTransitions, EnsureRequirements, ...) are all present in both the non-AQE and AQE lists; this rule is the only one that isn't.
Would adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules (just before collapseCodegenStagesRule) be the right fix? The two new tests use no-shuffle queries that aren't AQE-wrapped, so an AQE-path (shuffle) test that inspects the AQE final plan would cover this.
There was a problem hiding this comment.
Agreed, this was a real gap. Fixed in b80438b by adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules, right before collapseCodegenStagesRule, mirroring QueryExecution.preparations. I confirmed both newQueryStage and newResultQueryStage apply postStageCreationRules, so shuffle stages and the final result stage are both covered. Added an AQE-path test (repartition shuffle) that asserts no RuntimeReplaceable reaches the AQE finalPhysicalPlan and that results are correct.
One minor correction to the mechanism: AQEOptimizer doesn't actually run OptimizeCsvJsonExprs or spark.experimental.extraOptimizations, so the survivor isn't re-inserted during reOptimize. Instead it originates in the main optimizer and persists into the AQE-managed subtree, which the outer preparations rule can't reach (since AdaptiveSparkPlanExec is a LeafExecNode). So the conclusion stands -- AQE-side materialization is required -- just via a slightly different path.
| } | ||
|
|
||
| private def replaceRuntimeReplaceable(e: Expression): Expression = e match { | ||
| case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) |
There was a problem hiding this comment.
Nit / non-blocking: this full-unfold recursion (case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) / case _ => e.mapChildren(...)) is identical to MaterializeRuntimeReplaceable.replace. Two copies of the same unfold can drift over time -- might be worth a small shared helper.
There was a problem hiding this comment.
Done in b80438b. Extracted a shared RuntimeReplaceable.unfold helper (companion object in Expression.scala); both NormalizePlan.normalizeRuntimeReplaceable and MaterializeRuntimeReplaceable.apply now call it, removing the two private copies.
sunchao
left a comment
There was a problem hiding this comment.
One additional inline finding. The AQE materialization issue is already covered by the existing unresolved review thread, so I did not duplicate it.
| // initialized through the `lazy val replacement`, which tree transforms may re-create. | ||
| // - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later | ||
| // logical rule (such as `RewriteWithExpression`) that can only run in the logical phase. | ||
| val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable]) |
There was a problem hiding this comment.
[P1] AttributeReference extends Unevaluable, so this condition is true for every replacement that reads a resolved input column. For example, an eagerReplace = false wrapper whose replacement is Add(attr, Literal(1L)) is still returned as replaced below and never reaches physical/native planning. In practice, the opt-out only works for constants or nodes inserted after ReplaceExpressions—which is why both MultiGetJsonObject and the new test avoid this path.
Could we exclude AttributeReference from this check (as ConvertToLocalRelation.hasUnevaluableExpr already does) and add a direct ReplaceExpressions regression test with a column-based wrapper?
There was a problem hiding this comment.
Good catch, you're exactly right. Fixed in b80438b: the survival gate now reuses ConvertToLocalRelation.hasUnevaluableExpr(replaced), which already excludes AttributeReference, so a replacement that reads an input column (e.g. Add(attr, Literal(1))) can survive instead of being forced down the eager path. Added a direct ReplaceExpressions regression test in OptimizerSuite with a column-based wrapper (ColumnBasedRuntimeReplaceable) that fails without the fix and passes with it.
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on the AQE table-cache path.
| // re-insert a surviving `RuntimeReplaceable`, e.g. via `OptimizeCsvJsonExprs`), and | ||
| // `AdaptiveSparkPlanExec` is a `LeafExecNode` that the outer `preparations` rule can't reach, | ||
| // so the materialization must also run here. | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
[P2] This fixes AQE exchange and result stages, but newQueryStage's InMemoryTableScanLike branch still deliberately skips postStageCreationRules, so this materialization rule never reaches table-cache scans. InMemoryScans copies logical filters into InMemoryTableScanExec.predicates; after the scan is wrapped in TableCacheQueryStageExec (a leaf), later result-stage traversal cannot reach that hidden plan. A surviving predicate wrapper whose replacement is supported by CachedBatchSerializer.buildFilter (for example, EqualTo(attr, literal)) is therefore left unrecognized, disabling cached-batch pruning even though the separate FilterExec still produces correct rows.
Could we apply MaterializeRuntimeReplaceable explicitly to the optimized InMemoryTableScanLike before wrapping it, and add an AQE cache/filter test that inspects TableCacheQueryStageExec.plan? The current repartition test covers only the result-stage path, and finalPlan.exists does not traverse query-stage plans.
There was a problem hiding this comment.
Good catch — fixed in 50cc6a1, though at a slightly different layer than suggested.
I materialized at the pushdown consumer (InMemoryTableScanExec, unfolding RuntimeReplaceable in the predicates handed to buildFilter) rather than applying MaterializeRuntimeReplaceable to the optimized InMemoryTableScanLike before wrapping it. Reasoning:
- The scan is a
LeafExecNodethat never reaches whole-stage codegen, so this isn't a codegen-prep concern.MaterializeRuntimeReplaceablesits between columnar conversion andCollapseCodegenStagesspecifically for codegen, which is exactly why theInMemoryTableScanLikebranch skipspostStageCreationRules— extending that rule into a non-codegen leaf would be misplaced. predicatesonly feedbuildFilter(a runtime closure that pattern-matches expression shape), so unfolding them at the consumer is the right layer. It also covers AQE and non-AQE uniformly (non-AQE was relying onQueryExecution.preparationsreaching thepredicatesfield; now both paths go through the same unfold) while keeping the readable node in the plan/EXPLAIN output.
Added an AQE regression test in MaterializeRuntimeReplaceableSuite using an adaptive cached plan, so the scan is wrapped in a TableCacheQueryStageExec leaf (the gap you flagged). A surviving predicate RuntimeReplaceable is pushed into the scan; the test asserts the wrapper survives into scan.predicates and that cached-batch pruning still kicks in — it fails without the fix (readBatches == 10, no pruning). I also added comments at all three sites recording the codegen-vs-consumer split.
One scoping note: this is forward-looking. The current opt-in survivor (multi_get_json_object) isn't a buildFilter-recognizable predicate shape, so no current query regresses; the fix matters for a future survivor whose replacement is a prunable comparison.
There was a problem hiding this comment.
Follow-up: I generalized this beyond the cached-batch case.
First-principle we landed on: a surviving RuntimeReplaceable (eagerReplace = false) must be unfolded to its replacement wherever a predicate leaves Spark's own expression-evaluation engine and is handed to a consumer that interprets its structure or sends it elsewhere — because RuntimeReplaceable is a Spark-internal optimizer concept the consumer can't understand. Internal Spark evaluation (FilterExec/Project codegen + interpreted) needs no unfold: it's handled by the eval/doGenCode delegation plus MaterializeRuntimeReplaceable.
That boundary is realized at three places, all now unfolding:
InMemoryTableScanExec.buildFilter(cached-batch pruning — the case you flagged;CachedBatchSerializeris a@DeveloperApi).DataSourceStrategy.translateLeafNodeFilter(V1 / file source filter pushdown).V2ExpressionBuilder.generateExpression(V2) — and since all V2 pushdown (filters, aggregate functions, aggregate arguments, group-by, sort) funnels throughgenerateExpressionviaPushableExpression/translateAggregation, this single fallback covers V2 filter and aggregate/group-by pushdown. It's a fallback (after the explicit cases) so native high-level pushdown likeAES_ENCRYPTstill wins.
Out of scope (different kind of consumer — column selection, not expression translation, and driven by references which for a survivor equals its children's references): nested-column/schema pruning and partition pruning.
Tests added for all three boundaries (V1/V2 filter translation, V2 aggregate/group-by, and an AQE cached-batch pruning test); each was verified to fail without the corresponding unfold.
Note: I squashed the branch into a single commit.
6cdb81d to
b80438b
Compare
f459bc6 to
5251cd7
Compare
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on compound V2 filter reconstruction.
| // natively is a Spark-internal optimizer node the connector cannot understand. Fall back to its | ||
| // concrete `replacement` so the lowered form can still be pushed -- same boundary rationale as | ||
| // `DataSourceStrategy.translateLeafNodeFilter` (V1) and `CachedBatchSerializer.buildFilter`. | ||
| case r: RuntimeReplaceable => generateExpression(r.replacement, isPredicate) |
There was a problem hiding this comment.
[P2] Preserve the mapping for compound replacement predicates
translateFilterV2WithMapping treats the surviving wrapper as its leaf/other case, but this fallback can return a structural V2And, V2Or, or V2Not. The map then contains only compoundPredicate -> originalWrapper. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it (as the interface contract requires), rebuildExpressionFromFilter matches the compound node first and recursively visits children that were created inside V2ExpressionBuilder and have no map entries. Planning then throws Failed to rebuild Expression for filter instead of retaining a post-scan filter. For example, this occurs for an eagerReplace = false wrapper whose replacement is And(a > 1, b < 2) on a rejecting V2 source.
Could we either unfold before entering the mapping-aware recursion, or make rebuilding prefer an exact map entry before descending, and add a rejecting-source regression test? The new test exercises only a leaf GreaterThan translation, so it does not cover this reconstruction path.
There was a problem hiding this comment.
Good catch, fixed in cafddb8 with the "prefer an exact map entry before descending" option.
rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not. For the surviving wrapper, the compound V2And is a map key (mapped to the wrapper), so it's restored directly and we never descend into the synthetic children that have no entries. The original readable wrapper is kept in the post-scan filter and materialized into its replacement before codegen by MaterializeRuntimeReplaceable.
This is granularity-correct at every level of descent, so nesting is covered too -- e.g. And(c = 5, wrapper(And(a > 1, b < 2))) rebuilds as And(c = 5, wrapper): the outer V2And isn't a map key so it descends, and the inner V2And(a>1, b<2) is a map key so it's restored before its synthetic children are visited. Normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and falls through to the existing descent -- behavior unchanged.
Added a regression test in DataSourceV2StrategySuite that translates a compound-replacement wrapper via translateFilterV2WithMapping, then feeds the result back through rebuildExpressionFromFilter (equivalent to a rejecting SupportsPushDownV2Filters source returning the predicate unchanged) and asserts the wrapper is restored; it throws Failed to rebuild Expression for filter without the fix.
One note on V1: the V1 path (DataSourceStrategy) does not have this crash. There the unfold sits inside translateLeafNodeFilter, which only recognizes leaf shapes, so a compound replacement simply isn't translated (returns None) -- no pushdown, no map entry, correct results. That's a milder missed-optimization version of the same root cause; I left V1 as-is since fixing it would be an enhancement (and would lose the readable wrapper in the post-scan filter).
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on throwable metadata for surviving RuntimeReplaceable expressions.
| // non-deterministic `Rand`. This matters once a `RuntimeReplaceable` may survive into the | ||
| // physical plan (see `eagerReplace`): the survival decision relies on an accurate determinism | ||
| // signal. | ||
| override lazy val deterministic: Boolean = replacement.deterministic |
There was a problem hiding this comment.
[P2] Delegate throwable to the surviving replacement
RuntimeReplaceable now delegates deterministic and foldable to replacement, but it still inherits Expression.throwable, which checks only the wrapper's original children. A surviving wrapper can therefore look non-throwing even when its replacement contains an explicitly throwable expression such as Sequence(..., step).
For a concrete example, consider a boolean wrapper whose replacement is size(sequence(start, stop, step)) > 0, and an inner join with these inputs:
- left row
(key=1, start=0, stop=1, step=1); right containskey=1 - left row
(key=2, start=0, stop=1, step=-1); right has nokey=2
With the predicate above the join, the second row is removed by the join before sequence(0, 1, -1) is evaluated, so the query succeeds. With eagerReplace = false, the wrapper reports throwable = false; PushPredicateThroughJoin moves it onto the left input, and the second row now throws Illegal sequence boundaries before the join can discard it. Physical materialization happens too late to undo that relocation. Spark's existing SPARK-46707 test specifically keeps explicit-step Sequence predicates above joins for this reason.
Could we also delegate this metadata, e.g. override lazy val throwable: Boolean = replacement.throwable, and add the wrapped join case as a regression test?
There was a problem hiding this comment.
Good catch, fixed in bd8f0ee.
RuntimeReplaceable now does override lazy val throwable: Boolean = replacement.throwable, alongside the existing deterministic/foldable delegations. You're right that the inherited children.exists(_.throwable) was wrong for a survivor: the children are the original arguments (non-throwing literal bounds) while the replacement is where the throwing Sequence lives, so PushPredicateThroughJoin (the SPARK-46707 guard) would relocate the predicate below the join and surface Illegal sequence boundaries on rows the join would otherwise discard.
Added a FilterPushdownSuite regression test mirroring the existing SPARK-46707 join test: a wrapper whose children (x.a, x.b) are non-throwing but whose replacement is an explicit-step Sequence; the predicate must stay above the join. It fails without the delegation (the wrapper is pushed onto the left input) and passes with it.
One implementation note for whoever reviews the test: the wrapper is deliberately not an InheritAnalysisRules expression. With InheritAnalysisRules the replacement becomes the wrapper's child, so the inherited children.exists(_.throwable) would already observe the throwing Sequence and the test would pass even without the fix. The test wrapper keeps its original args as children (the Uniform / MultiGetJsonObject shape) so it actually exercises the gap.
| // which already excludes it. | ||
| val cannotSurvive = | ||
| !replaced.deterministic || ConvertToLocalRelation.hasUnevaluableExpr(replaced) | ||
| if (r.eagerReplace || cannotSurvive) replaced else r |
There was a problem hiding this comment.
[P2] Force eager replacement when later finish-analysis rules need a hidden expression
This gate also lets through replacements that require a later finish-analysis rewrite. For example, consider a non-InheritAnalysisRules wrapper with an input timestamp child, eagerReplace = false, and replacement = Greatest(Seq(CurrentTimestamp(), child)). The replacement is deterministic and evaluable, and the input attribute makes it non-foldable, so the wrapper survives here. However, the replacement is not one of the wrapper's children, so its tree-pattern bits do not expose CURRENT_LIKE. ComputeCurrentTime, which runs after ReplaceExpressions, prunes this subtree and never replaces CurrentTimestamp with the single query-start literal. MaterializeRuntimeReplaceable exposes it only during physical preparation, where CurrentTimestamp.eval calls Instant.now() as rows are evaluated. In a focused 20,000-row regression, this produced multiple distinct timestamps within one query.
Before this PR, ReplaceExpressions always exposed the replacement before ComputeCurrentTime. Could we force eager replacement at least when replaced.containsPattern(CURRENT_LIKE) (or otherwise run the remaining finish-analysis rewrites on the hidden replacement)? Please keep the regression attribute-dependent so constant folding cannot mask the problem.
| // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for | ||
| // the Spark execution path. After columnar/native conversion (so a native engine sees the | ||
| // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable). | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
[P2] Expose structural equality before join-key extraction
Materializing at this point is too late for planner consumers. A supported eagerReplace = false binary predicate with replacement = EqualTo(leftKey, rightKey) survives the logical optimizer, but ExtractEquiJoinKeys only pattern-matches visible EqualTo / EqualNullSafe conjuncts and therefore extracts no keys from the wrapper. In a focused regression with broadcasting disabled, the batch plan contained CartesianProductExec and no SortMergeJoinExec (while still returning the correct rows), turning a normal equi-join into O(N*M) work that can OOM. A second control showed that an ordinary stream-stream EqualTo is accepted by StreamingJoinStrategy, while the identical equality hidden in this wrapper throws the 'stream-stream join without equality predicate' AnalysisException.
MaterializeRuntimeReplaceable unfolds the condition only after Spark strategies have selected the operator, so it cannot recover a hash/sort-merge/streaming equi-join. Before this PR, unconditional replacement exposed the equality before planning. Could join-key extraction inspect an unfolded condition (while preserving the original expression where needed), or should such structural predicates be forced to replace eagerly? Batch physical-plan and stream-stream planning regressions would cover both consequences.
sunchao
left a comment
There was a problem hiding this comment.
Follow-up review on 09e4f260: the exact CURRENT_LIKE and root-EqualTo cases are improved, but the broader correctness contracts are still incomplete in the cases below.
| val cannotSurvive = | ||
| !replaced.deterministic || | ||
| ConvertToLocalRelation.hasUnevaluableExpr(replaced) || | ||
| replaced.containsPattern(CURRENT_LIKE) |
There was a problem hiding this comment.
[P2] Cover every required finish-analysis rewrite
This only forces eager replacement for CURRENT_LIKE, but the same survivor opacity still bypasses other mandatory rules that run after ReplaceExpressions. For example, a deterministic, non-foldable wrapper with replacement = ArrayDistinct(arrayDoubleAttr) hides ARRAY_DISTINCT from both NormalizeFloatingNumbers passes; in a focused regression, [-0.0d, 0.0d] remained two elements instead of one canonical positive zero. Likewise, a wrapper whose replacement contains Cast(Literal("epoch"), DateType, ...) hides CAST from SpecialDatetimeValues; it returned NULL in legacy mode (and would throw in ANSI mode) instead of 1970-01-01.
MaterializeRuntimeReplaceable exposes these expressions only after logical optimization, so neither rewrite gets another chance. Could the survival gate cover every replacement requiring a mandatory post-ReplaceExpressions rewrite, or run those rewrites over the hidden replacement, with regressions for these two cases? A CURRENT_LIKE-only guard still permits wrong results and runtime errors.
| logDebug(s"Considering join on: $condition") | ||
| // Find equi-join predicates that can be evaluated before the join, and thus can be used | ||
| // as join keys. | ||
| val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil) |
There was a problem hiding this comment.
[P2] Split the unfolded condition for compound replacements
The condition is split while the outer RuntimeReplaceable is still opaque. If a supported survivor has replacement = And(EqualTo(left.k, right.k), GreaterThan(left.v, 0)), splitConjunctivePredicates produces the wrapper as one predicate; unfoldKeyPredicate then yields an And, which none of the top-level equality cases match. A focused regression still planned this as CartesianProductExec, and the equivalent stream-stream join is still rejected as lacking an equality predicate. This compound replacement shape is already treated as supported by TestCompoundPredicateRuntimeReplaceable in the V2 tests.
Could we expose the replacement before splitting (while retaining the mapping to the original survivor where needed), and add batch plus streaming coverage for a compound replacement?
| // keeps showing the readable node. The unfold is gated on the tree pattern so the common | ||
| // survivor-free join is unaffected. | ||
| def unfoldKeyPredicate(p: Expression): Expression = | ||
| if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p |
There was a problem hiding this comment.
[P2] Preserve nested survivors when exposing the outer equality
This recursively unfolds every RuntimeReplaceable in the predicate, including a survivor that is already a visible join-key operand. For EqualTo(partitionKey, Survivor(dimensionKey)), the old extractor retained Survivor(dimensionKey) as the physical key; this code emits only its low-level replacement, so native planning can no longer match the high-level key.
It also makes DPP invalid: ExtractEquiJoinKeys returns the unfolded build key, but PartitionPruning inspects the original equality and calls joinKeys.indexOf(Survivor(dimensionKey)), producing -1. The resulting DynamicPruningSubquery is unresolved and fails default plan validation; with validation disabled, the negative index later reaches buildKeys(-1). In a focused planning regression, the visible survivor key was already reduced to (k + 0) before columnar/native conversion.
Could this unfold only the outer survivor needed to reveal an equality, preserving already-visible key operands, with a DPP regression for this shape?
09e4f26 to
47329ba
Compare
20c56d4 to
6b12fbc
Compare
…er stripped before planning `DelegateExpression` is a transparent, named delegate over a `definition` expression. It lets a high-level function (e.g. `right(a, b)`) stay readable in the analyzed/optimized logical plan, and lets optimizer rules introduce such nodes (e.g. `multi_get_json_object`), without hand-written `eval`/`doGenCode` -- every behavior delegates to `definition`, a real child fully visible to the analyzer and optimizer. `name`/`inputs` are purely informational; nothing enforces that `definition` matches them, so the wrapper is never exposed to physical planning or external systems. `LowerDelegateExpression` strips it to `definition` in `QueryExecution.createSparkPlan` -- the single entry point to the planner, used by both the main query and AQE re-planning -- so the planner and every physical consumer (join-key extraction, V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed expression. Data source V2 pushdown runs earlier, in the logical optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`. `eval`/`doGenCode` still delegate, as a safety net. `RuntimeReplaceable` is left exactly as on master. `MultiGetJsonObject` is rebuilt on `DelegateExpression` (Invoke definition, dropping hand-written eval/codegen); `right` is migrated via a new `DelegateFunction` (an `ExpressionBuilder`) to demonstrate the authoring path. Co-authored-by: Isaac
6b12fbc to
e684fc9
Compare
|
@sunchao Heads-up: I've reworked this PR substantially and force-pushed, so the existing review threads now point at code that has been removed. Thank you for the earlier reviews — they're what drove the redesign. Instead of letting The wrapper is stripped to its The intent is to dissolve the class of issues your Would appreciate a fresh look when you have time. |
sunchao
left a comment
There was a problem hiding this comment.
Follow-up review on the current head.
| Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn), | ||
| Batch("Remove analysis-only markers", Once, | ||
| RemoveTempResolvedColumn, | ||
| RemoveInputTypeMarkers), |
There was a problem hiding this comment.
[P2] Remove input markers on the single-pass analyzer path
This cleanup batch only runs in the fixed-point Analyzer. With spark.sql.analyzer.singlePassResolver.enabled=true, right(...) is resolved and its ImplicitCastInput succeeds, but ResolverRunner marks the plan analyzed without running RemoveInputTypeMarkers. LowerDelegateExpression then removes only the outer delegate, leaving the Unevaluable marker in the executable expression; for example, SELECT right('abc', 1) fails during evaluation/code generation. The base Right used no marker nodes and worked in this mode. Could we add equivalent cleanup to the single-pass path and an execution-level regression test?
There was a problem hiding this comment.
Fixed in 58889e8d7aa. The single-pass resolver has no rule batch to run the RemoveInputTypeMarkers rule in, so FunctionResolver now calls RemoveInputTypeMarkers.removeMarkers(...) directly once type coercion has cast the marker children. right(...) therefore strips its markers under spark.sql.analyzer.singlePassResolver.enabled=true too, and LowerDelegateExpression no longer leaves an Unevaluable marker in the executable expression. Added an execution-level regression in DelegateExpressionQuerySuite (asserts no ImplicitCastInput/TypeCheckInput survives single-pass analysis and that the implicit Cast the marker drove still applies).
| override def apply(plan: LogicalPlan): LogicalPlan = | ||
| plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) { | ||
| case marker: ImplicitCastInput => marker.child | ||
| case marker: TypeCheckInput => marker.child |
There was a problem hiding this comment.
[P2] Keep failed type checks visible to CheckAnalysis
This unwraps every TypeCheckInput, including one whose type check failed. For example, TypeCheckInput(Literal(1L), IntegerType) is unresolved before this rule, but an identity-lowering DelegateFunction with implicitCast = false becomes fully resolved after the marker is replaced by its Long child, so analysis accepts the mismatched argument. The new unit test only calls checkInputDataTypes() before cleanup and therefore misses the end-to-end behavior. Could we unwrap only successfully resolved markers (leaving failures for CheckAnalysis) and add an analyzer-level regression?
There was a problem hiding this comment.
Fixed in bffecc826d6. RemoveInputTypeMarkers (and its expression-level removeMarkers) now only unwrap a marker if marker.resolved, so a TypeCheckInput whose type check failed is left in place and its ExpectsInputTypes failure stays visible to CheckAnalysis instead of exposing a resolved child of the wrong type. Added a regression in DelegateExpressionSuite ("RemoveInputTypeMarkers keeps a failed type-check marker for CheckAnalysis to report").
| * (analysis-time) path inserts the input-type markers, because the analyzer's `TypeCoercion` | ||
| * casts them and `RemoveInputTypeMarkers` strips them afterwards. | ||
| */ | ||
| override def build(funcName: String, expressions: Seq[Expression]): Expression = { |
There was a problem hiding this comment.
[P2] Validate each delegate function's argument count
build currently accepts any number of expressions. Right.lower consumes only positions 0 and 1, so right('abcd', 1, 99) succeeds and silently ignores 99, while zero/one-argument calls fail by indexing the sequence instead of returning the structured WRONG_NUM_ARGS error. The previous expression[Right] registration enforced its two-expression constructor. Could DelegateFunction expose a functionSignature, or otherwise validate arity before calling lower, with wrong-arity SQL coverage?
There was a problem hiding this comment.
Fixed in bffecc826d6. DelegateFunction.build now validates expressions.length == inputTypes.length and throws the structured WRONG_NUM_ARGS error before calling lower, so right('abcd', 1, 99) no longer silently ignores the extra arg and zero/one-arg calls no longer index past the sequence. An empty inputTypes marks a variadic function whose lower owns arg handling (no arity check). Added wrong-arity SQL coverage in DelegateExpressionQuerySuite plus a unit test in DelegateExpressionSuite.
| val len = args(1) | ||
| If( | ||
| IsNull(str), | ||
| Literal(null, StringType), |
There was a problem hiding this comment.
[P2] Preserve the resolved input type for these branch literals
Hard-coding plain StringType changes the result schema when the public spark.sql.preserveCharVarcharTypeInfo setting is enabled. For example, typeof(right(CAST('abc' AS CHAR(5)), 2)) returned char(5) on the base implementation, whose null/empty literals used str.dataType, but returns string on this head because If widens the CHAR(5) substring and these STRING branches. Registry expression builders receive resolved arguments, and the marker's dataType delegates to its child, so using str.dataType here is safe. Could we preserve the old literal typing and add CHAR/VARCHAR result-type coverage?
There was a problem hiding this comment.
Thanks — the finding is right (the CHAR/VARCHAR result type was regressing), and it's fixed in bffecc8. But using str.dataType directly isn't safe here, for a reason worth recording:
At build/lower time the argument is the not-yet-coerced ImplicitCastInput marker, and the marker delegates dataType to its (uncoerced) child. So str.dataType is the input type, which isn't necessarily a string yet — e.g. right(12345, 2) has an IntegerType child that the implicit cast turns into a string only later. Typing the empty-string branch as Literal(UTF8String.EMPTY_UTF8, IntegerType) then fails Literal validation (Literal must have a corresponding value to int, but class UTF8String found). Confirmed: a naive str.dataType crashes DelegateExpressionQuerySuite."right() implicit-casts a non-string arg ...".
So the fix types the null/empty branch literals with str.dataType only when it is already a string-family type (StringType / CharType / VarcharType) — preserving CHAR(N)/VARCHAR(N) under preserveCharVarcharTypeInfo and non-default collations — and falls back to plain StringType otherwise (the type the implicit cast produces). New test right() preserves the input CHAR/VARCHAR type with preserveCharVarcharTypeInfo asserts typeof(right(CAST('abc' AS CHAR(5)), 2)) = char(5); the collation case was already covered.
| */ | ||
| object LowerDelegateExpression extends Rule[LogicalPlan] { | ||
| override def apply(plan: LogicalPlan): LogicalPlan = | ||
| plan.transformAllExpressionsWithPruning(_.containsPattern(DELEGATE_EXPRESSION)) { |
There was a problem hiding this comment.
[P2] Fully lower a delegate whose definition is another delegate
transformAllExpressionsWithPruning is pre-order. If an outer delegate's definition is itself a DelegateExpression, replacing the outer node causes traversal to continue only through the replacement's children; the replacement root is not matched again. One pass therefore leaves the inner delegate in the plan, despite this rule's strips every DelegateExpression contract. A nested predicate delegate can consequently remain opaque to join-key extraction or physical pushdown. Could this use bottom-up/recursive lowering and include a nested-delegate regression?
There was a problem hiding this comment.
Fixed in 58889e8d7aa. LowerDelegateExpression.lower is now a tail-recursive unwrap: if a delegate's definition is itself a DelegateExpression, the chain is collapsed in one go, so no wrapper survives the pre-order transformAllExpressionsWithPruning. Delegates nested deeper (as children of definition) are still handled by the surrounding tree traversal. Added a nested-delegate regression ("LowerDelegateExpression fully unwraps a directly-nested delegate-of-delegate").
| expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = expr match { | ||
| case literal: Literal => Some(translateLiteral(literal)) | ||
| // DelegateExpression is a Spark-internal wrapper; push down its real definition instead. | ||
| case d: DelegateExpression => generateExpression(d.definition, isPredicate) |
There was a problem hiding this comment.
[P2] Preserve reconstruction mappings for compound delegate predicates
translateFilterV2WithMapping treats the wrapper as its leaf case, but this new recursion can return a structural V2And/V2Or. The map then contains only compoundPredicate -> originalDelegate. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it, rebuildExpressionFromFilter decomposes the compound node before consulting the map and fails on its synthetic, unmapped children with Failed to rebuild Expression for filter. Could we unfold before the mapping-aware recursion, or prefer an exact map entry before structural descent, and add a rejected-filter round-trip test?
There was a problem hiding this comment.
Fixed in bffecc826d6, with the "prefer an exact map entry before structural descent" option. rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not, so a compound-definition delegate that translated to a single mapped V2And/V2Or is restored directly instead of recursing into its synthetic, unmapped children (which threw Failed to rebuild Expression for filter). This is granularity-correct at every level, so a delegate nested inside an ordinary compound is covered too; normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and behavior is unchanged. Added a rejected-filter round-trip regression in DataSourceV2StrategySuite. (Same root cause as the pre-redesign V2 finding, now applied to DelegateExpression.)
| // Go through QueryExecution.createSparkPlan -- the single place that strips DelegateExpression | ||
| // before planning -- instead of calling the planner directly, so the re-planned stage sees the | ||
| // real executed expression. | ||
| val sparkPlan = QueryExecution.createSparkPlan( |
There was a problem hiding this comment.
[P2] Return the same logical tree that AQE used for planning
When a runtime optimizer rule inserts a DelegateExpression and the resulting physical plan is adopted, createSparkPlan plans a lowered copy whose nodes become the physical plan's logicalLink targets, while this method returns the original unlowered optimized tree below. If a copied node backs a later query stage, replaceWithQueryStagesInLogicalPlan cannot find it using reference equality, so the completed stage is not inserted into currentLogicalPlan and subsequent runtime optimization loses that stage's statistics. This is conditional on the new plan being adopted, not every runtime-added wrapper. Could AQE retain and return the exact lowered tree used for planning, with an injected runtime-optimizer/staged-query regression?
There was a problem hiding this comment.
Fixed in bffecc8 — reOptimize now lowers once and returns the same lowered tree it planned, so the physical plan's logicalLink targets stay reference-consistent with the returned logical plan.
On the regression test: I couldn't find a way to assert this one deterministically. The symptom is silent — no wrong results and no error (eval/codegen still delegate), and the physical logicalLinks are identical with or without the fix (they're set on the lowered copy in createSparkPlan either way). The only thing that differs is currentLogicalPlan, a transient local in getFinalPhysicalPlan, whose orphaned-stage state surfaces only as a threshold-sensitive re-planning decision on a later iteration. A test would need either a flaky multi-stage stats-driven plan-difference query or test-only instrumentation to expose stage tracking. I verified the fix by reasoning through replaceWithQueryStagesInLogicalPlan plus a full AdaptiveQueryExecSuite run (green). Open to adding observability instrumentation in a follow-up if you think it's warranted — let me know if you'd prefer that.
…ested delegates - FunctionResolver: resolve a built DelegateExpression's definition recursively (like InheritAnalysisRules) so coercion reaches the buried ImplicitCastInput/TypeCheckInput markers, then strip them (RemoveInputTypeMarkers.removeMarkers); single-pass has no end-of-analysis batch for this. Fixes right(<non-string>) failing to resolve under spark.sql.analyzer.singlePassResolver.enabled. - LowerDelegateExpression: unwrap a directly-nested delegate-of-delegate chain so no wrapper survives transformDown to the planner. - Tests: single-pass resolution of right(); nested-delegate lowering; collation preservation of right(); plus a comment typo fix. Co-authored-by: Isaac
…t delegation, arity/type-check validation, CHAR/VARCHAR result type, V2 compound rebuild, AQE logical link Addresses review feedback on the DelegateExpression mechanism: - DelegateExpression now delegates `nullIntolerant` to its `definition` (like its other metadata), so null-intolerance optimizations (IsNotNull-constraint inference, NullPropagation) still fire for `multi_get_json_object` while the wrapper is in the logical plan. - DelegateFunction.build validates argument count against the `inputTypes` arity, so wrong-arity calls fail with WRONG_NUM_ARGS instead of IndexOutOfBounds or a silently ignored extra argument. - Right.lower types its null/empty branch literals with the input's string-family type (CHAR/VARCHAR/collation) when available, preserving the result type under preserveCharVarcharTypeInfo, while falling back to StringType for a yet-to-be cast non-string input (e.g. right(12345, 2)). - RemoveInputTypeMarkers (and the single-pass removeMarkers) only unwrap resolved markers, leaving a failed TypeCheckInput visible to CheckAnalysis. - DataSourceV2Strategy.rebuildExpressionFromFilter prefers an exact map entry before structural descent, so a compound-definition delegate round-trips instead of failing reconstruction on a rejecting V2 source. - AdaptiveSparkPlanExec.reOptimize lowers once and returns the same lowered tree it planned, so the physical plan's logicalLink targets match the returned logical plan by reference. - TreePatterns: place DELEGATE_EXPRESSION / INPUT_TYPE_MARKER in alphabetical order. New tests: nullIntolerant delegation, build arity validation, failed type-check preservation (DelegateExpressionSuite); CHAR/VARCHAR result type and wrong-arity rejection (DelegateExpressionQuerySuite); compound-definition V2 filter round-trip (DataSourceV2StrategySuite). Co-authored-by: Isaac
…n inputs; fix scalastyle
`DelegateExpression.inputs` are display-only metadata, not tree children, so generic
expression traversals (`transform`, `transformAllExpressions`) never reach them. Two
consumers that walk expressions to render or normalize them were left with raw inputs:
- `usePrettyExpression` produced generated column names from un-prettified inputs, so
string literals kept their quotes (`right('abcd', 2)`) and attributes kept their
qualifiers (`right(spark_catalog.default.t.c7, 2)`). Add a `DelegateExpression` case
that re-renders the call with each input pushed through `toPrettySQL`, mirroring the
existing `InheritAnalysisRules` case, so names match the pre-delegate function.
- `NormalizePlan.normalizeExprIds` left non-deterministic process-global expr ids inside
the surviving wrapper's inputs (`right(g#16854, ...)`), making the connect golden file
unstable. Extract the normalization rule and apply it recursively to `inputs` so the
informational call is deterministic (`right(g#0, g#0)`).
Regenerate the legitimately-changed golden files: the `right`/`text` analyzer-results
(now showing the readable wrapper) and `function_right.explain`. The `results/` schema
files and `sql-expression-schema.md` are unchanged -- the pretty-name fix restores exact
master output.
Also fix scalastyle line-length and import-order violations across the touched files.
Co-authored-by: Isaac
…delegate test suites Fixes the Scala linter failures in the three new test files: line-length overflows, two missing explicit return types, and an import ordering issue. Co-authored-by: Isaac
What changes were proposed in this pull request?
This PR introduces
DelegateExpression, a transparent, named delegate that keeps a high-levelfunction readable in the logical plan and is lowered to its real definition before physical planning.
DelegateExpression(name, inputs, definition)extendsUnaryExpression. Every behavior --dataType,nullable,foldable,deterministic,canonicalized,eval,doGenCode--delegates to
definition, which is a real child and therefore fully visible to the analyzer andoptimizer. It carries no hand-written
eval/doGenCode.name/inputsare purely informational (EXPLAIN/SQL); nothing enforces thatdefinitionmatches them. So the wrapper is never exposed to physical planning or external systems:
LowerDelegateExpressionstrips it todefinitioninQueryExecution.createSparkPlan-- thesingle entry point to the planner, used by both the main query and AQE re-planning. The planner
and every physical consumer (join-key extraction, V1 / cached-batch pushdown, columnar rules,
codegen) therefore sees the real executed expression. The wrapper survives the logical optimizer,
so the optimized logical plan stays readable and optimizer rules can introduce these nodes.
createSparkPlan), so it unfolds thewrapper directly in
V2ExpressionBuilder.DelegateFunctionis the per-function authoring object -- just anExpressionBuilderregisteredwith the ordinary
expressionBuilder(...). It reduces a function to onelowermethod plus twoknobs, supporting all three input-type contracts per argument: implicit cast (default),
type-check-only (
implicitCast = false), and accept-any-type (noinputTypes). The implicit-castcontract is driven by analysis-only marker shims (
ImplicitCastInput/TypeCheckInput) that ridethe standard
TypeCoercionrule and are removed at the end of analysis byRemoveInputTypeMarkers(modeled on
RemoveTempResolvedColumn). Markers are inserted only on the registry/buildpath;DelegateFunction.apply(direct/optimizer construction) inserts none and asserts resolved inputs.Two consumers are migrated:
MultiGetJsonObject(inserted byOptimizeCsvJsonExprs, after analysis) is rebuilt as aDelegateExpressionwhosedefinitionis anInvoketoMultiGetJsonObjectEvaluator, droppingits hand-written
eval/doGenCode. This is the optimizer-constructed case.rightis migrated toobject Right extends DelegateFunction, demonstrating the registry/authoringpath.
RuntimeReplaceableitself is left exactly as on master;DelegateExpressionis a separate,decoupled mechanism.
Known limitation: because the strip runs before planning, a
DelegateExpressioncreated by aphysical rule (after
createSparkPlan) is not stripped and may reach an external systemun-lowered. This is acceptable -- like any other expression the external system does not recognize,
it simply falls back, and
eval/doGenCodekeep it correct within Spark. Analysis- andoptimizer-inserted nodes (the common case) are always stripped, so physical-rule insertion is the
only uncovered path.
Note: this supersedes an earlier revision that made
RuntimeReplaceableopt out of eager replacementand survive into the physical plan (with a
MaterializeRuntimeReplaceablephysical rule andper-consumer unfolds). That approach kept the lowered
replacementhidden from the analyzer andoptimizer, which required unfolding it at every consumer that pattern-matches expressions. By keeping
the lowered form a visible child and lowering it at the single planner entry,
DelegateExpressionavoids that whole class of problems, and all of that machinery is reverted.
Why are the changes needed?
Keeping the semantic expression (e.g.
right(a, b),multi_get_json_object(json, ...)) readable inthe logical plan keeps the plan understandable and lets optimizer rules introduce such nodes, while
the implementation is just a
lowermethod with no per-expressioneval/doGenCode.MultiGetJsonObjectis exactly this case: an optimizer-inserted node that previously neededhand-written codegen and could not be an eagerly-replaced
RuntimeReplaceable.Does this PR introduce any user-facing change?
No. Query results are unchanged, and
RuntimeReplaceablebehavior is identical to master.rightproduces the same results and implicit-cast behavior (e.g.
right(12345, 2)->45). The optimizedlogical plan now shows the readable
right(...)/multi_get_json_object(...)node; the physicalplan shows the lowered form (the real executed expression).
MultiGetJsonObjectis only producedunder the opt-in
spark.sql.optimizer.getJsonObjectSharedParsing.enabledpath (off by default).How was this patch tested?
DelegateExpressionSuite(catalyst): transparency (eval + codegen, viacheckEvaluationrunning both paths), the three per-argument input-type contracts, that
definitionis a real child(
transformdescends into it,withNewChildrenswaps it,referencescome from it),sql,unapply,applyrejecting unresolved inputs, the markers beingUnevaluable, andMultiGetJsonObjectconstruction/eval.DelegateExpressionQuerySuite(sql/core):right()and the optimizer-insertedMultiGetJsonObjectappear in the optimized plan and are lowered before execution; implicit castworks via the standard rule; input markers are stripped after analysis; results match with
whole-stage codegen on and off.
DataSourceV2StrategySuitecase: a wrapped predicate translates identically to its definition(V2 pushdown unfold).
OptimizeJsonExprsSuiteandJsonFunctionsSuite(sharedget_json_objectcodegen anddeeply-nested
eval) cover theMultiGetJsonObjectmigration end to end; theright/leftfunction tests and an AQE re-planning smoke test pass unchanged.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)