@@ -58,10 +58,12 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
5858 withInfo(scanExec, s " Full native scan disabled because ${COMET_EXEC_ENABLED .key} disabled " )
5959 }
6060
61- // Native DataFusion doesn't support subqueries/dynamic pruning
62- if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
63- withInfo(scanExec, " Native DataFusion scan does not support subqueries/dynamic pruning" )
64- }
61+ // Dynamic partition pruning (DPP) is now supported!
62+ // The dynamicallySelectedPartitions in CometScanExec evaluates DPP filters
63+ // and returns the filtered file list. Native scan receives these pre-filtered
64+ // files, so partition-level pruning works correctly.
65+ // Note: DPP filters are excluded from dataFilters to avoid pushing subqueries
66+ // to native execution (see supportedDataFilters in CometScanExec).
6567
6668 if (SQLConf .get.ignoreCorruptFiles ||
6769 scanExec.relation.options
@@ -191,6 +193,10 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
191193 }
192194 }
193195
196+ // Add runtime filter bounds if available
197+ // These are pushed down from join operators to enable I/O reduction
198+ addRuntimeFilterBounds(scan, nativeScanBuilder)
199+
194200 Some (builder.setNativeScan(nativeScanBuilder).build())
195201
196202 } else {
@@ -250,4 +256,70 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
/**
 * Builds the executable plan node for an already-serialized native scan.
 *
 * @param nativeOp the serialized native operator to execute
 * @param op the Comet scan being replaced; its `wrapped` plan and `session` are forwarded
 * @return a [[CometNativeScanExec]] constructed from `nativeOp`, `op.wrapped` and `op.session`
 */
override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec =
  CometNativeScanExec(nativeOp, op.wrapped, op.session)
259+
/**
 * Add runtime filter bounds to the native scan for row-group pruning.
 *
 * Bounds are derived from the scan's supported data filters that have one of these shapes:
 *   - `col >= literal` (lower bound only, filter type "minmax")
 *   - `col <= literal` (upper bound only, filter type "minmax")
 *   - `col >= literal AND col <= literal` on the same column (filter type "minmax")
 *   - `col IN (set)` with at most ten values (filter type "in")
 *
 * Bounds are purely an optimization hint, so this method is deliberately conservative: a bound
 * is only emitted when it is complete and well-formed. In particular a bound is skipped when
 *   - the column cannot be located by name in `scan.output`,
 *   - a constant is null or cannot be serialized as a plain literal, or
 *   - any member of an IN set cannot be serialized (a partial IN list could otherwise make the
 *     native side prune data that actually matches).
 * Skipped filters are still evaluated through the regular data filters.
 *
 * @param scan
 *   the Comet scan whose `supportedDataFilters` are inspected
 * @param nativeScanBuilder
 *   the protobuf builder that receives the extracted bounds
 */
private def addRuntimeFilterBounds(
    scan: CometScanExec,
    nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
  import org.apache.spark.sql.catalyst.expressions._

  // Larger IN sets are left to the regular data filters.
  val maxInValues = 10

  // Start a bound for `attr`, or None when the column is not part of the scan output
  // (indexWhere returns -1 in that case, which must never be sent as a column index).
  def newBound(attr: AttributeReference, filterType: String) = {
    val columnIndex = scan.output.indexWhere(_.name == attr.name)
    if (columnIndex < 0) {
      None
    } else {
      Some(
        OperatorOuterClass.RuntimeFilterBound
          .newBuilder()
          .setColumnName(attr.name)
          .setColumnIndex(columnIndex)
          .setFilterType(filterType))
    }
  }

  // Serialize a constant to its protobuf literal. Yields None for null constants and for
  // values that do not serialize to a plain literal expression.
  def literalProto(lit: Literal) =
    Option(lit.value)
      .flatMap(_ => exprToProto(lit, scan.output))
      .filter(_.hasLiteral)
      .map(_.getLiteral)

  scan.supportedDataFilters.foreach {
    case GreaterThanOrEqual(attr: AttributeReference, lit: Literal) =>
      for {
        bound <- newBound(attr, "minmax")
        min <- literalProto(lit)
      } nativeScanBuilder.addRuntimeFilterBounds(bound.setMinValue(min).build())

    case LessThanOrEqual(attr: AttributeReference, lit: Literal) =>
      for {
        bound <- newBound(attr, "minmax")
        max <- literalProto(lit)
      } nativeScanBuilder.addRuntimeFilterBounds(bound.setMaxValue(max).build())

    case And(
          GreaterThanOrEqual(lower: AttributeReference, minLit: Literal),
          LessThanOrEqual(upper: AttributeReference, maxLit: Literal))
        if lower.name == upper.name =>
      // Combined range filter: column >= min AND column <= max.
      // A one-sided range is still a valid (looser) bound, so keep whichever side serializes.
      newBound(lower, "minmax").foreach { bound =>
        val min = literalProto(minLit)
        val max = literalProto(maxLit)
        min.foreach(value => bound.setMinValue(value))
        max.foreach(value => bound.setMaxValue(value))
        if (min.isDefined || max.isDefined) {
          nativeScanBuilder.addRuntimeFilterBounds(bound.build())
        }
      }

    case InSet(attr: AttributeReference, values) if values.size <= maxInValues =>
      // Small IN filter - pass individual values, but only when every value serializes.
      newBound(attr, "in").foreach { bound =>
        val serialized = values.toSeq.map(value => literalProto(Literal(value, attr.dataType)))
        if (serialized.nonEmpty && serialized.forall(_.isDefined)) {
          serialized.flatten.foreach(value => bound.addInValues(value))
          nativeScanBuilder.addRuntimeFilterBounds(bound.build())
        }
      }

    case _ => // Other filters are handled by data_filters
  }
}
253325}
0 commit comments