Skip to content

Commit 58f60e7

Browse files
committed
feat: Enable DPP support with native_datafusion scan
1 parent c7aad67 commit 58f60e7

5 files changed

Lines changed: 535 additions & 5 deletions

File tree

native/proto/src/proto/operator.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ syntax = "proto3";
2222
package spark.spark_operator;
2323

2424
import "expr.proto";
25+
import "literal.proto";
2526
import "partitioning.proto";
2627
import "types.proto";
2728

@@ -108,6 +109,18 @@ message NativeScan {
108109
// the map.
109110
map<string, string> object_store_options = 13;
110111
bool encryption_enabled = 14;
112+
repeated RuntimeFilterBound runtime_filter_bounds = 15;
113+
}
114+
115+
// Runtime filter bound for row-group pruning in native scan.
116+
// Extracted from Spark's data filters containing range or IN predicates.
117+
message RuntimeFilterBound {
118+
string column_name = 1;
119+
int32 column_index = 2;
120+
optional spark.spark_expression.Literal min_value = 3;
121+
optional spark.spark_expression.Literal max_value = 4;
122+
repeated spark.spark_expression.Literal in_values = 5;
123+
string filter_type = 6; // "minmax", "in", "bloom"
111124
}
112125

113126
message IcebergScan {

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,25 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
601601
partitionSchema: StructType,
602602
hadoopConf: Configuration): String = {
603603

604+
val cometExecEnabled = COMET_EXEC_ENABLED.get()
605+
606+
// Prefer native_datafusion for queries with Dynamic Partition Pruning (DPP)
607+
// because it provides significant I/O reduction through partition filtering
608+
val hasDPP = scanExec.partitionFilters.exists(isDynamicPruningFilter)
609+
if (hasDPP && cometExecEnabled) {
610+
val dppFallbackReasons = new ListBuffer[String]()
611+
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
612+
val schemaSupported =
613+
typeChecker.isSchemaSupported(scanExec.requiredSchema, dppFallbackReasons)
614+
val partitionSchemaSupported =
615+
typeChecker.isSchemaSupported(partitionSchema, dppFallbackReasons)
616+
617+
if (schemaSupported && partitionSchemaSupported && dppFallbackReasons.isEmpty) {
618+
logInfo(s"Auto scan mode selecting $SCAN_NATIVE_DATAFUSION for DPP query")
619+
return SCAN_NATIVE_DATAFUSION
620+
}
621+
}
622+
604623
val fallbackReasons = new ListBuffer[String]()
605624

606625
// native_iceberg_compat only supports local filesystem and S3
@@ -621,7 +640,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
621640
val partitionSchemaSupported =
622641
typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
623642

624-
val cometExecEnabled = COMET_EXEC_ENABLED.get()
625643
if (!cometExecEnabled) {
626644
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
627645
}

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,12 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
5858
withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
5959
}
6060

61-
// Native DataFusion doesn't support subqueries/dynamic pruning
62-
if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
63-
withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning")
64-
}
61+
// Dynamic partition pruning (DPP) is now supported!
62+
// The dynamicallySelectedPartitions in CometScanExec evaluates DPP filters
63+
// and returns the filtered file list. Native scan receives these pre-filtered
64+
// files, so partition-level pruning works correctly.
65+
// Note: DPP filters are excluded from dataFilters to avoid pushing subqueries
66+
// to native execution (see supportedDataFilters in CometScanExec).
6567

6668
if (SQLConf.get.ignoreCorruptFiles ||
6769
scanExec.relation.options
@@ -191,6 +193,10 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
191193
}
192194
}
193195

196+
// Add runtime filter bounds if available
197+
// These are pushed down from join operators to enable I/O reduction
198+
addRuntimeFilterBounds(scan, nativeScanBuilder)
199+
194200
Some(builder.setNativeScan(nativeScanBuilder).build())
195201

196202
} else {
@@ -250,4 +256,70 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
250256
override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
251257
CometNativeScanExec(nativeOp, op.wrapped, op.session)
252258
}
259+
260+
/**
261+
* Add runtime filter bounds to the native scan for row-group pruning. Runtime filters are
262+
* extracted from data filters that contain range predicates (GreaterThanOrEqual,
263+
* LessThanOrEqual) or IN predicates.
264+
*/
265+
private def addRuntimeFilterBounds(
266+
scan: CometScanExec,
267+
nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
268+
import org.apache.spark.sql.catalyst.expressions._
269+
270+
// Extract runtime filter bounds from data filters
271+
scan.supportedDataFilters.foreach {
272+
case GreaterThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
273+
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
274+
boundBuilder.setColumnName(attr.name)
275+
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
276+
boundBuilder.setFilterType("minmax")
277+
exprToProto(Literal(value, dataType), scan.output).foreach { minProto =>
278+
boundBuilder.setMinValue(minProto.getLiteral)
279+
}
280+
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
281+
282+
case LessThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
283+
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
284+
boundBuilder.setColumnName(attr.name)
285+
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
286+
boundBuilder.setFilterType("minmax")
287+
exprToProto(Literal(value, dataType), scan.output).foreach { maxProto =>
288+
boundBuilder.setMaxValue(maxProto.getLiteral)
289+
}
290+
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
291+
292+
case And(
293+
GreaterThanOrEqual(attr1: AttributeReference, Literal(minVal, minType)),
294+
LessThanOrEqual(attr2: AttributeReference, Literal(maxVal, maxType)))
295+
if attr1.name == attr2.name =>
296+
// Combined range filter: column >= min AND column <= max
297+
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
298+
boundBuilder.setColumnName(attr1.name)
299+
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr1.name))
300+
boundBuilder.setFilterType("minmax")
301+
exprToProto(Literal(minVal, minType), scan.output).foreach { minProto =>
302+
boundBuilder.setMinValue(minProto.getLiteral)
303+
}
304+
exprToProto(Literal(maxVal, maxType), scan.output).foreach { maxProto =>
305+
boundBuilder.setMaxValue(maxProto.getLiteral)
306+
}
307+
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
308+
309+
case InSet(attr: AttributeReference, values) if values.size <= 10 =>
310+
// Small IN filter - pass individual values
311+
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
312+
boundBuilder.setColumnName(attr.name)
313+
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
314+
boundBuilder.setFilterType("in")
315+
values.foreach { value =>
316+
exprToProto(Literal(value, attr.dataType), scan.output).foreach { valProto =>
317+
boundBuilder.addInValues(valProto.getLiteral)
318+
}
319+
}
320+
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
321+
322+
case _ => // Other filters are handled by data_filters
323+
}
324+
}
253325
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.exec
21+
22+
import org.apache.spark.sql.CometTestBase
23+
import org.apache.spark.sql.comet.CometNativeScanExec
24+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
25+
26+
import org.apache.comet.CometConf
27+
28+
/** Tests for Dynamic Partition Pruning (DPP) with native DataFusion scan. */
29+
class CometDPPSuite extends CometTestBase {
30+
31+
test("DPP with native_datafusion scan - basic join") {
32+
withSQLConf(
33+
CometConf.COMET_ENABLED.key -> "true",
34+
CometConf.COMET_EXEC_ENABLED.key -> "true",
35+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") {
36+
withTempPath { dir =>
37+
spark
38+
.range(10000)
39+
.selectExpr("id", "id % 100 as dim_key", "rand() as value")
40+
.write
41+
.mode("overwrite")
42+
.parquet(s"${dir.getCanonicalPath}/fact")
43+
spark
44+
.range(100)
45+
.selectExpr("id", "'name_' || id as name")
46+
.where("id < 10")
47+
.write
48+
.mode("overwrite")
49+
.parquet(s"${dir.getCanonicalPath}/dim")
50+
51+
spark.read.parquet(s"${dir.getCanonicalPath}/fact").createOrReplaceTempView("fact")
52+
spark.read.parquet(s"${dir.getCanonicalPath}/dim").createOrReplaceTempView("dim")
53+
54+
val df = spark.sql("SELECT f.*, d.name FROM fact f JOIN dim d ON f.dim_key = d.id")
55+
val result = df.collect()
56+
57+
assert(result.forall(row => row.getLong(1) < 10))
58+
59+
val plan = df.queryExecution.executedPlan
60+
val hasNativeScan = plan.collect { case _: CometNativeScanExec => true }.nonEmpty ||
61+
plan
62+
.collect { case a: AdaptiveSparkPlanExec =>
63+
a.executedPlan.collect { case _: CometNativeScanExec => true }.nonEmpty
64+
}
65+
.exists(identity)
66+
assert(hasNativeScan, "Expected CometNativeScanExec in plan")
67+
}
68+
}
69+
}
70+
71+
test("DPP auto-selection for queries with dynamic pruning filters") {
72+
withSQLConf(
73+
CometConf.COMET_ENABLED.key -> "true",
74+
CometConf.COMET_EXEC_ENABLED.key -> "true",
75+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto") {
76+
withTempPath { dir =>
77+
spark
78+
.range(1000)
79+
.selectExpr("id", "id % 10 as dim_key")
80+
.write
81+
.mode("overwrite")
82+
.parquet(s"${dir.getCanonicalPath}/fact")
83+
spark
84+
.range(10)
85+
.selectExpr("id", "'name_' || id as name")
86+
.where("id < 5")
87+
.write
88+
.mode("overwrite")
89+
.parquet(s"${dir.getCanonicalPath}/dim")
90+
91+
spark.read.parquet(s"${dir.getCanonicalPath}/fact").createOrReplaceTempView("fact2")
92+
spark.read.parquet(s"${dir.getCanonicalPath}/dim").createOrReplaceTempView("dim2")
93+
94+
val result = spark
95+
.sql("SELECT f.*, d.name FROM fact2 f JOIN dim2 d ON f.dim_key = d.id")
96+
.collect()
97+
assert(result.forall(row => row.getLong(1) < 5))
98+
}
99+
}
100+
}
101+
102+
test("DPP reduces output rows significantly") {
103+
withSQLConf(
104+
CometConf.COMET_ENABLED.key -> "true",
105+
CometConf.COMET_EXEC_ENABLED.key -> "true",
106+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") {
107+
withTempPath { dir =>
108+
val factRows = 100000
109+
val dimRows = 1000
110+
val selectivity = 0.01
111+
112+
spark
113+
.range(factRows)
114+
.selectExpr("id", s"id % $dimRows as dim_key", "rand() as value")
115+
.write
116+
.mode("overwrite")
117+
.parquet(s"${dir.getCanonicalPath}/fact")
118+
spark
119+
.range(dimRows)
120+
.selectExpr("id", "'name_' || id as name")
121+
.where(s"id < ${(dimRows * selectivity).toInt}")
122+
.write
123+
.mode("overwrite")
124+
.parquet(s"${dir.getCanonicalPath}/dim")
125+
126+
spark.read.parquet(s"${dir.getCanonicalPath}/fact").createOrReplaceTempView("fact3")
127+
spark.read.parquet(s"${dir.getCanonicalPath}/dim").createOrReplaceTempView("dim3")
128+
129+
val count =
130+
spark.sql("SELECT f.*, d.name FROM fact3 f JOIN dim3 d ON f.dim_key = d.id").count()
131+
val expectedMax = (factRows * selectivity * 2).toLong
132+
assert(count <= expectedMax, s"Expected at most $expectedMax rows with DPP, got $count")
133+
}
134+
}
135+
}
136+
137+
test("DPP with multiple join conditions") {
138+
withSQLConf(
139+
CometConf.COMET_ENABLED.key -> "true",
140+
CometConf.COMET_EXEC_ENABLED.key -> "true",
141+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") {
142+
withTempPath { dir =>
143+
spark
144+
.range(1000)
145+
.selectExpr("id", "id % 10 as key1", "id % 5 as key2", "rand() as value")
146+
.write
147+
.mode("overwrite")
148+
.parquet(s"${dir.getCanonicalPath}/fact")
149+
spark
150+
.range(10)
151+
.selectExpr("id as key1", "'dim1_' || id as name1")
152+
.where("id < 3")
153+
.write
154+
.mode("overwrite")
155+
.parquet(s"${dir.getCanonicalPath}/dim1")
156+
spark
157+
.range(5)
158+
.selectExpr("id as key2", "'dim2_' || id as name2")
159+
.where("id < 2")
160+
.write
161+
.mode("overwrite")
162+
.parquet(s"${dir.getCanonicalPath}/dim2")
163+
164+
spark.read.parquet(s"${dir.getCanonicalPath}/fact").createOrReplaceTempView("fact_multi")
165+
spark.read.parquet(s"${dir.getCanonicalPath}/dim1").createOrReplaceTempView("dim1")
166+
spark.read.parquet(s"${dir.getCanonicalPath}/dim2").createOrReplaceTempView("dim2")
167+
168+
val result = spark
169+
.sql("""
170+
SELECT f.*, d1.name1, d2.name2 FROM fact_multi f
171+
JOIN dim1 d1 ON f.key1 = d1.key1 JOIN dim2 d2 ON f.key2 = d2.key2
172+
""")
173+
.collect()
174+
assert(result.forall(row => row.getLong(1) < 3 && row.getLong(2) < 2))
175+
}
176+
}
177+
}
178+
}

0 commit comments

Comments
 (0)