
Commit a3e643d

[spark] Format table read filter push down in spark (#6385)

1 parent: 428fab9

12 files changed: 453 additions & 184 deletions
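
In brief, the commit introduces a shared PaimonBasePushDown trait and wires filter push down into format-table scans. A minimal sketch of the query shape that benefits, assuming a Spark session with the Paimon catalog registered as `paimon` and a format table `db.logs` partitioned on `dt` (all names here are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// With this change, the partition filter below is converted to a Paimon
// predicate and pushed into the format-table scan, so non-matching
// partitions are skipped instead of being filtered after the scan.
spark.sql("SELECT * FROM paimon.db.logs WHERE dt = '2024-01-01'").show()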

Lines changed: 88 additions & 0 deletions

@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark

import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
import org.apache.paimon.types.RowType

import org.apache.spark.sql.connector.read.SupportsPushDownFilters
import org.apache.spark.sql.sources.Filter

import java.util.{List => JList}

import scala.collection.mutable

/** Base trait for Paimon scan filter push down. */
trait PaimonBasePushDown extends SupportsPushDownFilters {

  private var pushedSparkFilters = Array.empty[Filter]

  protected var pushedPaimonPredicates: Array[Predicate] = Array.empty

  protected var reservedFilters: Array[Filter] = Array.empty

  protected var hasPostScanPredicates = false

  protected var partitionKeys: JList[String]
  protected var rowType: RowType

  /**
   * Pushes down filters, and returns filters that need to be evaluated after scanning. <p> Rows
   * should be returned from the data source if and only if all the filters match. That is, filters
   * must be interpreted as ANDed together.
   */
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
    val postScan = mutable.ArrayBuffer.empty[Filter]
    val reserved = mutable.ArrayBuffer.empty[Filter]

    val converter = new SparkFilterConverter(rowType)
    val visitor = new PartitionPredicateVisitor(partitionKeys)
    filters.foreach {
      filter =>
        val predicate = converter.convertIgnoreFailure(filter)
        if (predicate == null) {
          postScan.append(filter)
        } else {
          pushable.append((filter, predicate))
          if (predicate.visit(visitor)) {
            reserved.append(filter)
          } else {
            postScan.append(filter)
          }
        }
    }

    if (pushable.nonEmpty) {
      this.pushedSparkFilters = pushable.map(_._1).toArray
      this.pushedPaimonPredicates = pushable.map(_._2).toArray
    }
    if (reserved.nonEmpty) {
      this.reservedFilters = reserved.toArray
    }
    if (postScan.nonEmpty) {
      this.hasPostScanPredicates = true
    }
    postScan.toArray
  }

  override def pushedFilters(): Array[Filter] = {
    pushedSparkFilters
  }
}
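
To make the pushFilters contract above concrete, here is a small hedged sketch of how a filter set splits for a table partitioned on a column `dt` (the column names and values are illustrative, not from the commit):

import org.apache.spark.sql.sources.{EqualTo, Filter}

// For a builder whose partitionKeys == ["dt"], both filters below convert to
// Paimon predicates, so both are pushed (and appear in pushedFilters()):
val filters: Array[Filter] = Array(
  EqualTo("dt", "2024-01-01"), // partition-only: also "reserved", Spark can trust the scan output
  EqualTo("x", 1)              // data column: pushed for data skipping, but returned for post-scan evaluation
)
// pushFilters(filters) would therefore return Array(EqualTo("x", 1)).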
Lines changed: 65 additions & 0 deletions

@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark

import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FormatTable

import org.apache.spark.sql.PaimonUtils.fieldReference
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

/** Scan for {@link FormatTable}. */
case class PaimonFormatTableScan(
    table: FormatTable,
    requiredSchema: StructType,
    filters: Seq[Predicate],
    override val pushDownLimit: Option[Int])
  extends PaimonFormatTableBaseScan(table, requiredSchema, filters, pushDownLimit)
  with SupportsRuntimeFiltering
  with ScanHelper {

  override def filterAttributes(): Array[NamedReference] = {
    val requiredFields = readBuilder.readType().getFieldNames.asScala
    table
      .partitionKeys()
      .asScala
      .toArray
      .filter(requiredFields.contains)
      .map(fieldReference)
  }

  override def filter(filters: Array[Filter]): Unit = {
    val converter = new SparkFilterConverter(table.rowType())
    val partitionFilter = filters.flatMap {
      case in @ In(attr, _) if table.partitionKeys().contains(attr) =>
        Some(converter.convert(in))
      case _ => None
    }
    if (partitionFilter.nonEmpty) {
      readBuilder.withFilter(partitionFilter.head)
      // Set inputPartitions to null to trigger fetching the new splits.
      inputPartitions = null
    }
  }
}
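
For context, the filter method above is the hook Spark's runtime filtering (dynamic partition pruning) uses. A hedged sketch of the interaction, with illustrative partition values:

import org.apache.spark.sql.sources.{Filter, In}

// During dynamic partition pruning, Spark may call scan.filter with an In
// filter on the partition column, e.g.:
val runtimeFilters: Array[Filter] = Array(In("dt", Array("2024-01-01", "2024-01-02")))
// The scan converts it to a Paimon predicate via SparkFilterConverter, narrows
// readBuilder to the matching partitions, and resets inputPartitions so that
// the next planInputPartitions() re-plans splits over the pruned set.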
Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark

import org.apache.paimon.table.FormatTable
import org.apache.paimon.types.RowType

import org.apache.spark.sql.connector.read.{SupportsPushDownRequiredColumns, SupportsRuntimeFiltering}
import org.apache.spark.sql.types.StructType

import java.util.{List => JList}

/** ScanBuilder for {@link FormatTable}. */
case class PaimonFormatTableScanBuilder(table: FormatTable)
  extends PaimonBasePushDown
  with SupportsPushDownRequiredColumns {

  override protected var partitionKeys: JList[String] = table.partitionKeys()
  override protected var rowType: RowType = table.rowType()
  protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(rowType)

  override def build() = PaimonFormatTableScan(table, requiredSchema, pushedPaimonPredicates, None)

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }
}
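
For orientation, a hedged sketch of the order in which Spark drives this builder; the FormatTable instance is assumed to be resolved through the Paimon catalog, and the filter and schema below are illustrative:

import org.apache.paimon.table.FormatTable
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.sources.EqualTo
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// `formatTable` is assumed to be partitioned on `dt`.
def buildPrunedScan(formatTable: FormatTable): Scan = {
  val builder = PaimonFormatTableScanBuilder(formatTable)
  // The partition-key filter is fully pushed, so no post-scan filters remain.
  val postScan = builder.pushFilters(Array(EqualTo("dt", "2024-01-01")))
  assert(postScan.isEmpty)
  // Spark then prunes the read schema to the referenced columns...
  builder.pruneColumns(StructType(Seq(StructField("id", LongType))))
  // ...and builds the scan, which carries the pushed Paimon predicates.
  builder.build()
}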

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala

Lines changed: 1 addition & 1 deletion

@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.{Predicate, TopN}
-import org.apache.paimon.table.{InnerTable, Table}
+import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
 import org.apache.spark.sql.connector.expressions.NamedReference
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala

Lines changed: 8 additions & 46 deletions

@@ -18,61 +18,23 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
 import org.apache.paimon.table.InnerTable
+import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.sources.Filter
 
-import scala.collection.mutable
+import java.util.{List => JList}
 
 class PaimonScanBuilder(table: InnerTable)
   extends PaimonBaseScanBuilder(table)
-  with SupportsPushDownFilters {
+  with PaimonBasePushDown {
 
   private var pushedSparkFilters = Array.empty[Filter]
 
-  /**
-   * Pushes down filters, and returns filters that need to be evaluated after scanning. <p> Rows
-   * should be returned from the data source if and only if all the filters match. That is, filters
-   * must be interpreted as ANDed together.
-   */
-  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
-    val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
-    val postScan = mutable.ArrayBuffer.empty[Filter]
-    val reserved = mutable.ArrayBuffer.empty[Filter]
-
-    val converter = new SparkFilterConverter(table.rowType)
-    val visitor = new PartitionPredicateVisitor(table.partitionKeys())
-    filters.foreach {
-      filter =>
-        val predicate = converter.convertIgnoreFailure(filter)
-        if (predicate == null) {
-          postScan.append(filter)
-        } else {
-          pushable.append((filter, predicate))
-          if (predicate.visit(visitor)) {
-            reserved.append(filter)
-          } else {
-            postScan.append(filter)
-          }
-        }
-    }
-
-    if (pushable.nonEmpty) {
-      this.pushedSparkFilters = pushable.map(_._1).toArray
-      this.pushedPaimonPredicates = pushable.map(_._2).toArray
-    }
-    if (reserved.nonEmpty) {
-      this.reservedFilters = reserved.toArray
-    }
-    if (postScan.nonEmpty) {
-      this.hasPostScanPredicates = true
-    }
-    postScan.toArray
-  }
-
-  override def pushedFilters(): Array[Filter] = {
-    pushedSparkFilters
+  override protected var partitionKeys: JList[String] = table.partitionKeys()
+  override protected var rowType: RowType = table.rowType()
+  override def build(): Scan = {
+    PaimonScan(table, requiredSchema, pushedPaimonPredicates, reservedFilters, None, pushDownTopN)
   }
 }
Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark

import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
import org.apache.paimon.types.RowType

import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
import org.apache.spark.sql.sources.Filter

import java.util.{List => JList}

import scala.collection.mutable

/** Base trait for Paimon scan push down. */
trait PaimonBasePushDown extends SupportsPushDownV2Filters with SupportsPushDownLimit {
  private var pushedSparkPredicates = Array.empty[SparkPredicate]

  protected var pushedPaimonPredicates: Array[Predicate] = Array.empty

  protected var reservedFilters: Array[Filter] = Array.empty

  protected var hasPostScanPredicates = false

  protected var pushDownLimit: Option[Int] = None
  protected var partitionKeys: JList[String]
  protected var rowType: RowType

  override def pushPredicates(predicates: Array[SparkPredicate]): Array[SparkPredicate] = {
    val pushable = mutable.ArrayBuffer.empty[(SparkPredicate, Predicate)]
    val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
    val reserved = mutable.ArrayBuffer.empty[Filter]

    val converter = SparkV2FilterConverter(rowType)
    val visitor = new PartitionPredicateVisitor(partitionKeys)
    predicates.foreach {
      predicate =>
        converter.convert(predicate) match {
          case Some(paimonPredicate) =>
            pushable.append((predicate, paimonPredicate))
            if (paimonPredicate.visit(visitor)) {
              // We need to filter the stats using a V1 Filter instead of a V2 predicate.
              reserved.append(PaimonUtils.filterV2ToV1(predicate).get)
            } else {
              postScan.append(predicate)
            }
          case None =>
            postScan.append(predicate)
        }
    }

    if (pushable.nonEmpty) {
      this.pushedSparkPredicates = pushable.map(_._1).toArray
      this.pushedPaimonPredicates = pushable.map(_._2).toArray
    }
    if (reserved.nonEmpty) {
      this.reservedFilters = reserved.toArray
    }
    if (postScan.nonEmpty) {
      this.hasPostScanPredicates = true
    }
    postScan.toArray
  }

  override def pushedPredicates: Array[SparkPredicate] = {
    pushedSparkPredicates
  }

  override def pushLimit(limit: Int): Boolean = {
    // This is safe: we do nothing with the limit if it is a primary key table and the split is not `rawConvertible`.
    pushDownLimit = Some(limit)
    // Just make a best effort to push down the limit; returning false keeps Spark's own LIMIT in place.
    false
  }
}
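
As a usage note for the pushLimit contract above, a short hedged sketch (the catalog name `paimon` and table `db.logs` are illustrative assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The scan receives pushLimit(10) and may stop planning splits early where
// that is safe; since pushLimit returned false, Spark also applies LIMIT 10
// itself, so correctness never depends on the source honoring the limit.
spark.sql("SELECT * FROM paimon.db.logs LIMIT 10").show()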

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala

Lines changed: 0 additions & 18 deletions

@@ -33,26 +33,8 @@ abstract class PaimonBaseScanBuilder(table: InnerTable)
 
   protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
 
-  protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
-
-  protected var reservedFilters: Array[Filter] = Array.empty
-
-  protected var hasPostScanPredicates = false
-
-  protected var pushDownLimit: Option[Int] = None
-
   protected var pushDownTopN: Option[TopN] = None
 
-  override def build(): Scan = {
-    PaimonScan(
-      table,
-      requiredSchema,
-      pushedPaimonPredicates,
-      reservedFilters,
-      pushDownLimit,
-      pushDownTopN)
-  }
-
   override def pruneColumns(requiredSchema: StructType): Unit = {
     this.requiredSchema = requiredSchema
   }