Skip to content

Commit 8c8142c

Browse files
committed
[flink] Support creating table with blob in flink sql.
1 parent e1dcc85 commit 8c8142c

7 files changed

Lines changed: 81 additions & 44 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,17 @@
1818

1919
package org.apache.paimon.io;
2020

21-
import org.apache.paimon.annotation.VisibleForTesting;
2221
import org.apache.paimon.data.InternalRow;
2322
import org.apache.paimon.fileindex.FileIndexOptions;
2423
import org.apache.paimon.format.FileFormat;
25-
import org.apache.paimon.format.SimpleStatsCollector;
26-
import org.apache.paimon.format.avro.AvroFileFormat;
2724
import org.apache.paimon.fs.FileIO;
2825
import org.apache.paimon.manifest.FileSource;
29-
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
3026
import org.apache.paimon.statistics.SimpleColStatsCollector;
3127
import org.apache.paimon.types.RowType;
3228
import org.apache.paimon.utils.LongCounter;
3329

3430
import javax.annotation.Nullable;
3531

36-
import java.util.Arrays;
3732
import java.util.List;
3833

3934
/** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
@@ -58,7 +53,7 @@ public RowDataRollingFileWriter(
5853
() ->
5954
new RowDataFileWriter(
6055
fileIO,
61-
createFileWriterContext(
56+
RollingFileWriter.createFileWriterContext(
6257
fileFormat, writeSchema, statsCollectors, fileCompression),
6358
pathFactory.newPath(),
6459
writeSchema,
@@ -72,34 +67,4 @@ public RowDataRollingFileWriter(
7267
writeCols),
7368
targetFileSize);
7469
}
75-
76-
@VisibleForTesting
77-
static FileWriterContext createFileWriterContext(
78-
FileFormat fileFormat,
79-
RowType rowType,
80-
SimpleColStatsCollector.Factory[] statsCollectors,
81-
String fileCompression) {
82-
return new FileWriterContext(
83-
fileFormat.createWriterFactory(rowType),
84-
createStatsProducer(fileFormat, rowType, statsCollectors),
85-
fileCompression);
86-
}
87-
88-
private static SimpleStatsProducer createStatsProducer(
89-
FileFormat fileFormat,
90-
RowType rowType,
91-
SimpleColStatsCollector.Factory[] statsCollectors) {
92-
boolean isDisabled =
93-
Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
94-
.allMatch(p -> p instanceof NoneSimpleColStatsCollector);
95-
if (isDisabled) {
96-
return SimpleStatsProducer.disabledProducer();
97-
}
98-
if (fileFormat instanceof AvroFileFormat) {
99-
SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
100-
return SimpleStatsProducer.fromCollector(collector);
101-
}
102-
return SimpleStatsProducer.fromExtractor(
103-
fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
104-
}
10570
}

paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void initialize(String identifier, boolean statsDenseStore) {
7171
() ->
7272
new RowDataFileWriter(
7373
LocalFileIO.create(),
74-
RowDataRollingFileWriter.createFileWriterContext(
74+
RollingFileWriter.createFileWriterContext(
7575
fileFormat,
7676
SCHEMA,
7777
SimpleColStatsCollector.createFullStatsFactories(

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
152152
import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
153153
import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
154+
import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
154155
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
155156
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
156157
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -1050,6 +1051,16 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
10501051
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
10511052

10521053
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
1054+
String blobName = options.get(FlinkConnectorOptions.BLOB_FIELD.key());
1055+
if (blobName != null) {
1056+
checkArgument(
1057+
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
1058+
"When setting '"
1059+
+ FlinkConnectorOptions.BLOB_FIELD.key()
1060+
+ "', you must also set '"
1061+
+ CoreOptions.DATA_EVOLUTION_ENABLED.key()
1062+
+ "'");
1063+
}
10531064
// Serialize virtual columns and watermark to the options
10541065
// This is what Flink SQL needs, the storage itself does not need them
10551066
options.putAll(columnOptions(schema));
@@ -1069,7 +1080,9 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
10691080
field ->
10701081
schemaBuilder.column(
10711082
field.getName(),
1072-
toDataType(field.getType()),
1083+
field.getName().equals(blobName)
1084+
? toBlobType(field.getType())
1085+
: toDataType(field.getType()),
10731086
columnComments.get(field.getName())));
10741087

10751088
return schemaBuilder.build();

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,12 @@ public class FlinkConnectorOptions {
539539
+ "multiple option group separated by commas. "
540540
+ "Now only 'external-paths' is supported.");
541541

542+
public static final ConfigOption<String> BLOB_FIELD =
543+
key("blob.field")
544+
.stringType()
545+
.noDefaultValue()
546+
.withDescription("Specify the blob field.");
547+
542548
public static List<ConfigOption<?>> getOptions() {
543549
final Field[] fields = FlinkConnectorOptions.class.getFields();
544550
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.types.BlobType;
2122
import org.apache.paimon.types.DataType;
2223
import org.apache.paimon.types.RowType;
2324

25+
import org.apache.flink.table.types.logical.BinaryType;
2426
import org.apache.flink.table.types.logical.LogicalType;
27+
import org.apache.flink.table.types.logical.VarBinaryType;
2528

2629
import java.util.concurrent.atomic.AtomicInteger;
2730

31+
import static org.apache.paimon.utils.Preconditions.checkArgument;
32+
2833
/** Conversion between {@link LogicalType} and {@link DataType}. */
2934
public class LogicalTypeConversion {
3035

@@ -37,6 +42,13 @@ public static LogicalType toLogicalType(DataType dataType) {
3742
return dataType.accept(DataTypeToLogicalType.INSTANCE);
3843
}
3944

45+
public static BlobType toBlobType(LogicalType logicalType) {
46+
checkArgument(
47+
logicalType instanceof BinaryType || logicalType instanceof VarBinaryType,
48+
"Expected BinaryType or VarBinaryType, but got: " + logicalType);
49+
return new BlobType();
50+
}
51+
4052
public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
4153
return (RowType) toDataType(logicalType, new AtomicInteger(-1));
4254
}

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import org.apache.paimon.spark.commands.DataEvolutionPaimonWriter.{deserializeCo
2525
import org.apache.paimon.spark.write.WriteHelper
2626
import org.apache.paimon.table.FileStoreTable
2727
import org.apache.paimon.table.sink._
28+
import org.apache.paimon.types.DataType
29+
import org.apache.paimon.types.DataTypeRoot.BLOB
2830

2931
import org.apache.spark.sql._
3032

@@ -61,6 +63,11 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH
6163
assert(data.columns.length == columnNames.size + 2)
6264
val writeType = table.rowType().project(columnNames.asJava)
6365

66+
if (writeType.getFieldTypes.stream.anyMatch((t: DataType) => t.is(BLOB))) {
67+
throw new UnsupportedOperationException(
68+
"DataEvolution does not support writing partial columns mixed with BLOB type.")
69+
}
70+
6471
val written =
6572
data.mapPartitions {
6673
iter =>

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,43 @@ case class MergeIntoPaimonDataEvolutionTable(
7575

7676
override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
7777
private val firstRowIds: Seq[Long] = table
78-
.newSnapshotReader()
79-
.withManifestEntryFilter(entry => entry.file().firstRowId() != null)
80-
.read()
81-
.splits()
78+
.store()
79+
.newScan()
80+
.withManifestEntryFilter(
81+
entry =>
82+
entry.file().firstRowId() != null && (!entry
83+
.file()
84+
.isBlob))
85+
.plan()
86+
.files()
8287
.asScala
83-
.map(_.asInstanceOf[DataSplit])
84-
.flatMap(split => split.dataFiles().asScala.map(s => s.firstRowId().asInstanceOf[Long]))
88+
.map(file => file.file().firstRowId().asInstanceOf[Long])
8589
.distinct
8690
.sorted
8791
.toSeq
8892

93+
private val firstRowIdToBlobFirstRowIds = {
94+
val map = new mutable.HashMap[Long, List[Long]]()
95+
val files = table
96+
.store()
97+
.newScan()
98+
.withManifestEntryFilter(entry => entry.file().isBlob)
99+
.plan()
100+
.files()
101+
.asScala
102+
.sortBy(f => f.file().firstRowId())
103+
104+
for (file <- files) {
105+
val firstRowId = file.file().firstRowId().asInstanceOf[Long]
106+
val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
107+
map.update(
108+
firstIdInNormalFile,
109+
map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
110+
)
111+
}
112+
map
113+
}
114+
89115
lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
90116
lazy val sourceRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(sourceTable)
91117

@@ -288,6 +314,14 @@ case class MergeIntoPaimonDataEvolutionTable(
288314
.select(firstRowIdUdf(col(identifier)))
289315
.distinct()
290316
.as[Long]
317+
.flatMap(
318+
f => {
319+
if (firstRowIdToBlobFirstRowIds.contains(f)) {
320+
firstRowIdToBlobFirstRowIds(f)
321+
} else {
322+
Seq(f)
323+
}
324+
})
291325
.collect()
292326
}
293327

0 commit comments

Comments
 (0)