Skip to content

Commit 33fbb6f

Browse files
committed
spark
1 parent 2a46ba6 commit 33fbb6f

File tree

7 files changed

+29
-12
lines changed

7 files changed

+29
-12
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala

Lines changed: 17 additions & 3 deletions

@@ -18,7 +18,7 @@
 package org.apache.paimon.spark

-import org.apache.paimon.table.Table
+import org.apache.paimon.table.{FileStoreTable, Table}
 import org.apache.paimon.utils.StringUtils

 import org.apache.spark.sql.connector.catalog.TableCapability
@@ -37,7 +37,7 @@ abstract class BaseTable
   override def capabilities(): JSet[TableCapability] = JCollections.emptySet[TableCapability]()

-  override def name: String = table.fullName
+  override def name: String = BaseTable.tableNameWithCatalog(table)

   override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
@@ -48,6 +48,20 @@ abstract class BaseTable
   override def properties: JMap[String, String] = table.options()

   override def toString: String = {
-    s"${table.getClass.getSimpleName}[${table.fullName()}]"
+    s"${table.getClass.getSimpleName}[$name]"
+  }
+}
+
+object BaseTable {
+
+  /** Returns the full table name with catalog prefix if available. */
+  def tableNameWithCatalog(table: Table): String = {
+    val fullName = table.fullName
+    table match {
+      case t: FileStoreTable =>
+        Option(t.catalogEnvironment().catalogName())
+          .fold(fullName)(catalog => s"$catalog.$fullName")
+      case _ => fullName
+    }
   }
 }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala

Lines changed: 2 additions & 1 deletion

@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.execution

 import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.BaseTable
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.table.{FileStoreTable, Table}
 import org.apache.paimon.utils.InternalRowPartitionComputer
@@ -69,7 +70,7 @@ case class TruncatePaimonTableWithFilterExec(
   override def output: Seq[Attribute] = Nil

   override def simpleString(maxFields: Int): String = {
-    s"TruncatePaimonTableWithFilterExec: ${table.fullName()}" +
+    s"TruncatePaimonTableWithFilterExec: ${BaseTable.tableNameWithCatalog(table)}" +
       partitionPredicate.map(p => s", PartitionPredicate: [$p]").getOrElse("")
   }
 }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala

Lines changed: 2 additions & 1 deletion

@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
 import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
+import org.apache.paimon.spark.BaseTable
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.spark.util.{OptionUtils, SplitUtils}
@@ -182,7 +183,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
     } else {
       ""
     }
-    s"${getClass.getSimpleName}: [${table.name}]" +
+    s"${getClass.getSimpleName}: [${BaseTable.tableNameWithCatalog(table)}]" +
       pushedPartitionFiltersStr +
       pushedRuntimePartitionFiltersStr +
       pushedDataFiltersStr +

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/PaimonLocalScan.scala

Lines changed: 2 additions & 1 deletion

@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.read

 import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.BaseTable
 import org.apache.paimon.table.Table

 import org.apache.spark.sql.catalyst.InternalRow
@@ -39,6 +40,6 @@ case class PaimonLocalScan(
     } else {
       ""
     }
-    s"PaimonLocalScan: [${table.name}]" + pushedPartitionFiltersStr
+    s"PaimonLocalScan: [${BaseTable.tableNameWithCatalog(table)}]" + pushedPartitionFiltersStr
   }
 }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ class PaimonV2Write(
       case Some(_) if !overwriteDynamic => ", overwriteTable=true"
       case _ => ""
     }
-    s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
+    s"PaimonWrite(table=${BaseTable.tableNameWithCatalog(table)}$overwriteDynamicStr$overwritePartitionsStr)"
   }

   override def description(): String = toString

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala

Lines changed: 2 additions & 2 deletions

@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.write

 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SaveMode
+import org.apache.paimon.spark.{BaseTable, SaveMode}
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.table.FileStoreTable

@@ -38,6 +38,6 @@ class PaimonWrite(val table: FileStoreTable, saveMode: SaveMode, options: Option
   }

   override def toString: String = {
-    s"table: ${table.fullName()}, saveMode: $saveMode, options: ${options.toMap}"
+    s"table: ${BaseTable.tableNameWithCatalog(table)}, saveMode: $saveMode, options: ${options.toMap}"
   }
 }

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala

Lines changed: 3 additions & 3 deletions

@@ -49,7 +49,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp
     val node1 = nodes(0)
     assert(
       node1.isInstanceOf[AppendDataExec] &&
-        node1.toString.contains("PaimonWrite(table=test.t1"),
+        node1.toString.contains("PaimonWrite(table=paimon.test.t1"),
       s"Expected AppendDataExec with specific paimon write, but got: $node1"
     )

@@ -92,7 +92,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp
     val node1 = nodes(0)
     assert(
       node1.isInstanceOf[AppendDataExec] &&
-        node1.toString.contains("PaimonWrite(table=test.t1"),
+        node1.toString.contains("PaimonWrite(table=paimon.test.t1"),
       s"Expected AppendDataExec with specific paimon write, but got: $node1"
     )

@@ -136,7 +136,7 @@ class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSp
     val node1 = nodes(0)
     assert(
       node1.isInstanceOf[AppendDataExecV1] &&
-        node1.toString.contains("AppendDataExecV1 PrimaryKeyFileStoreTable[test.t1]"),
+        node1.toString.contains("AppendDataExecV1 PrimaryKeyFileStoreTable[paimon.test.t1]"),
       s"Expected AppendDataExec with specific paimon write, but got: $node1"
     )
   }

0 commit comments

Comments
 (0)