Skip to content

Commit c265f1d

Browse files
committed
[SPARK-57512][SQL] Unfold surviving RuntimeReplaceable at predicate-pushdown boundaries
Generalize the cached-batch fix: a surviving RuntimeReplaceable must be unfolded to its replacement wherever a predicate leaves Spark's expression-evaluation engine for a structure-interpreting consumer. In addition to CachedBatchSerializer.buildFilter, this covers data source filter pushdown -- V1 DataSourceStrategy.translateLeafNodeFilter and V2 V2ExpressionBuilder. V2 unfolds only as a fallback so explicit native handling (e.g. AES_ENCRYPT) still wins; the V1 leaf unfold preserves the translatedFilterToExpr mapping by keeping the original expression in the recursive caller. Internal Spark evaluation handles RuntimeReplaceable via delegation + MaterializeRuntimeReplaceable, so it needs no unfold. Co-authored-by: Isaac
1 parent 50cc6a1 commit c265f1d

4 files changed

Lines changed: 26 additions & 1 deletion

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,11 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
376376
} else {
377377
None
378378
}
379+
// A surviving `RuntimeReplaceable` (`eagerReplace = false`) that no explicit case above pushes
380+
// natively is a Spark-internal optimizer node the connector cannot understand. Fall back to its
381+
// concrete `replacement` so the lowered form can still be pushed -- same boundary rationale as
382+
// `DataSourceStrategy.translateLeafNodeFilter` (V1) and `CachedBatchSerializer.buildFilter`.
383+
case r: RuntimeReplaceable => generateExpression(r.replacement, isPredicate)
379384
case _ => None
380385
}
381386

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,9 +631,16 @@ object DataSourceStrategy
631631
}
632632
}
633633

634+
// A surviving `RuntimeReplaceable` (`eagerReplace = false`) is a Spark-internal optimizer node a
635+
// data source cannot understand, so pushdown must translate its concrete `replacement`. Unfold at
636+
// this boundary -- where the predicate leaves Spark's evaluation engine for an external system --
637+
// mirroring `V2ExpressionBuilder` (V2) and `CachedBatchSerializer.buildFilter` (cache). The
638+
// recursive caller keeps the original expression for the `translatedFilterToExpr` mapping, so
639+
// only this leaf translation sees the unfolded form.
634640
private def translateLeafNodeFilter(
635641
predicate: Expression,
636-
pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
642+
pushableColumn: PushableColumnBase): Option[Filter] = {
643+
RuntimeReplaceable.unfold(predicate) match {
637644
case expressions.EqualTo(e @ pushableColumn(name), Literal(v, t)) =>
638645
Some(collationAwareFilter(sources.EqualTo(name, convertToScala(v, t)), e.dataType))
639646
case expressions.EqualTo(Literal(v, t), e @ pushableColumn(name)) =>
@@ -699,6 +706,7 @@ object DataSourceStrategy
699706
Some(sources.EqualTo(name, true))
700707

701708
case _ => None
709+
}
702710
}
703711

704712
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
1919

2020
import org.apache.spark.sql.catalyst.dsl.expressions._
2121
import org.apache.spark.sql.catalyst.expressions._
22+
import org.apache.spark.sql.execution.TestPredicateRuntimeReplaceable
2223
import org.apache.spark.sql.sources
2324
import org.apache.spark.sql.test.SharedSparkSession
2425
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
@@ -96,6 +97,11 @@ class DataSourceStrategySuite extends SharedSparkSession {
9697
testTranslateFilter(GreaterThan(attrInt, 1), Some(sources.GreaterThan(intColName, 1)))
9798
testTranslateFilter(GreaterThan(1, attrInt), Some(sources.LessThan(intColName, 1)))
9899

100+
// SPARK-57512: a surviving RuntimeReplaceable is unfolded so its replacement (here a
101+
// `GreaterThan`) can be translated and pushed, instead of being left unrecognized.
102+
testTranslateFilter(TestPredicateRuntimeReplaceable(attrInt, Literal(1)),
103+
Some(sources.GreaterThan(intColName, 1)))
104+
99105
testTranslateFilter(LessThan(attrInt, 1), Some(sources.LessThan(intColName, 1)))
100106
testTranslateFilter(LessThan(1, attrInt), Some(sources.GreaterThan(intColName, 1)))
101107

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.variant.VariantGet
2525
import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
2626
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, VariantGet => V2VariantGet}
2727
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate}
28+
import org.apache.spark.sql.execution.TestPredicateRuntimeReplaceable
2829
import org.apache.spark.sql.internal.SQLConf
2930
import org.apache.spark.sql.test.SharedSparkSession
3031
import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType, VariantType}
@@ -110,6 +111,11 @@ class DataSourceV2StrategySuite extends SharedSparkSession {
110111
testTranslateFilter(GreaterThan(1, attrInt),
111112
Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
112113

114+
// SPARK-57512: a surviving RuntimeReplaceable is unfolded so its replacement (here a
115+
// `GreaterThan`) can be translated and pushed, instead of being left unrecognized.
116+
testTranslateFilter(TestPredicateRuntimeReplaceable(attrInt, Literal(1)),
117+
Some(new Predicate(">", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
118+
113119
testTranslateFilter(LessThan(attrInt, 1),
114120
Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType)))))
115121
testTranslateFilter(LessThan(1, attrInt),

0 commit comments

Comments
 (0)