Skip to content

[SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning#56575

Open
cloud-fan wants to merge 5 commits into
apache:masterfrom
cloud-fan:rr-selfeval-validation
Open

[SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning#56575
cloud-fan wants to merge 5 commits into
apache:masterfrom
cloud-fan:rr-selfeval-validation

Conversation

@cloud-fan

@cloud-fan cloud-fan commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This PR introduces DelegateExpression, a transparent, named delegate that keeps a high-level
function readable in the logical plan and is lowered to its real definition before physical planning.

  • DelegateExpression(name, inputs, definition) extends UnaryExpression. Every behavior --
    dataType, nullable, foldable, deterministic, canonicalized, eval, doGenCode --
    delegates to definition, which is a real child and therefore fully visible to the analyzer and
    optimizer. It carries no hand-written eval/doGenCode.
  • name/inputs are purely informational (EXPLAIN/SQL); nothing enforces that definition
    matches them
    . So the wrapper is never exposed to physical planning or external systems:
    LowerDelegateExpression strips it to definition in QueryExecution.createSparkPlan -- the
    single entry point to the planner, used by both the main query and AQE re-planning. The planner
    and every physical consumer (join-key extraction, V1 / cached-batch pushdown, columnar rules,
    codegen) therefore sees the real executed expression. The wrapper survives the logical optimizer,
    so the optimized logical plan stays readable and optimizer rules can introduce these nodes.
  • Data source V2 pushdown runs in the logical optimizer (before createSparkPlan), so it unfolds the
    wrapper directly in V2ExpressionBuilder.
  • DelegateFunction is the per-function authoring object -- just an ExpressionBuilder registered
    with the ordinary expressionBuilder(...). It reduces a function to one lower method plus two
    knobs, supporting all three input-type contracts per argument: implicit cast (default),
    type-check-only (implicitCast = false), and accept-any-type (no inputTypes). The implicit-cast
    contract is driven by analysis-only marker shims (ImplicitCastInput / TypeCheckInput) that ride
    the standard TypeCoercion rule and are removed at the end of analysis by RemoveInputTypeMarkers
    (modeled on RemoveTempResolvedColumn). Markers are inserted only on the registry/build path;
    DelegateFunction.apply (direct/optimizer construction) inserts none and asserts resolved inputs.

Two consumers are migrated:

  • MultiGetJsonObject (inserted by OptimizeCsvJsonExprs, after analysis) is rebuilt as a
    DelegateExpression whose definition is an Invoke to MultiGetJsonObjectEvaluator, dropping
    its hand-written eval/doGenCode. This is the optimizer-constructed case.
  • right is migrated to object Right extends DelegateFunction, demonstrating the registry/authoring
    path.

RuntimeReplaceable itself is left exactly as on master; DelegateExpression is a separate,
decoupled mechanism.

Known limitation: because the strip runs before planning, a DelegateExpression created by a
physical rule (after createSparkPlan) is not stripped and may reach an external system
un-lowered. This is acceptable -- like any other expression the external system does not recognize,
it simply falls back, and eval/doGenCode keep it correct within Spark. Analysis- and
optimizer-inserted nodes (the common case) are always stripped, so physical-rule insertion is the
only uncovered path.

Note: this supersedes an earlier revision that made RuntimeReplaceable opt out of eager replacement
and survive into the physical plan (with a MaterializeRuntimeReplaceable physical rule and
per-consumer unfolds). That approach kept the lowered replacement hidden from the analyzer and
optimizer, which required unfolding it at every consumer that pattern-matches expressions. By keeping
the lowered form a visible child and lowering it at the single planner entry, DelegateExpression
avoids that whole class of problems, and all of that machinery is reverted.

Why are the changes needed?

Keeping the semantic expression (e.g. right(a, b), multi_get_json_object(json, ...)) readable in
the logical plan keeps the plan understandable and lets optimizer rules introduce such nodes, while
the implementation is just a lower method with no per-expression eval/doGenCode.
MultiGetJsonObject is exactly this case: an optimizer-inserted node that previously needed
hand-written codegen and could not be an eagerly-replaced RuntimeReplaceable.

Does this PR introduce any user-facing change?

No. Query results are unchanged, and RuntimeReplaceable behavior is identical to master. right
produces the same results and implicit-cast behavior (e.g. right(12345, 2) -> 45). The optimized
logical plan now shows the readable right(...) / multi_get_json_object(...) node; the physical
plan shows the lowered form (the real executed expression). MultiGetJsonObject is only produced
under the opt-in spark.sql.optimizer.getJsonObjectSharedParsing.enabled path (off by default).

How was this patch tested?

  • New DelegateExpressionSuite (catalyst): transparency (eval + codegen, via checkEvaluation
    running both paths), the three per-argument input-type contracts, that definition is a real child
    (transform descends into it, withNewChildren swaps it, references come from it), sql,
    unapply, apply rejecting unresolved inputs, the markers being Unevaluable, and
    MultiGetJsonObject construction/eval.
  • New DelegateExpressionQuerySuite (sql/core): right() and the optimizer-inserted
    MultiGetJsonObject appear in the optimized plan and are lowered before execution; implicit cast
    works via the standard rule; input markers are stripped after analysis; results match with
    whole-stage codegen on and off.
  • New DataSourceV2StrategySuite case: a wrapped predicate translates identically to its definition
    (V2 pushdown unfold).
  • Existing OptimizeJsonExprsSuite and JsonFunctionsSuite (shared get_json_object codegen and
    deeply-nested eval) cover the MultiGetJsonObject migration end to end; the right/left
    function tests and an AQE re-planning smoke test pass unchanged.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

@cloud-fan cloud-fan changed the title [WIP][SPARK-57512][SQL] Allow evaluable RuntimeReplaceable to survive into the physical plan [SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan Jun 20, 2026
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 8436c7c to 10bf8bc Compare June 20, 2026 08:04
@cloud-fan cloud-fan requested a review from sunchao June 21, 2026 20:41
@cloud-fan

Copy link
Copy Markdown
Contributor Author

cc @sunchao

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 blocking, 1 non-blocking, 0 nits. Solid, carefully-commented mechanism; the blocking item is that the materialization step isn't wired into the AQE path.

Design / architecture (1)

  • QueryExecution.scala:781: MaterializeRuntimeReplaceable is added only to QueryExecution.preparations, not to AQE's stage prep. Under AQE (default) a surviving RuntimeReplaceable reaches CollapseCodegenStages un-materialized -- see inline.

Suggestions (1)

  • NormalizePlan.scala:94: full-unfold recursion duplicated with MaterializeRuntimeReplaceable.replace; consider a shared helper -- see inline.

Verification

Traced the survive-then-materialize transformation for result-equivalence: eval/doGenCode delegate to replacement; non-deterministic (Rand) and Unevaluable (With) replacements are gated to the eager path; foldable-but-referencing survivors are handled by literal-only FoldablePropagation + same-batch ConstantFolding; NormalizePlan fully unfolds. All result-bearing dimensions are equivalent or gated -- query results stay correct. The one gap is the AQE wiring (inline): AdaptiveSparkPlanExec is a LeafExecNode so the outer rule can't reach the managed subtree, and AQE's reOptimize re-inserts the survivor via OptimizeCsvJsonExprs, so under AQE the survivor reaches codegen (correct via delegation, but the stated invariant/metrics don't hold).

// Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
// the Spark execution path. After columnar/native conversion (so a native engine sees the
// original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaterializeRuntimeReplaceable is added here, to QueryExecution.preparations, but I don't see it in any of AQE's stage-prep lists (queryStagePreparationRules, queryStageOptimizerRules, postStageCreationRules in AdaptiveSparkPlanExec). Under AQE (default-on) that looks like a gap:

  • InsertAdaptiveSparkPlan runs first in preparations, so the plan is wrapped in AdaptiveSparkPlanExec (a LeafExecNode) before this rule runs -- transformUpWithSubqueries can't reach the AQE-managed subtree.
  • AQE's reOptimize re-runs the full optimizer (optimizer.execute), which includes OptimizeCsvJsonExprs, so the surviving MultiGetJsonObject (eagerReplace = false) is re-inserted on the re-optimized stage.
  • AQE's postStageCreationRules then runs ApplyColumnarRulesAndInsertTransitions -> CollapseCodegenStages with no materialization in between.

So under AQE a RuntimeReplaceable reaches whole-stage codegen un-materialized. Results stay correct because eval/doGenCode now delegate to replacement, but the invariant this PR relies on ("Spark whole-stage codegen never sees a RuntimeReplaceable", and "only the physical plan shows its Invoke replacement") wouldn't hold under AQE -- which affects EXPLAIN and the per-operator-metrics rationale. The peer prep rules (CollapseCodegenStages, ApplyColumnarRulesAndInsertTransitions, EnsureRequirements, ...) are all present in both the non-AQE and AQE lists; this rule is the only one that isn't.

Would adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules (just before collapseCodegenStagesRule) be the right fix? The two new tests use no-shuffle queries that aren't AQE-wrapped, so an AQE-path (shuffle) test that inspects the AQE final plan would cover this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this was a real gap. Fixed in b80438b by adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules, right before collapseCodegenStagesRule, mirroring QueryExecution.preparations. I confirmed both newQueryStage and newResultQueryStage apply postStageCreationRules, so shuffle stages and the final result stage are both covered. Added an AQE-path test (repartition shuffle) that asserts no RuntimeReplaceable reaches the AQE finalPhysicalPlan and that results are correct.

One minor correction to the mechanism: AQEOptimizer doesn't actually run OptimizeCsvJsonExprs or spark.experimental.extraOptimizations, so the survivor isn't re-inserted during reOptimize. Instead it originates in the main optimizer and persists into the AQE-managed subtree, which the outer preparations rule can't reach (since AdaptiveSparkPlanExec is a LeafExecNode). So the conclusion stands -- AQE-side materialization is required -- just via a slightly different path.

}

private def replaceRuntimeReplaceable(e: Expression): Expression = e match {
case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / non-blocking: this full-unfold recursion (case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) / case _ => e.mapChildren(...)) is identical to MaterializeRuntimeReplaceable.replace. Two copies of the same unfold can drift over time -- might be worth a small shared helper.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in b80438b. Extracted a shared RuntimeReplaceable.unfold helper (companion object in Expression.scala); both NormalizePlan.normalizeRuntimeReplaceable and MaterializeRuntimeReplaceable.apply now call it, removing the two private copies.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional inline finding. The AQE materialization issue is already covered by the existing unresolved review thread, so I did not duplicate it.

// initialized through the `lazy val replacement`, which tree transforms may re-create.
// - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
// logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] AttributeReference extends Unevaluable, so this condition is true for every replacement that reads a resolved input column. For example, an eagerReplace = false wrapper whose replacement is Add(attr, Literal(1L)) is still returned as replaced below and never reaches physical/native planning. In practice, the opt-out only works for constants or nodes inserted after ReplaceExpressions—which is why both MultiGetJsonObject and the new test avoid this path.

Could we exclude AttributeReference from this check (as ConvertToLocalRelation.hasUnevaluableExpr already does) and add a direct ReplaceExpressions regression test with a column-based wrapper?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, you're exactly right. Fixed in b80438b: the survival gate now reuses ConvertToLocalRelation.hasUnevaluableExpr(replaced), which already excludes AttributeReference, so a replacement that reads an input column (e.g. Add(attr, Literal(1))) can survive instead of being forced down the eager path. Added a direct ReplaceExpressions regression test in OptimizerSuite with a column-based wrapper (ColumnBasedRuntimeReplaceable) that fails without the fix and passes with it.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on the AQE table-cache path.

// re-insert a surviving `RuntimeReplaceable`, e.g. via `OptimizeCsvJsonExprs`), and
// `AdaptiveSparkPlanExec` is a `LeafExecNode` that the outer `preparations` rule can't reach,
// so the materialization must also run here.
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] This fixes AQE exchange and result stages, but newQueryStage's InMemoryTableScanLike branch still deliberately skips postStageCreationRules, so this materialization rule never reaches table-cache scans. InMemoryScans copies logical filters into InMemoryTableScanExec.predicates; after the scan is wrapped in TableCacheQueryStageExec (a leaf), later result-stage traversal cannot reach that hidden plan. A surviving predicate wrapper whose replacement is supported by CachedBatchSerializer.buildFilter (for example, EqualTo(attr, literal)) is therefore left unrecognized, disabling cached-batch pruning even though the separate FilterExec still produces correct rows.

Could we apply MaterializeRuntimeReplaceable explicitly to the optimized InMemoryTableScanLike before wrapping it, and add an AQE cache/filter test that inspects TableCacheQueryStageExec.plan? The current repartition test covers only the result-stage path, and finalPlan.exists does not traverse query-stage plans.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 50cc6a1, though at a slightly different layer than suggested.

I materialized at the pushdown consumer (InMemoryTableScanExec, unfolding RuntimeReplaceable in the predicates handed to buildFilter) rather than applying MaterializeRuntimeReplaceable to the optimized InMemoryTableScanLike before wrapping it. Reasoning:

  • The scan is a LeafExecNode that never reaches whole-stage codegen, so this isn't a codegen-prep concern. MaterializeRuntimeReplaceable sits between columnar conversion and CollapseCodegenStages specifically for codegen, which is exactly why the InMemoryTableScanLike branch skips postStageCreationRules — extending that rule into a non-codegen leaf would be misplaced.
  • predicates only feed buildFilter (a runtime closure that pattern-matches expression shape), so unfolding them at the consumer is the right layer. It also covers AQE and non-AQE uniformly (non-AQE was relying on QueryExecution.preparations reaching the predicates field; now both paths go through the same unfold) while keeping the readable node in the plan/EXPLAIN output.

Added an AQE regression test in MaterializeRuntimeReplaceableSuite using an adaptive cached plan, so the scan is wrapped in a TableCacheQueryStageExec leaf (the gap you flagged). A surviving predicate RuntimeReplaceable is pushed into the scan; the test asserts the wrapper survives into scan.predicates and that cached-batch pruning still kicks in — it fails without the fix (readBatches == 10, no pruning). I also added comments at all three sites recording the codegen-vs-consumer split.

One scoping note: this is forward-looking. The current opt-in survivor (multi_get_json_object) isn't a buildFilter-recognizable predicate shape, so no current query regresses; the fix matters for a future survivor whose replacement is a prunable comparison.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up: I generalized this beyond the cached-batch case.

First-principle we landed on: a surviving RuntimeReplaceable (eagerReplace = false) must be unfolded to its replacement wherever a predicate leaves Spark's own expression-evaluation engine and is handed to a consumer that interprets its structure or sends it elsewhere — because RuntimeReplaceable is a Spark-internal optimizer concept the consumer can't understand. Internal Spark evaluation (FilterExec/Project codegen + interpreted) needs no unfold: it's handled by the eval/doGenCode delegation plus MaterializeRuntimeReplaceable.

That boundary is realized at three places, all now unfolding:

  • InMemoryTableScanExec.buildFilter (cached-batch pruning — the case you flagged; CachedBatchSerializer is a @DeveloperApi).
  • DataSourceStrategy.translateLeafNodeFilter (V1 / file source filter pushdown).
  • V2ExpressionBuilder.generateExpression (V2) — and since all V2 pushdown (filters, aggregate functions, aggregate arguments, group-by, sort) funnels through generateExpression via PushableExpression/translateAggregation, this single fallback covers V2 filter and aggregate/group-by pushdown. It's a fallback (after the explicit cases) so native high-level pushdown like AES_ENCRYPT still wins.

Out of scope (different kind of consumer — column selection, not expression translation, and driven by references which for a survivor equals its children's references): nested-column/schema pruning and partition pruning.

Tests added for all three boundaries (V1/V2 filter translation, V2 aggregate/group-by, and an AQE cached-batch pruning test); each was verified to fail without the corresponding unfold.

Note: I squashed the branch into a single commit.

@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 6cdb81d to b80438b Compare June 22, 2026 16:59
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from f459bc6 to 5251cd7 Compare June 22, 2026 20:06

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on compound V2 filter reconstruction.

// natively is a Spark-internal optimizer node the connector cannot understand. Fall back to its
// concrete `replacement` so the lowered form can still be pushed -- same boundary rationale as
// `DataSourceStrategy.translateLeafNodeFilter` (V1) and `CachedBatchSerializer.buildFilter`.
case r: RuntimeReplaceable => generateExpression(r.replacement, isPredicate)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve the mapping for compound replacement predicates

translateFilterV2WithMapping treats the surviving wrapper as its leaf/other case, but this fallback can return a structural V2And, V2Or, or V2Not. The map then contains only compoundPredicate -> originalWrapper. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it (as the interface contract requires), rebuildExpressionFromFilter matches the compound node first and recursively visits children that were created inside V2ExpressionBuilder and have no map entries. Planning then throws Failed to rebuild Expression for filter instead of retaining a post-scan filter. For example, this occurs for an eagerReplace = false wrapper whose replacement is And(a > 1, b < 2) on a rejecting V2 source.

Could we either unfold before entering the mapping-aware recursion, or make rebuilding prefer an exact map entry before descending, and add a rejecting-source regression test? The new test exercises only a leaf GreaterThan translation, so it does not cover this reconstruction path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed in cafddb8 with the "prefer an exact map entry before descending" option.

rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not. For the surviving wrapper, the compound V2And is a map key (mapped to the wrapper), so it's restored directly and we never descend into the synthetic children that have no entries. The original readable wrapper is kept in the post-scan filter and materialized into its replacement before codegen by MaterializeRuntimeReplaceable.

This is granularity-correct at every level of descent, so nesting is covered too -- e.g. And(c = 5, wrapper(And(a > 1, b < 2))) rebuilds as And(c = 5, wrapper): the outer V2And isn't a map key so it descends, and the inner V2And(a>1, b<2) is a map key so it's restored before its synthetic children are visited. Normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and falls through to the existing descent -- behavior unchanged.

Added a regression test in DataSourceV2StrategySuite that translates a compound-replacement wrapper via translateFilterV2WithMapping, then feeds the result back through rebuildExpressionFromFilter (equivalent to a rejecting SupportsPushDownV2Filters source returning the predicate unchanged) and asserts the wrapper is restored; it throws Failed to rebuild Expression for filter without the fix.

One note on V1: the V1 path (DataSourceStrategy) does not have this crash. There the unfold sits inside translateLeafNodeFilter, which only recognizes leaf shapes, so a compound replacement simply isn't translated (returns None) -- no pushdown, no map entry, correct results. That's a milder missed-optimization version of the same root cause; I left V1 as-is since fixing it would be an enhancement (and would lose the readable wrapper in the post-scan filter).

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on throwable metadata for surviving RuntimeReplaceable expressions.

// non-deterministic `Rand`. This matters once a `RuntimeReplaceable` may survive into the
// physical plan (see `eagerReplace`): the survival decision relies on an accurate determinism
// signal.
override lazy val deterministic: Boolean = replacement.deterministic

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Delegate throwable to the surviving replacement

RuntimeReplaceable now delegates deterministic and foldable to replacement, but it still inherits Expression.throwable, which checks only the wrapper's original children. A surviving wrapper can therefore look non-throwing even when its replacement contains an explicitly throwable expression such as Sequence(..., step).

For a concrete example, consider a boolean wrapper whose replacement is size(sequence(start, stop, step)) > 0, and an inner join with these inputs:

  • left row (key=1, start=0, stop=1, step=1); right contains key=1
  • left row (key=2, start=0, stop=1, step=-1); right has no key=2

With the predicate above the join, the second row is removed by the join before sequence(0, 1, -1) is evaluated, so the query succeeds. With eagerReplace = false, the wrapper reports throwable = false; PushPredicateThroughJoin moves it onto the left input, and the second row now throws Illegal sequence boundaries before the join can discard it. Physical materialization happens too late to undo that relocation. Spark's existing SPARK-46707 test specifically keeps explicit-step Sequence predicates above joins for this reason.

Could we also delegate this metadata, e.g. override lazy val throwable: Boolean = replacement.throwable, and add the wrapped join case as a regression test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed in bd8f0ee.

RuntimeReplaceable now does override lazy val throwable: Boolean = replacement.throwable, alongside the existing deterministic/foldable delegations. You're right that the inherited children.exists(_.throwable) was wrong for a survivor: the children are the original arguments (non-throwing literal bounds) while the replacement is where the throwing Sequence lives, so PushPredicateThroughJoin (the SPARK-46707 guard) would relocate the predicate below the join and surface Illegal sequence boundaries on rows the join would otherwise discard.

Added a FilterPushdownSuite regression test mirroring the existing SPARK-46707 join test: a wrapper whose children (x.a, x.b) are non-throwing but whose replacement is an explicit-step Sequence; the predicate must stay above the join. It fails without the delegation (the wrapper is pushed onto the left input) and passes with it.

One implementation note for whoever reviews the test: the wrapper is deliberately not an InheritAnalysisRules expression. With InheritAnalysisRules the replacement becomes the wrapper's child, so the inherited children.exists(_.throwable) would already observe the throwing Sequence and the test would pass even without the fix. The test wrapper keeps its original args as children (the Uniform / MultiGetJsonObject shape) so it actually exercises the gap.

// which already excludes it.
val cannotSurvive =
!replaced.deterministic || ConvertToLocalRelation.hasUnevaluableExpr(replaced)
if (r.eagerReplace || cannotSurvive) replaced else r

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Force eager replacement when later finish-analysis rules need a hidden expression

This gate also lets through replacements that require a later finish-analysis rewrite. For example, consider a non-InheritAnalysisRules wrapper with an input timestamp child, eagerReplace = false, and replacement = Greatest(Seq(CurrentTimestamp(), child)). The replacement is deterministic and evaluable, and the input attribute makes it non-foldable, so the wrapper survives here. However, the replacement is not one of the wrapper's children, so its tree-pattern bits do not expose CURRENT_LIKE. ComputeCurrentTime, which runs after ReplaceExpressions, prunes this subtree and never replaces CurrentTimestamp with the single query-start literal. MaterializeRuntimeReplaceable exposes it only during physical preparation, where CurrentTimestamp.eval calls Instant.now() as rows are evaluated. In a focused 20,000-row regression, this produced multiple distinct timestamps within one query.

Before this PR, ReplaceExpressions always exposed the replacement before ComputeCurrentTime. Could we force eager replacement at least when replaced.containsPattern(CURRENT_LIKE) (or otherwise run the remaining finish-analysis rewrites on the hidden replacement)? Please keep the regression attribute-dependent so constant folding cannot mask the problem.

// Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
// the Spark execution path. After columnar/native conversion (so a native engine sees the
// original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Expose structural equality before join-key extraction

Materializing at this point is too late for planner consumers. A supported eagerReplace = false binary predicate with replacement = EqualTo(leftKey, rightKey) survives the logical optimizer, but ExtractEquiJoinKeys only pattern-matches visible EqualTo / EqualNullSafe conjuncts and therefore extracts no keys from the wrapper. In a focused regression with broadcasting disabled, the batch plan contained CartesianProductExec and no SortMergeJoinExec (while still returning the correct rows), turning a normal equi-join into O(N*M) work that can OOM. A second control showed that an ordinary stream-stream EqualTo is accepted by StreamingJoinStrategy, while the identical equality hidden in this wrapper throws the 'stream-stream join without equality predicate' AnalysisException.

MaterializeRuntimeReplaceable unfolds the condition only after Spark strategies have selected the operator, so it cannot recover a hash/sort-merge/streaming equi-join. Before this PR, unconditional replacement exposed the equality before planning. Could join-key extraction inspect an unfolded condition (while preserving the original expression where needed), or should such structural predicates be forced to replace eagerly? Batch physical-plan and stream-stream planning regressions would cover both consequences.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up review on 09e4f260: the exact CURRENT_LIKE and root-EqualTo cases are improved, but the broader correctness contracts are still incomplete in the cases below.

val cannotSurvive =
!replaced.deterministic ||
ConvertToLocalRelation.hasUnevaluableExpr(replaced) ||
replaced.containsPattern(CURRENT_LIKE)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Cover every required finish-analysis rewrite

This only forces eager replacement for CURRENT_LIKE, but the same survivor opacity still bypasses other mandatory rules that run after ReplaceExpressions. For example, a deterministic, non-foldable wrapper with replacement = ArrayDistinct(arrayDoubleAttr) hides ARRAY_DISTINCT from both NormalizeFloatingNumbers passes; in a focused regression, [-0.0d, 0.0d] remained two elements instead of one canonical positive zero. Likewise, a wrapper whose replacement contains Cast(Literal("epoch"), DateType, ...) hides CAST from SpecialDatetimeValues; it returned NULL in legacy mode (and would throw in ANSI mode) instead of 1970-01-01.

MaterializeRuntimeReplaceable exposes these expressions only after logical optimization, so neither rewrite gets another chance. Could the survival gate cover every replacement requiring a mandatory post-ReplaceExpressions rewrite, or run those rewrites over the hidden replacement, with regressions for these two cases? A CURRENT_LIKE-only guard still permits wrong results and runtime errors.

logDebug(s"Considering join on: $condition")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Split the unfolded condition for compound replacements

The condition is split while the outer RuntimeReplaceable is still opaque. If a supported survivor has replacement = And(EqualTo(left.k, right.k), GreaterThan(left.v, 0)), splitConjunctivePredicates produces the wrapper as one predicate; unfoldKeyPredicate then yields an And, which none of the top-level equality cases match. A focused regression still planned this as CartesianProductExec, and the equivalent stream-stream join is still rejected as lacking an equality predicate. This compound replacement shape is already treated as supported by TestCompoundPredicateRuntimeReplaceable in the V2 tests.

Could we expose the replacement before splitting (while retaining the mapping to the original survivor where needed), and add batch plus streaming coverage for a compound replacement?

// keeps showing the readable node. The unfold is gated on the tree pattern so the common
// survivor-free join is unaffected.
def unfoldKeyPredicate(p: Expression): Expression =
if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve nested survivors when exposing the outer equality

This recursively unfolds every RuntimeReplaceable in the predicate, including a survivor that is already a visible join-key operand. For EqualTo(partitionKey, Survivor(dimensionKey)), the old extractor retained Survivor(dimensionKey) as the physical key; this code emits only its low-level replacement, so native planning can no longer match the high-level key.

It also makes DPP invalid: ExtractEquiJoinKeys returns the unfolded build key, but PartitionPruning inspects the original equality and calls joinKeys.indexOf(Survivor(dimensionKey)), producing -1. The resulting DynamicPruningSubquery is unresolved and fails default plan validation; with validation disabled, the negative index later reaches buildKeys(-1). In a focused planning regression, the visible survivor key was already reduced to (k + 0) before columnar/native conversion.

Could this unfold only the outer survivor needed to reveal an equality, preserving already-visible key operands, with a DPP regression for this shape?

@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 09e4f26 to 47329ba Compare June 26, 2026 21:49
@cloud-fan cloud-fan changed the title [SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan [SPARK-57512][SQL] Introduce DelegateExpression to keep high-level functions in the physical plan Jun 26, 2026
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch 2 times, most recently from 20c56d4 to 6b12fbc Compare June 26, 2026 22:15
…er stripped before planning

`DelegateExpression` is a transparent, named delegate over a `definition` expression. It lets a
high-level function (e.g. `right(a, b)`) stay readable in the analyzed/optimized logical plan, and
lets optimizer rules introduce such nodes (e.g. `multi_get_json_object`), without hand-written
`eval`/`doGenCode` -- every behavior delegates to `definition`, a real child fully visible to the
analyzer and optimizer.

`name`/`inputs` are purely informational; nothing enforces that `definition` matches them, so the
wrapper is never exposed to physical planning or external systems. `LowerDelegateExpression` strips
it to `definition` in `QueryExecution.createSparkPlan` -- the single entry point to the planner, used
by both the main query and AQE re-planning -- so the planner and every physical consumer (join-key
extraction, V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed expression.
Data source V2 pushdown runs earlier, in the logical optimizer, so it unfolds the wrapper directly in
`V2ExpressionBuilder`. `eval`/`doGenCode` still delegate, as a safety net.

`RuntimeReplaceable` is left exactly as on master. `MultiGetJsonObject` is rebuilt on
`DelegateExpression` (Invoke definition, dropping hand-written eval/codegen); `right` is migrated via
a new `DelegateFunction` (an `ExpressionBuilder`) to demonstrate the authoring path.

Co-authored-by: Isaac
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 6b12fbc to e684fc9 Compare June 26, 2026 23:24
@cloud-fan cloud-fan changed the title [SPARK-57512][SQL] Introduce DelegateExpression to keep high-level functions in the physical plan [SPARK-57512][SQL] Introduce DelegateExpression, a logical-plan wrapper stripped before planning Jun 26, 2026
@cloud-fan

Copy link
Copy Markdown
Contributor Author

@sunchao Heads-up: I've reworked this PR substantially and force-pushed, so the existing review threads now point at code that has been removed. Thank you for the earlier reviews — they're what drove the redesign.

Instead of letting RuntimeReplaceable opt out of eager replacement and survive into the physical plan (with MaterializeRuntimeReplaceable and per-consumer unfolds), the PR now adds DelegateExpression: a transparent, named logical-plan wrapper whose definition is a real child, fully visible to the analyzer and optimizer. RuntimeReplaceable is reverted to exactly master.

The wrapper is stripped to its definition at the single entry to the planner (QueryExecution.createSparkPlan, which AQE re-planning also routes through), so the planner and every physical consumer — join-key extraction, pushdown, columnar, codegen — operate on the real executed expression. V2 pushdown is a logical rule (it runs before the strip), so it unfolds the wrapper directly in V2ExpressionBuilder.

The intent is to dissolve the class of issues your [P2]s identified — the lowered form hidden from finish-analysis rules and from planner-level structural matchers — at the root (keep the definition a visible child, and lower it before planning) rather than patching each consumer. The migrated consumers (right, multi_get_json_object) are scalar/value functions. The updated PR description has the full design, including one documented limitation (a delegate created by a physical rule isn't stripped and may reach an external system un-lowered — acceptable, like any expression it doesn't recognize).

Would appreciate a fresh look when you have time.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up review on the current head.

Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
Batch("Remove analysis-only markers", Once,
RemoveTempResolvedColumn,
RemoveInputTypeMarkers),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Remove input markers on the single-pass analyzer path

This cleanup batch only runs in the fixed-point Analyzer. With spark.sql.analyzer.singlePassResolver.enabled=true, right(...) is resolved and its ImplicitCastInput succeeds, but ResolverRunner marks the plan analyzed without running RemoveInputTypeMarkers. LowerDelegateExpression then removes only the outer delegate, leaving the Unevaluable marker in the executable expression; for example, SELECT right('abc', 1) fails during evaluation/code generation. The base Right used no marker nodes and worked in this mode. Could we add equivalent cleanup to the single-pass path and an execution-level regression test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 58889e8d7aa. The single-pass resolver has no rule batch to run the RemoveInputTypeMarkers rule in, so FunctionResolver now calls RemoveInputTypeMarkers.removeMarkers(...) directly once type coercion has cast the marker children. right(...) therefore strips its markers under spark.sql.analyzer.singlePassResolver.enabled=true too, and LowerDelegateExpression no longer leaves an Unevaluable marker in the executable expression. Added an execution-level regression in DelegateExpressionQuerySuite (asserts no ImplicitCastInput/TypeCheckInput survives single-pass analysis and that the implicit Cast the marker drove still applies).

override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) {
case marker: ImplicitCastInput => marker.child
case marker: TypeCheckInput => marker.child

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Keep failed type checks visible to CheckAnalysis

This unwraps every TypeCheckInput, including one whose type check failed. For example, TypeCheckInput(Literal(1L), IntegerType) is unresolved before this rule, but an identity-lowering DelegateFunction with implicitCast = false becomes fully resolved after the marker is replaced by its Long child, so analysis accepts the mismatched argument. The new unit test only calls checkInputDataTypes() before cleanup and therefore misses the end-to-end behavior. Could we unwrap only successfully resolved markers (leaving failures for CheckAnalysis) and add an analyzer-level regression?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bffecc826d6. RemoveInputTypeMarkers (and its expression-level removeMarkers) now only unwrap a marker if marker.resolved, so a TypeCheckInput whose type check failed is left in place and its ExpectsInputTypes failure stays visible to CheckAnalysis instead of exposing a resolved child of the wrong type. Added a regression in DelegateExpressionSuite ("RemoveInputTypeMarkers keeps a failed type-check marker for CheckAnalysis to report").

* (analysis-time) path inserts the input-type markers, because the analyzer's `TypeCoercion`
* casts them and `RemoveInputTypeMarkers` strips them afterwards.
*/
override def build(funcName: String, expressions: Seq[Expression]): Expression = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Validate each delegate function's argument count

build currently accepts any number of expressions. Right.lower consumes only positions 0 and 1, so right('abcd', 1, 99) succeeds and silently ignores 99, while zero/one-argument calls fail by indexing the sequence instead of returning the structured WRONG_NUM_ARGS error. The previous expression[Right] registration enforced its two-expression constructor. Could DelegateFunction expose a functionSignature, or otherwise validate arity before calling lower, with wrong-arity SQL coverage?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bffecc826d6. DelegateFunction.build now validates expressions.length == inputTypes.length and throws the structured WRONG_NUM_ARGS error before calling lower, so right('abcd', 1, 99) no longer silently ignores the extra arg and zero/one-arg calls no longer index past the sequence. An empty inputTypes marks a variadic function whose lower owns arg handling (no arity check). Added wrong-arity SQL coverage in DelegateExpressionQuerySuite plus a unit test in DelegateExpressionSuite.

val len = args(1)
If(
IsNull(str),
Literal(null, StringType),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve the resolved input type for these branch literals

Hard-coding plain StringType changes the result schema when the public spark.sql.preserveCharVarcharTypeInfo setting is enabled. For example, typeof(right(CAST('abc' AS CHAR(5)), 2)) returned char(5) on the base implementation, whose null/empty literals used str.dataType, but returns string on this head because If widens the CHAR(5) substring and these STRING branches. Registry expression builders receive resolved arguments, and the marker's dataType delegates to its child, so using str.dataType here is safe. Could we preserve the old literal typing and add CHAR/VARCHAR result-type coverage?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — the finding is right (the CHAR/VARCHAR result type was regressing), and it's fixed in bffecc8. But using str.dataType directly isn't safe here, for a reason worth recording:

At build/lower time the argument is the not-yet-coerced ImplicitCastInput marker, and the marker delegates dataType to its (uncoerced) child. So str.dataType is the input type, which isn't necessarily a string yet — e.g. right(12345, 2) has an IntegerType child that the implicit cast turns into a string only later. Typing the empty-string branch as Literal(UTF8String.EMPTY_UTF8, IntegerType) then fails Literal validation (Literal must have a corresponding value to int, but class UTF8String found). Confirmed: a naive str.dataType crashes DelegateExpressionQuerySuite."right() implicit-casts a non-string arg ...".

So the fix types the null/empty branch literals with str.dataType only when it is already a string-family type (StringType / CharType / VarcharType) — preserving CHAR(N)/VARCHAR(N) under preserveCharVarcharTypeInfo and non-default collations — and falls back to plain StringType otherwise (the type the implicit cast produces). New test right() preserves the input CHAR/VARCHAR type with preserveCharVarcharTypeInfo asserts typeof(right(CAST('abc' AS CHAR(5)), 2)) = char(5); the collation case was already covered.

*/
object LowerDelegateExpression extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformAllExpressionsWithPruning(_.containsPattern(DELEGATE_EXPRESSION)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Fully lower a delegate whose definition is another delegate

transformAllExpressionsWithPruning is pre-order. If an outer delegate's definition is itself a DelegateExpression, replacing the outer node causes traversal to continue only through the replacement's children; the replacement root is not matched again. One pass therefore leaves the inner delegate in the plan, despite this rule's strips every DelegateExpression contract. A nested predicate delegate can consequently remain opaque to join-key extraction or physical pushdown. Could this use bottom-up/recursive lowering and include a nested-delegate regression?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 58889e8d7aa. LowerDelegateExpression.lower is now a tail-recursive unwrap: if a delegate's definition is itself a DelegateExpression, the chain is collapsed in one go, so no wrapper survives the pre-order transformAllExpressionsWithPruning. Delegates nested deeper (as children of definition) are still handled by the surrounding tree traversal. Added a nested-delegate regression ("LowerDelegateExpression fully unwraps a directly-nested delegate-of-delegate").

expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = expr match {
case literal: Literal => Some(translateLiteral(literal))
// DelegateExpression is a Spark-internal wrapper; push down its real definition instead.
case d: DelegateExpression => generateExpression(d.definition, isPredicate)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve reconstruction mappings for compound delegate predicates

translateFilterV2WithMapping treats the wrapper as its leaf case, but this new recursion can return a structural V2And/V2Or. The map then contains only compoundPredicate -> originalDelegate. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it, rebuildExpressionFromFilter decomposes the compound node before consulting the map and fails on its synthetic, unmapped children with Failed to rebuild Expression for filter. Could we unfold before the mapping-aware recursion, or prefer an exact map entry before structural descent, and add a rejected-filter round-trip test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bffecc826d6, with the "prefer an exact map entry before structural descent" option. rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not, so a compound-definition delegate that translated to a single mapped V2And/V2Or is restored directly instead of recursing into its synthetic, unmapped children (which threw Failed to rebuild Expression for filter). This is granularity-correct at every level, so a delegate nested inside an ordinary compound is covered too; normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and behavior is unchanged. Added a rejected-filter round-trip regression in DataSourceV2StrategySuite. (Same root cause as the pre-redesign V2 finding, now applied to DelegateExpression.)

// Go through QueryExecution.createSparkPlan -- the single place that strips DelegateExpression
// before planning -- instead of calling the planner directly, so the re-planned stage sees the
// real executed expression.
val sparkPlan = QueryExecution.createSparkPlan(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Return the same logical tree that AQE used for planning

When a runtime optimizer rule inserts a DelegateExpression and the resulting physical plan is adopted, createSparkPlan plans a lowered copy whose nodes become the physical plan's logicalLink targets, while this method returns the original unlowered optimized tree below. If a copied node backs a later query stage, replaceWithQueryStagesInLogicalPlan cannot find it using reference equality, so the completed stage is not inserted into currentLogicalPlan and subsequent runtime optimization loses that stage's statistics. This is conditional on the new plan being adopted, not every runtime-added wrapper. Could AQE retain and return the exact lowered tree used for planning, with an injected runtime-optimizer/staged-query regression?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bffecc8reOptimize now lowers once and returns the same lowered tree it planned, so the physical plan's logicalLink targets stay reference-consistent with the returned logical plan.

On the regression test: I couldn't find a way to assert this one deterministically. The symptom is silent — no wrong results and no error (eval/codegen still delegate), and the physical logicalLinks are identical with or without the fix (they're set on the lowered copy in createSparkPlan either way). The only thing that differs is currentLogicalPlan, a transient local in getFinalPhysicalPlan, whose orphaned-stage state surfaces only as a threshold-sensitive re-planning decision on a later iteration. A test would need either a flaky multi-stage stats-driven plan-difference query or test-only instrumentation to expose stage tracking. I verified the fix by reasoning through replaceWithQueryStagesInLogicalPlan plus a full AdaptiveQueryExecSuite run (green). Open to adding observability instrumentation in a follow-up if you think it's warranted — let me know if you'd prefer that.

…ested delegates

- FunctionResolver: resolve a built DelegateExpression's definition recursively (like
  InheritAnalysisRules) so coercion reaches the buried ImplicitCastInput/TypeCheckInput
  markers, then strip them (RemoveInputTypeMarkers.removeMarkers); single-pass has no
  end-of-analysis batch for this. Fixes right(<non-string>) failing to resolve under
  spark.sql.analyzer.singlePassResolver.enabled.
- LowerDelegateExpression: unwrap a directly-nested delegate-of-delegate chain so no
  wrapper survives transformDown to the planner.
- Tests: single-pass resolution of right(); nested-delegate lowering; collation
  preservation of right(); plus a comment typo fix.

Co-authored-by: Isaac
…t delegation, arity/type-check validation, CHAR/VARCHAR result type, V2 compound rebuild, AQE logical link

Addresses review feedback on the DelegateExpression mechanism:

- DelegateExpression now delegates `nullIntolerant` to its `definition` (like its
  other metadata), so null-intolerance optimizations (IsNotNull-constraint
  inference, NullPropagation) still fire for `multi_get_json_object` while the
  wrapper is in the logical plan.
- DelegateFunction.build validates argument count against the `inputTypes` arity,
  so wrong-arity calls fail with WRONG_NUM_ARGS instead of IndexOutOfBounds or a
  silently ignored extra argument.
- Right.lower types its null/empty branch literals with the input's string-family
  type (CHAR/VARCHAR/collation) when available, preserving the result type under
  preserveCharVarcharTypeInfo, while falling back to StringType for a yet-to-be
  cast non-string input (e.g. right(12345, 2)).
- RemoveInputTypeMarkers (and the single-pass removeMarkers) only unwrap resolved
  markers, leaving a failed TypeCheckInput visible to CheckAnalysis.
- DataSourceV2Strategy.rebuildExpressionFromFilter prefers an exact map entry
  before structural descent, so a compound-definition delegate round-trips instead
  of failing reconstruction on a rejecting V2 source.
- AdaptiveSparkPlanExec.reOptimize lowers once and returns the same lowered tree it
  planned, so the physical plan's logicalLink targets match the returned logical
  plan by reference.
- TreePatterns: place DELEGATE_EXPRESSION / INPUT_TYPE_MARKER in alphabetical order.

New tests: nullIntolerant delegation, build arity validation, failed type-check
preservation (DelegateExpressionSuite); CHAR/VARCHAR result type and wrong-arity
rejection (DelegateExpressionQuerySuite); compound-definition V2 filter round-trip
(DataSourceV2StrategySuite).

Co-authored-by: Isaac
…n inputs; fix scalastyle

`DelegateExpression.inputs` are display-only metadata, not tree children, so generic
expression traversals (`transform`, `transformAllExpressions`) never reach them. Two
consumers that walk expressions to render or normalize them were left with raw inputs:

- `usePrettyExpression` produced generated column names from un-prettified inputs, so
  string literals kept their quotes (`right('abcd', 2)`) and attributes kept their
  qualifiers (`right(spark_catalog.default.t.c7, 2)`). Add a `DelegateExpression` case
  that re-renders the call with each input pushed through `toPrettySQL`, mirroring the
  existing `InheritAnalysisRules` case, so names match the pre-delegate function.
- `NormalizePlan.normalizeExprIds` left non-deterministic process-global expr ids inside
  the surviving wrapper's inputs (`right(g#16854, ...)`), making the connect golden file
  unstable. Extract the normalization rule and apply it recursively to `inputs` so the
  informational call is deterministic (`right(g#0, g#0)`).

Regenerate the legitimately-changed golden files: the `right`/`text` analyzer-results
(now showing the readable wrapper) and `function_right.explain`. The `results/` schema
files and `sql-expression-schema.md` are unchanged -- the pretty-name fix restores exact
master output.

Also fix scalastyle line-length and import-order violations across the touched files.

Co-authored-by: Isaac
…delegate test suites

Fixes the Scala linter failures in the three new test files: line-length
overflows, two missing explicit return types, and an import ordering issue.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants