Skip to content

Commit 2b5c120

Browse files
committed
Remove redundant RuntimeFilterBound - data filters already pushed via addAllDataFilters
1 parent 1b6f816 commit 2b5c120

2 files changed

Lines changed: 4 additions & 89 deletions

File tree

native/proto/src/proto/operator.proto

Lines changed: 0 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@ syntax = "proto3";
2222
package spark.spark_operator;
2323

2424
import "expr.proto";
25-
import "literal.proto";
2625
import "partitioning.proto";
2726
import "types.proto";
2827

@@ -109,18 +108,6 @@ message NativeScan {
109108
// the map.
110109
map<string, string> object_store_options = 13;
111110
bool encryption_enabled = 14;
112-
repeated RuntimeFilterBound runtime_filter_bounds = 15;
113-
}
114-
115-
// Runtime filter bound for row-group pruning in native scan.
116-
// Extracted from Spark's data filters containing range or IN predicates.
117-
message RuntimeFilterBound {
118-
string column_name = 1;
119-
int32 column_index = 2;
120-
optional spark.spark_expression.Literal min_value = 3;
121-
optional spark.spark_expression.Literal max_value = 4;
122-
repeated spark.spark_expression.Literal in_values = 5;
123-
string filter_type = 6; // "minmax", "in", "bloom"
124111
}
125112

126113
message IcebergScan {

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 4 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,10 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
5858
withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
5959
}
6060

61-
// Dynamic partition pruning (DPP) is now supported!
62-
// The dynamicallySelectedPartitions in CometScanExec evaluates DPP filters
63-
// and returns the filtered file list. Native scan receives these pre-filtered
64-
// files, so partition-level pruning works correctly.
65-
// Note: DPP filters are excluded from dataFilters to avoid pushing subqueries
66-
// to native execution (see supportedDataFilters in CometScanExec).
61+
// Native DataFusion doesn't support subqueries/dynamic pruning
62+
if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
63+
withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning")
64+
}
6765

6866
if (SQLConf.get.ignoreCorruptFiles ||
6967
scanExec.relation.options
@@ -193,10 +191,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
193191
}
194192
}
195193

196-
// Add runtime filter bounds if available
197-
// These are pushed down from join operators to enable I/O reduction
198-
addRuntimeFilterBounds(scan, nativeScanBuilder)
199-
200194
Some(builder.setNativeScan(nativeScanBuilder).build())
201195

202196
} else {
@@ -256,70 +250,4 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
256250
override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
257251
CometNativeScanExec(nativeOp, op.wrapped, op.session)
258252
}
259-
260-
/**
261-
* Add runtime filter bounds to the native scan for row-group pruning. Runtime filters are
262-
* extracted from data filters that contain range predicates (GreaterThanOrEqual,
263-
* LessThanOrEqual) or IN predicates.
264-
*/
265-
private def addRuntimeFilterBounds(
266-
scan: CometScanExec,
267-
nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
268-
import org.apache.spark.sql.catalyst.expressions._
269-
270-
// Extract runtime filter bounds from data filters
271-
scan.supportedDataFilters.foreach {
272-
case GreaterThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
273-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
274-
boundBuilder.setColumnName(attr.name)
275-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
276-
boundBuilder.setFilterType("minmax")
277-
exprToProto(Literal(value, dataType), scan.output).foreach { minProto =>
278-
boundBuilder.setMinValue(minProto.getLiteral)
279-
}
280-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
281-
282-
case LessThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
283-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
284-
boundBuilder.setColumnName(attr.name)
285-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
286-
boundBuilder.setFilterType("minmax")
287-
exprToProto(Literal(value, dataType), scan.output).foreach { maxProto =>
288-
boundBuilder.setMaxValue(maxProto.getLiteral)
289-
}
290-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
291-
292-
case And(
293-
GreaterThanOrEqual(attr1: AttributeReference, Literal(minVal, minType)),
294-
LessThanOrEqual(attr2: AttributeReference, Literal(maxVal, maxType)))
295-
if attr1.name == attr2.name =>
296-
// Combined range filter: column >= min AND column <= max
297-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
298-
boundBuilder.setColumnName(attr1.name)
299-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr1.name))
300-
boundBuilder.setFilterType("minmax")
301-
exprToProto(Literal(minVal, minType), scan.output).foreach { minProto =>
302-
boundBuilder.setMinValue(minProto.getLiteral)
303-
}
304-
exprToProto(Literal(maxVal, maxType), scan.output).foreach { maxProto =>
305-
boundBuilder.setMaxValue(maxProto.getLiteral)
306-
}
307-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
308-
309-
case InSet(attr: AttributeReference, values) if values.size <= 10 =>
310-
// Small IN filter - pass individual values
311-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
312-
boundBuilder.setColumnName(attr.name)
313-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
314-
boundBuilder.setFilterType("in")
315-
values.foreach { value =>
316-
exprToProto(Literal(value, attr.dataType), scan.output).foreach { valProto =>
317-
boundBuilder.addInValues(valProto.getLiteral)
318-
}
319-
}
320-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
321-
322-
case _ => // Other filters are handled by data_filters
323-
}
324-
}
325253
}

0 commit comments

Comments
 (0)