
Commit d0fc63c

[spark] Support spark blob basic read write (#6368)
1 parent b7ca31f commit d0fc63c

9 files changed

Lines changed: 164 additions & 5 deletions
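
This change lets a table declare one Spark BINARY column as a Paimon blob field via the 'blob-field' table property: on the Paimon side the column is stored as BlobType, while it still surfaces to Spark as BinaryType. A minimal usage sketch in Scala, derived from the BlobTestBase test added in this commit (the table name, properties, and byte literal simply mirror that test):

// Usage sketch based on the BlobTestBase test below; table name "t" and the
// inserted bytes are illustrative only.
spark.sql(
  """CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES (
    |  'row-tracking.enabled' = 'true',
    |  'data-evolution.enabled' = 'true',
    |  'blob-field' = 'picture')""".stripMargin)

spark.sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F')")

// The blob field maps back to Spark's BinaryType, so it reads as Array[Byte].
spark.sql("SELECT id, data, picture FROM t").show()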


paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java

Lines changed: 13 additions & 1 deletion
@@ -39,6 +39,7 @@
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.ExceptionUtils;
@@ -92,6 +93,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -109,6 +111,7 @@
import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog
@@ -456,6 +459,7 @@ private static SchemaChange.Move getMove(
    private Schema toInitialSchema(
            StructType schema, Transform[] partitions, Map<String, String> properties) {
        Map<String, String> normalizedProperties = new HashMap<>(properties);
+        String blobFieldName = properties.get(CoreOptions.BLOB_FIELD.key());
        String provider = properties.get(TableCatalog.PROP_PROVIDER);
        if (!usePaimon(provider)) {
            if (isFormatTable(provider)) {
@@ -488,7 +492,15 @@ private Schema toInitialSchema(

        for (StructField field : schema.fields()) {
            String name = field.name();
-            DataType type = toPaimonType(field.dataType()).copy(field.nullable());
+            DataType type;
+            if (Objects.equals(blobFieldName, name)) {
+                checkArgument(
+                        field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
+                        "The type of blob field must be binary");
+                type = new BlobType();
+            } else {
+                type = toPaimonType(field.dataType()).copy(field.nullable());
+            }
            String comment = field.getComment().getOrElse(() -> null);
            if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
                String defaultValue =

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java

Lines changed: 6 additions & 0 deletions
@@ -23,6 +23,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
@@ -161,6 +162,11 @@ public DataType visit(BinaryType binaryType) {
        return DataTypes.BinaryType;
    }

+    @Override
+    public DataType visit(BlobType blobType) {
+        return DataTypes.BinaryType;
+    }
+
    @Override
    public DataType visit(VarBinaryType varBinaryType) {
        return DataTypes.BinaryType;

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala

Lines changed: 20 additions & 2 deletions
@@ -18,19 +18,37 @@

package org.apache.paimon.spark.data

-import org.apache.paimon.types.RowType
+import org.apache.paimon.types.{DataTypeRoot, RowType}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.paimon.shims.SparkShimLoader

+import java.util.OptionalInt
+
abstract class SparkInternalRow extends InternalRow {
  def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
}

object SparkInternalRow {

  def create(rowType: RowType): SparkInternalRow = {
-    SparkShimLoader.shim.createSparkInternalRow(rowType)
+    val fieldIndex = blobFieldIndex(rowType)
+    if (fieldIndex.isPresent) {
+      SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, fieldIndex.getAsInt)
+    } else {
+      SparkShimLoader.shim.createSparkInternalRow(rowType)
+    }
+  }
+
+  private def blobFieldIndex(rowType: RowType): OptionalInt = {
+    var i: Int = 0
+    while (i < rowType.getFieldCount) {
+      if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) {
+        return OptionalInt.of(i)
+      }
+      i += 1
+    }
+    OptionalInt.empty()
  }

}

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala

Lines changed: 2 additions & 0 deletions
@@ -51,6 +51,8 @@ trait SparkShim {

  def createSparkInternalRow(rowType: RowType): SparkInternalRow

+  def createSparkInternalRowWithBlob(rowType: RowType, blobFieldIndex: Int): SparkInternalRow
+
  def createSparkArrayData(elementType: DataType): SparkArrayData

  def createTable(
New file

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/* Apache Software Foundation license header (Apache License, Version 2.0). */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class BlobTestBase extends PaimonSparkTestBase {
+  test("Blob: test basic") {
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')")
+      sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F')")
+
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 1))
+      )
+    }
+  }
+}
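
Because BlobType maps to Spark's BinaryType (see the SparkTypeUtils change above), the blob column written by the test can also be read back through the ordinary DataFrame API. A small hedged sketch, reusing the table "t" from the test:

// Read-back sketch (not part of this commit): the blob column comes back as a
// plain Array[Byte] column.
val pictures: Array[Array[Byte]] = spark
  .table("t")
  .select("picture")
  .collect()
  .map(_.getAs[Array[Byte]]("picture"))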
New file

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/* Apache Software Foundation license header (Apache License, Version 2.0). */
+
+package org.apache.paimon.spark.data
+
+import org.apache.paimon.spark.AbstractSparkInternalRow
+import org.apache.paimon.types.RowType
+
+class Spark3InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+  extends Spark3InternalRow(rowType) {
+
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    if (ordinal == blobFieldIndex) {
+      row.getBlob(ordinal).toData
+    } else {
+      super.getBinary(ordinal)
+    }
+  }
+}

paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala

Lines changed: 7 additions & 1 deletion
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}

import org.apache.spark.sql.SparkSession
@@ -54,6 +54,12 @@ class Spark3Shim extends SparkShim {
    new Spark3InternalRow(rowType)
  }

+  override def createSparkInternalRowWithBlob(
+      rowType: RowType,
+      blobFieldIndex: Int): SparkInternalRow = {
+    new Spark3InternalRowWithBlob(rowType, blobFieldIndex)
+  }
+
  override def createSparkArrayData(elementType: DataType): SparkArrayData = {
    new Spark3ArrayData(elementType)
  }
New file

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/* Apache Software Foundation license header (Apache License, Version 2.0). */
+
+package org.apache.paimon.spark.data
+
+import org.apache.paimon.spark.AbstractSparkInternalRow
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.unsafe.types.VariantVal
+
+class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+  extends Spark4InternalRow(rowType) {
+
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    if (ordinal == blobFieldIndex) {
+      row.getBlob(ordinal).toData
+    } else {
+      super.getBinary(ordinal)
+    }
+  }
+}

paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala

Lines changed: 7 additions & 1 deletion
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.{GenericVariant, Variant}
import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}

import org.apache.spark.sql.SparkSession
@@ -55,6 +55,12 @@ class Spark4Shim extends SparkShim {
    new Spark4InternalRow(rowType)
  }

+  override def createSparkInternalRowWithBlob(
+      rowType: RowType,
+      blobFieldIndex: Int): SparkInternalRow = {
+    new Spark4InternalRowWithBlob(rowType, blobFieldIndex)
+  }
+
  override def createSparkArrayData(elementType: DataType): SparkArrayData = {
    new Spark4ArrayData(elementType)
  }
