Skip to content

Commit b7ca31f

Browse files
authored
[flink] Support write blob data using blob descriptor (#6367)
1 parent 81ceea9 commit b7ca31f

6 files changed

Lines changed: 110 additions & 10 deletions

File tree

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@
5151
<td>Whether to create underlying storage when reading and writing the table.</td>
5252
</tr>
5353
<tr>
54-
<td><h5>blob.field</h5></td>
54+
<td><h5>blob-as-descriptor</h5></td>
55+
<td style="word-wrap: break-word;">false</td>
56+
<td>Boolean</td>
57+
<td>Write blob field using blob descriptor rather than blob bytes.</td>
58+
</tr>
59+
<tr>
60+
<td><h5>blob-field</h5></td>
5561
<td style="word-wrap: break-word;">(none)</td>
5662
<td>String</td>
5763
<td>Specify the blob field.</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1991,11 +1991,18 @@ public InlineElement getDescription() {
19911991
.withDescription("Format table file path only contain partition value.");
19921992

19931993
public static final ConfigOption<String> BLOB_FIELD =
1994-
key("blob.field")
1994+
key("blob-field")
19951995
.stringType()
19961996
.noDefaultValue()
19971997
.withDescription("Specify the blob field.");
19981998

1999+
public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
2000+
key("blob-as-descriptor")
2001+
.booleanType()
2002+
.defaultValue(false)
2003+
.withDescription(
2004+
"Write blob field using blob descriptor rather than blob bytes.");
2005+
19992006
private final Options options;
20002007

20012008
public CoreOptions(Map<String, String> options) {
@@ -3071,6 +3078,10 @@ public boolean formatTablePartitionOnlyValueInPath() {
30713078
return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
30723079
}
30733080

3081+
public boolean blobAsDescriptor() {
3082+
return options.get(BLOB_AS_DESCRIPTOR);
3083+
}
3084+
30743085
/** Specifies the merge engine for table with primary key. */
30753086
public enum MergeEngine implements DescribedEnum {
30763087
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.catalog.CatalogContext;
2122
import org.apache.paimon.data.BinaryString;
2223
import org.apache.paimon.data.Blob;
2324
import org.apache.paimon.data.BlobData;
25+
import org.apache.paimon.data.BlobDescriptor;
2426
import org.apache.paimon.data.Decimal;
2527
import org.apache.paimon.data.InternalArray;
2628
import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@
2931
import org.apache.paimon.data.variant.GenericVariant;
3032
import org.apache.paimon.data.variant.Variant;
3133
import org.apache.paimon.types.RowKind;
34+
import org.apache.paimon.utils.UriReader;
35+
import org.apache.paimon.utils.UriReaderFactory;
3236

3337
import org.apache.flink.table.data.DecimalData;
3438
import org.apache.flink.table.data.GenericRowData;
@@ -42,9 +46,20 @@
4246
public class FlinkRowWrapper implements InternalRow {
4347

4448
private final org.apache.flink.table.data.RowData row;
49+
private final boolean blobAsDescriptor;
50+
private final UriReaderFactory uriReaderFactory;
4551

4652
public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
53+
this(row, false, null);
54+
}
55+
56+
public FlinkRowWrapper(
57+
org.apache.flink.table.data.RowData row,
58+
boolean blobAsDescriptor,
59+
CatalogContext catalogContext) {
4760
this.row = row;
61+
this.blobAsDescriptor = blobAsDescriptor;
62+
this.uriReaderFactory = new UriReaderFactory(catalogContext);
4863
}
4964

5065
@Override
@@ -131,7 +146,13 @@ public Variant getVariant(int pos) {
131146

132147
@Override
133148
public Blob getBlob(int pos) {
134-
return new BlobData(row.getBinary(pos));
149+
if (blobAsDescriptor) {
150+
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos));
151+
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
152+
return Blob.fromDescriptor(uriReader, blobDescriptor);
153+
} else {
154+
return new BlobData(row.getBinary(pos));
155+
}
135156
}
136157

137158
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,10 +330,14 @@ private boolean buildForPostponeBucketCompaction(
330330
partitionSpec,
331331
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
332332

333+
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
333334
DataStream<InternalRow> partitioned =
334335
FlinkStreamPartitioner.partition(
335336
FlinkSinkBuilder.mapToInternalRow(
336-
sourcePair.getLeft(), realTable.rowType()),
337+
sourcePair.getLeft(),
338+
realTable.rowType(),
339+
blobAsDescriptor,
340+
table.catalogEnvironment().catalogContext()),
337341
new RowDataChannelComputer(realTable.schema(), false),
338342
null);
339343
FixedBucketSink sink = new FixedBucketSink(realTable, null, null);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
2323
import org.apache.paimon.annotation.Public;
24+
import org.apache.paimon.catalog.CatalogContext;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.flink.FlinkConnectorOptions;
2627
import org.apache.paimon.flink.FlinkRowWrapper;
@@ -204,7 +205,13 @@ public FlinkSinkBuilder clusteringIfPossible(
204205
public DataStreamSink<?> build() {
205206
setParallelismIfAdaptiveConflict();
206207
input = trySortInput(input);
207-
DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
208+
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
209+
DataStream<InternalRow> input =
210+
mapToInternalRow(
211+
this.input,
212+
table.rowType(),
213+
blobAsDescriptor,
214+
table.catalogEnvironment().catalogContext());
208215
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
209216
SingleOutputStreamOperator<InternalRow> newInput =
210217
input.forward()
@@ -234,9 +241,16 @@ public DataStreamSink<?> build() {
234241
}
235242

236243
public static DataStream<InternalRow> mapToInternalRow(
237-
DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
244+
DataStream<RowData> input,
245+
org.apache.paimon.types.RowType rowType,
246+
boolean blobAsDescriptor,
247+
CatalogContext catalogContext) {
238248
SingleOutputStreamOperator<InternalRow> result =
239-
input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
249+
input.map(
250+
(MapFunction<RowData, InternalRow>)
251+
r ->
252+
new FlinkRowWrapper(
253+
r, blobAsDescriptor, catalogContext))
240254
.returns(
241255
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
242256
rowType));

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,33 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.data.BlobDescriptor;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.local.LocalFileIO;
24+
2125
import org.apache.flink.types.Row;
2226
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.io.TempDir;
2328

24-
import java.util.Collections;
29+
import java.io.OutputStream;
30+
import java.nio.file.Path;
31+
import java.util.Arrays;
2532
import java.util.List;
33+
import java.util.Random;
2634

2735
import static org.assertj.core.api.Assertions.assertThat;
2836

2937
/** Test write and read table with blob type. */
3038
public class BlobTableITCase extends CatalogITCaseBase {
3139

40+
private static final Random RANDOM = new Random();
41+
@TempDir public Path warehouse;
42+
3243
@Override
3344
protected List<String> ddl() {
34-
return Collections.singletonList(
35-
"CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')");
45+
return Arrays.asList(
46+
"CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')",
47+
"CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')");
3648
}
3749

3850
@Test
@@ -44,4 +56,36 @@ public void testBasic() {
4456
Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111}));
4557
assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2);
4658
}
59+
60+
@Test
61+
public void testWriteBlobAsDescriptor() throws Exception {
62+
byte[] blobData = new byte[1024 * 1024];
63+
RANDOM.nextBytes(blobData);
64+
FileIO fileIO = new LocalFileIO();
65+
String uri = "file://" + warehouse.toString() + "/external_blob";
66+
try (OutputStream outputStream =
67+
fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) {
68+
outputStream.write(blobData);
69+
}
70+
71+
BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length);
72+
batchSql(
73+
"INSERT INTO blob_table_descriptor VALUES (1, 'paimon', X'"
74+
+ bytesToHex(blobDescriptor.serialize())
75+
+ "')");
76+
assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
77+
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
78+
}
79+
80+
private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
81+
82+
public static String bytesToHex(byte[] bytes) {
83+
char[] hexChars = new char[bytes.length * 2];
84+
for (int j = 0; j < bytes.length; j++) {
85+
int v = bytes[j] & 0xFF;
86+
hexChars[j * 2] = HEX_ARRAY[v >>> 4];
87+
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
88+
}
89+
return new String(hexChars);
90+
}
4791
}

0 commit comments

Comments
 (0)