Skip to content

Commit 6a8167f

Browse files
authored
[spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command (#7612)
1 parent 72600f9 commit 6a8167f

4 files changed

Lines changed: 69 additions & 5 deletions

File tree

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.paimon.spark
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.partition.PartitionStatistics
2223
import org.apache.paimon.table.{FileStoreTable, Table}
24+
import org.apache.paimon.table.source.ScanMode
2325
import org.apache.paimon.types.RowType
2426
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
2527

@@ -136,7 +138,31 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with L
136138
}
137139

138140
/**
 * Loads partition-level metadata for SHOW TABLE EXTENDED ... PARTITION.
 *
 * For a [[FileStoreTable]], scans the snapshot's partition entries filtered down to the
 * single partition identified by `ident` and exposes its statistics (record count, file
 * size, file count, last file creation time) as a string map keyed by the
 * [[PartitionStatistics]] field names. For any other table implementation, returns an
 * empty map.
 *
 * @param ident Spark internal row holding the partition values, in partition-key order.
 * @return a Java map of statistic-name -> stringified value; empty when the table is not
 *         a FileStoreTable or the partition has no entries.
 */
override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
  table match {
    case fileStoreTable: FileStoreTable =>
      // Convert the Spark partition row into Paimon's partition spec representation.
      val partitionSpec = toPaimonPartitions(Array(ident)).head
      // ScanMode.ALL: read all files of the snapshot, not just deltas — presumably so the
      // statistics reflect the full partition contents; TODO confirm against SnapshotReader docs.
      val partitionEntries = fileStoreTable
        .newSnapshotReader()
        .withMode(ScanMode.ALL)
        .withPartitionFilter(partitionSpec)
        .partitionEntries()

      if (!partitionEntries.isEmpty) {
        // The filter is an exact partition spec, so at most one entry is expected;
        // only the first is used. NOTE(review): verify multiple entries cannot occur here.
        val entry = partitionEntries.get(0)
        Map(
          PartitionStatistics.FIELD_RECORD_COUNT -> entry.recordCount().toString,
          PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES -> entry.fileSizeInBytes().toString,
          PartitionStatistics.FIELD_FILE_COUNT -> entry.fileCount().toString,
          PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME -> entry
            .lastFileCreationTime()
            .toString
        ).asJava
      } else {
        // Partition exists in metadata but has no entries — report no statistics.
        Map.empty[String, String].asJava
      }
    case _ =>
      // Non-FileStoreTable implementations expose no partition statistics.
      Map.empty[String, String].asJava
  }
}
141167

142168
override def listPartitionIdentifiers(

paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala renamed to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
package org.apache.paimon.spark.commands
2020

21+
import org.apache.paimon.partition.PartitionStatistics
2122
import org.apache.paimon.spark.catalyst.Compatibility
2223
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
2324

2425
import org.apache.spark.sql.{Row, SparkSession}
2526
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
2627
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
27-
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, ToPrettyString}
28+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
2829
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
2930
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
3031
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -87,7 +88,20 @@ case class PaimonShowTablePartitionCommand(
8788
val partitionValues = partitions.mkString("[", ", ", "]")
8889
results.put("Partition Values", s"$partitionValues")
8990

90-
// TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics"
91+
// Partition Parameters and Partition Statistics
92+
val metadata = partitionTable.loadPartitionMetadata(row)
93+
if (!metadata.isEmpty) {
94+
val metadataMap = metadata.asScala
95+
results.put(
96+
"Partition Parameters",
97+
s"{${metadataMap.map { case (k, v) => s"$k=$v" }.mkString(", ")}}")
98+
99+
val fileSizeInBytes =
100+
metadataMap.getOrElse(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES, "0").toLong
101+
val recordCount =
102+
metadataMap.getOrElse(PartitionStatistics.FIELD_RECORD_COUNT, "0").toLong
103+
results.put("Partition Statistics", s"$recordCount rows, $fileSizeInBytes bytes")
104+
}
91105

92106
results
93107
.map {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase {
9898
)
9999
Assertions.assertTrue(
100100
res2.select("information").collect().head.getString(0).contains("Partition Values"))
101+
102+
val info2 = res2.select("information").collect().head.getString(0)
103+
Assertions.assertTrue(info2.contains("Partition Parameters"))
104+
Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_RECORD_COUNT))
105+
Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES))
106+
Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_COUNT))
107+
Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME))
108+
Assertions.assertTrue(info2.contains("Partition Statistics"))
109+
Assertions.assertTrue(info2.contains("recordCount=1"))
110+
Assertions.assertTrue(info2.contains("1 rows"))
111+
112+
val res3 =
113+
spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')")
114+
val info3 = res3.select("information").collect().head.getString(0)
115+
Assertions.assertTrue(info3.contains("recordCount=2"))
116+
Assertions.assertTrue(info3.contains("2 rows"))
101117
}
102118
}
103119
}

paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,18 @@
1818

1919
package org.apache.paimon.spark.catalyst.analysis
2020

21+
import org.apache.paimon.spark.commands.PaimonShowTablePartitionCommand
22+
2123
import org.apache.spark.sql.SparkSession
22-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
25+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTablePartition}
2326
import org.apache.spark.sql.catalyst.rules.Rule
2427

2528
/**
 * Spark 4-specific analyzer resolution rules for Paimon.
 *
 * Rewrites Spark's `ShowTablePartition` logical plan (produced by
 * SHOW TABLE EXTENDED ... PARTITION) into Paimon's own
 * [[PaimonShowTablePartitionCommand]], resolving the raw partition spec against the
 * table's catalog and identifier first.
 */
case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    // Only match once the table reference has been resolved by the analyzer.
    case s @ ShowTablePartition(rt: ResolvedTable, _, _) =>
      // Resolve the user-supplied partition spec into concrete partition values
      // for this table before handing off to the Paimon command.
      val resolvedSpec =
        PaimonResolvePartitionSpec.resolve(rt.catalog, rt.identifier, s.partitionSpec)
      PaimonShowTablePartitionCommand(s.output, rt.catalog, rt.identifier, resolvedSpec)
  }
}

0 commit comments

Comments (0)