Commit fef6888

[spark] Introduce a base class for SparkTable to support multi-version APIs (#6421)
1 parent 9bd9c41 commit fef6888

3 files changed: 173 additions & 150 deletions
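Background from the commit message: the shared DataSource V2 logic moves out of the concrete SparkTable into an abstract PaimonSparkTableBase, so that modules targeting different Spark versions can each declare a thin SparkTable and layer on APIs that only exist in their Spark release. A hedged sketch of that layering follows; every name below (VersionOnlyCapability, TableBase, DefaultTable, NewerSparkTable) is illustrative and not part of this commit.

// Purely illustrative sketch of the multi-version layering; none of these
// names exist in Paimon or Spark.

// Stand-in for an interface that only a newer Spark version provides.
trait VersionOnlyCapability {
  def versionSpecificFeature(): String
}

// Shared behaviour lives in one abstract base class
// (PaimonSparkTableBase plays this role in the commit).
abstract class TableBase(val name: String) {
  def describe(): String = s"table $name"
}

// The common module keeps a thin default implementation ...
case class DefaultTable(override val name: String) extends TableBase(name)

// ... while a version-specific module adds the newer interface on top of the
// same base class, without copying any of the shared logic.
case class NewerSparkTable(override val name: String)
  extends TableBase(name)
  with VersionOnlyCapability {
  override def versionSpecificFeature(): String =
    s"${describe()} with an API that only exists in a newer Spark"
}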

File tree

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ import java.util.{Map => JMap, Objects, UUID}
 import scala.collection.JavaConverters._
 
 trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
-  self: SparkTable =>
+  self: PaimonSparkTableBase =>
 
   lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
 
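This is the only change to the mixin: its self-type now points at the new base class rather than the concrete SparkTable, so the trait can be mixed into any PaimonSparkTableBase subclass while still reaching members such as `table`. A minimal sketch of the self-type pattern, with purely hypothetical names (not Paimon code):

// Hypothetical names, shown only to illustrate the self-type pattern.
abstract class BaseTable(val name: String) {
  def columns: Seq[String]
}

trait PartitionSupport {
  // The self-type restricts this trait to subclasses of BaseTable and, in
  // exchange, gives it direct access to BaseTable members such as `name`.
  self: BaseTable =>

  def describePartitions(): String =
    s"$name partitioned by ${columns.headOption.getOrElse("<none>")}"
}

class DemoTable extends BaseTable("demo") with PartitionSupport {
  override def columns: Seq[String] = Seq("dt", "id")
}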
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala

Lines changed: 170 additions & 0 deletions (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.spark

import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
import org.apache.paimon.table.{Table, _}
import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
import org.apache.paimon.utils.StringUtils

import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

abstract class PaimonSparkTableBase(val table: Table)
  extends org.apache.spark.sql.connector.catalog.Table
  with SupportsRead
  with SupportsWrite
  with SupportsMetadataColumns
  with PaimonPartitionManagement {

  lazy val coreOptions = new CoreOptions(table.options())

  private lazy val useV2Write: Boolean = {
    val v2WriteConfigured = OptionUtils.useV2Write()
    v2WriteConfigured && supportsV2Write
  }

  private def supportsV2Write: Boolean = {
    coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
      table match {
        case storeTable: FileStoreTable =>
          storeTable.bucketMode() match {
            case HASH_FIXED => BucketFunction.supportsTable(storeTable)
            case BUCKET_UNAWARE | POSTPONE_MODE => true
            case _ => false
          }

        case _ => false
      }
    } && coreOptions.clusteringColumns().isEmpty
  }

  def getTable: Table = table

  override def name: String = table.fullName

  override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)

  override def partitioning: Array[Transform] = {
    table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
  }

  override def properties: JMap[String, String] = {
    table match {
      case dataTable: DataTable =>
        val properties = new JHashMap[String, String](dataTable.coreOptions.toMap)
        if (!table.primaryKeys.isEmpty) {
          properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
        }
        properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
        if (table.comment.isPresent) {
          properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
        }
        if (properties.containsKey(CoreOptions.PATH.key())) {
          properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
        }
        properties
      case _ => Collections.emptyMap()
    }
  }

  override def capabilities: JSet[TableCapability] = {
    val capabilities = JEnumSet.of(
      TableCapability.BATCH_READ,
      TableCapability.OVERWRITE_BY_FILTER,
      TableCapability.MICRO_BATCH_READ
    )

    if (useV2Write) {
      capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
      capabilities.add(TableCapability.BATCH_WRITE)
      capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
    } else {
      capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
      capabilities.add(TableCapability.V1_BATCH_WRITE)
    }

    capabilities
  }

  override def metadataColumns: Array[MetadataColumn] = {
    val partitionType = SparkTypeUtils.toSparkPartitionType(table)

    val _metadataColumns = ArrayBuffer[MetadataColumn]()

    if (coreOptions.rowTrackingEnabled()) {
      _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
      _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
    }

    _metadataColumns.appendAll(
      Seq(
        PaimonMetadataColumn.FILE_PATH,
        PaimonMetadataColumn.ROW_INDEX,
        PaimonMetadataColumn.PARTITION(partitionType),
        PaimonMetadataColumn.BUCKET
      ))

    _metadataColumns.toArray
  }

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    table match {
      case t: KnownSplitsTable =>
        new PaimonSplitScanBuilder(t)
      case _: InnerTable =>
        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable])
      case _ =>
        throw new RuntimeException("Only InnerTable can be scanned.")
    }
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    table match {
      case fileStoreTable: FileStoreTable =>
        val options = Options.fromMap(info.options)
        if (useV2Write) {
          new PaimonV2WriteBuilder(fileStoreTable, info.schema(), options)
        } else {
          new PaimonWriteBuilder(fileStoreTable, options)
        }
      case _ =>
        throw new RuntimeException("Only FileStoreTable can be written.")
    }
  }

  override def toString: String = {
    s"${table.getClass.getSimpleName}[${table.fullName()}]"
  }
}
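For orientation, here is a hedged sketch of how downstream code talks to any subclass of this base through the standard DataSource V2 entry points. The PaimonSparkTableBaseSketch object and its printouts are illustrative assumptions, and obtaining the wrapped Paimon table through a catalog is elided:

import org.apache.paimon.spark.PaimonSparkTableBase
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object PaimonSparkTableBaseSketch {
  // `dsTable` stands for any concrete subclass (e.g. the SparkTable shown below),
  // wrapping a Paimon table that was already loaded through a catalog.
  def describeTable(dsTable: PaimonSparkTableBase): Unit = {
    // Spark checks the advertised capabilities to choose between the
    // V1 fallback write path and the native DataSource V2 batch write.
    val usesV2BatchWrite = dsTable.capabilities.contains(TableCapability.BATCH_WRITE)
    println(s"${dsTable.name}: v2 batch write = $usesV2BatchWrite")

    // Reads always go through the ScanBuilder created by newScanBuilder.
    val scanBuilder = dsTable.newScanBuilder(CaseInsensitiveStringMap.empty())
    println(s"scan builder: ${scanBuilder.getClass.getSimpleName}")
  }
}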

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala

Lines changed: 2 additions & 149 deletions
@@ -18,154 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.catalog.functions.BucketFunction
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
-import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
-import org.apache.paimon.utils.StringUtils
-
-import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import org.apache.paimon.table.Table
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(table: Table)
-  extends org.apache.spark.sql.connector.catalog.Table
-  with SupportsRead
-  with SupportsWrite
-  with SupportsMetadataColumns
-  with PaimonPartitionManagement {
-
-  [... 119 further deleted lines: the former SparkTable body, identical to the
-       implementation now hosted in PaimonSparkTableBase above ...]
-}
+case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
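With the shared logic inherited from PaimonSparkTableBase, SparkTable collapses to a one-line case class, so its public surface is unchanged: it is still built with `SparkTable(table)` and can still be destructured. A small hedged sketch of that point (the `unwrap` helper is illustrative, not part of the commit):

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.table.{Table => PaimonTable}
import org.apache.spark.sql.connector.catalog.{Table => DsV2Table}

object SparkTableMatchSketch {
  // SparkTable remains a case class, so call sites that pattern-match on it
  // to recover the underlying Paimon table keep compiling after the refactor.
  def unwrap(dsTable: DsV2Table): Option[PaimonTable] = dsTable match {
    case SparkTable(paimonTable) => Some(paimonTable)
    case _ => None
  }
}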
