Skip to content

Commit 70110f2

Browse files
[spark] Fix RowTrackingTable reading by add _ROW_ID existing check before adding _ROW_ID (#7606)
1 parent e69109d commit 70110f2

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ abstract class PaimonBaseScanBuilder
6969
val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
7070

7171
var newRowType = rowType
72-
if (coreOptions.rowTrackingEnabled() && coreOptions.dataEvolutionEnabled()) {
72+
if (
73+
coreOptions.rowTrackingEnabled() && coreOptions
74+
.dataEvolutionEnabled() && !rowType.containsField(SpecialFields.ROW_ID.name())
75+
) {
7376
newRowType = SpecialFields.rowTypeWithRowTracking(newRowType);
7477
}
7578
val converter = SparkV2FilterConverter(newRowType)

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
6868
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
6969

7070
lazy val tableRowType: RowType = {
71-
if (coreOptions.rowTrackingEnabled()) {
71+
if (
72+
coreOptions
73+
.rowTrackingEnabled() && !table.rowType().containsField(SpecialFields.ROW_ID.name())
74+
) {
7275
SpecialFields.rowTypeWithRowTracking(table.rowType())
7376
} else {
7477
table.rowType()

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,22 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
795795
}
796796
}
797797

798+
test("Row Tracking: query row_tracking system table with filter pushdown") {
799+
withTable("t") {
800+
sql("CREATE TABLE t (a INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
801+
sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
802+
803+
val query = s"SELECT a, b FROM `t$$row_tracking` WHERE a > 1 ORDER BY a"
804+
checkAnswer(sql(query), Seq(Row(2, 20), Row(3, 30)))
805+
806+
val scan = getScan(query)
807+
assert(
808+
scan.description().contains("DataFilters"),
809+
s"Expected predicate pushdown (DataFilters) in scan description, but got: ${scan.description()}"
810+
)
811+
}
812+
}
813+
798814
test("Data Evolution: compact fields action") {
799815
withTable("s", "t") {
800816
sql("CREATE TABLE s (id INT, b INT)")
@@ -952,4 +968,5 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
952968
assert(!indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p1")))
953969
}
954970
}
971+
955972
}

0 commit comments

Comments
 (0)