Skip to content

Commit 4568c16

Browse files
committed
Remove redundant RuntimeFilterBound - data filters already pushed
1 parent 1add4cf commit 4568c16

3 files changed

Lines changed: 1 addition & 85 deletions

File tree

native/proto/src/proto/operator.proto

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ syntax = "proto3";
2222
package spark.spark_operator;
2323

2424
import "expr.proto";
25-
import "literal.proto";
2625
import "partitioning.proto";
2726
import "types.proto";
2827

@@ -109,18 +108,6 @@ message NativeScan {
109108
// the map.
110109
map<string, string> object_store_options = 13;
111110
bool encryption_enabled = 14;
112-
repeated RuntimeFilterBound runtime_filter_bounds = 15;
113-
}
114-
115-
// Runtime filter bound for row-group pruning in native scan.
116-
// Extracted from Spark's data filters containing range or IN predicates.
117-
message RuntimeFilterBound {
118-
string column_name = 1;
119-
int32 column_index = 2;
120-
optional spark.spark_expression.Literal min_value = 3;
121-
optional spark.spark_expression.Literal max_value = 4;
122-
repeated spark.spark_expression.Literal in_values = 5;
123-
string filter_type = 6; // "minmax", "in", "bloom"
124111
}
125112

126113
message IcebergScan {

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,8 +601,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
601601
partitionSchema: StructType,
602602
hadoopConf: Configuration): String = {
603603

604-
val cometExecEnabled = COMET_EXEC_ENABLED.get()
605-
606604
val fallbackReasons = new ListBuffer[String]()
607605

608606
// native_iceberg_compat only supports local filesystem and S3
@@ -623,6 +621,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
623621
val partitionSchemaSupported =
624622
typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
625623

624+
val cometExecEnabled = COMET_EXEC_ENABLED.get()
626625
if (!cometExecEnabled) {
627626
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
628627
}

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
193193
}
194194
}
195195

196-
// Add runtime filter bounds if available
197-
// These are pushed down from join operators to enable I/O reduction
198-
addRuntimeFilterBounds(scan, nativeScanBuilder)
199-
200196
Some(builder.setNativeScan(nativeScanBuilder).build())
201197

202198
} else {
@@ -256,70 +252,4 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
256252
override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
257253
CometNativeScanExec(nativeOp, op.wrapped, op.session)
258254
}
259-
260-
/**
261-
* Add runtime filter bounds to the native scan for row-group pruning. Runtime filters are
262-
* extracted from data filters that contain range predicates (GreaterThanOrEqual,
263-
* LessThanOrEqual) or IN predicates.
264-
*/
265-
private def addRuntimeFilterBounds(
266-
scan: CometScanExec,
267-
nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
268-
import org.apache.spark.sql.catalyst.expressions._
269-
270-
// Extract runtime filter bounds from data filters
271-
scan.supportedDataFilters.foreach {
272-
case GreaterThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
273-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
274-
boundBuilder.setColumnName(attr.name)
275-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
276-
boundBuilder.setFilterType("minmax")
277-
exprToProto(Literal(value, dataType), scan.output).foreach { minProto =>
278-
boundBuilder.setMinValue(minProto.getLiteral)
279-
}
280-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
281-
282-
case LessThanOrEqual(attr: AttributeReference, Literal(value, dataType)) =>
283-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
284-
boundBuilder.setColumnName(attr.name)
285-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
286-
boundBuilder.setFilterType("minmax")
287-
exprToProto(Literal(value, dataType), scan.output).foreach { maxProto =>
288-
boundBuilder.setMaxValue(maxProto.getLiteral)
289-
}
290-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
291-
292-
case And(
293-
GreaterThanOrEqual(attr1: AttributeReference, Literal(minVal, minType)),
294-
LessThanOrEqual(attr2: AttributeReference, Literal(maxVal, maxType)))
295-
if attr1.name == attr2.name =>
296-
// Combined range filter: column >= min AND column <= max
297-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
298-
boundBuilder.setColumnName(attr1.name)
299-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr1.name))
300-
boundBuilder.setFilterType("minmax")
301-
exprToProto(Literal(minVal, minType), scan.output).foreach { minProto =>
302-
boundBuilder.setMinValue(minProto.getLiteral)
303-
}
304-
exprToProto(Literal(maxVal, maxType), scan.output).foreach { maxProto =>
305-
boundBuilder.setMaxValue(maxProto.getLiteral)
306-
}
307-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
308-
309-
case InSet(attr: AttributeReference, values) if values.size <= 10 =>
310-
// Small IN filter - pass individual values
311-
val boundBuilder = OperatorOuterClass.RuntimeFilterBound.newBuilder()
312-
boundBuilder.setColumnName(attr.name)
313-
boundBuilder.setColumnIndex(scan.output.indexWhere(_.name == attr.name))
314-
boundBuilder.setFilterType("in")
315-
values.foreach { value =>
316-
exprToProto(Literal(value, attr.dataType), scan.output).foreach { valProto =>
317-
boundBuilder.addInValues(valProto.getLiteral)
318-
}
319-
}
320-
nativeScanBuilder.addRuntimeFilterBounds(boundBuilder.build())
321-
322-
case _ => // Other filters are handled by data_filters
323-
}
324-
}
325255
}

0 commit comments

Comments
 (0)