diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f2e90e5dcef1..ab96b1ba54fd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -51,7 +51,13 @@
            <td>Whether to create underlying storage when reading and writing the table.</td>
        </tr>
        <tr>
-            <td><h5>blob.field</h5></td>
+            <td><h5>blob-as-descriptor</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to write the blob field as a blob descriptor rather than as blob bytes.</td>
+        </tr>
+        <tr>
+            <td><h5>blob-field</h5></td>
            <td style="word-wrap: break-word;">(none)</td>
            <td>String</td>
            <td>Specify the blob field.</td>
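
The two keys are meant to be used together: 'blob-field' names the BYTES column that carries blob payloads, and 'blob-as-descriptor' switches that column's write-side interpretation from raw bytes to a serialized blob descriptor. A minimal DDL sketch (table and column names are illustrative; it mirrors BlobTableITCase at the end of this change):

    String ddl =
        "CREATE TABLE blobs (id INT, data STRING, picture BYTES) WITH ("
            + "'row-tracking.enabled'='true', "
            + "'data-evolution.enabled'='true', "
            + "'blob-field'='picture', "
            + "'blob-as-descriptor'='true')";
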
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2046da9262a..823804c9a17b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1990,11 +1990,18 @@ public InlineElement getDescription() {
.withDescription("Format table file path only contain partition value.");
public static final ConfigOption<String> BLOB_FIELD =
- key("blob.field")
+ key("blob-field")
.stringType()
.noDefaultValue()
.withDescription("Specify the blob field.");
+ public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
+ key("blob-as-descriptor")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to write the blob field as a blob descriptor rather than as blob bytes.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -3065,6 +3072,10 @@ public boolean formatTablePartitionOnlyValueInPath() {
return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
}
+ public boolean blobAsDescriptor() {
+ return options.get(BLOB_AS_DESCRIPTOR);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index e9dc77e371b5..1f632212d506 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
@@ -42,9 +46,21 @@
public class FlinkRowWrapper implements InternalRow {
private final org.apache.flink.table.data.RowData row;
+ private final boolean blobAsDescriptor;
+ private final UriReaderFactory uriReaderFactory;
public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
+ this(row, false, null);
+ }
+
+ public FlinkRowWrapper(
+ org.apache.flink.table.data.RowData row,
+ boolean blobAsDescriptor,
+ CatalogContext catalogContext) {
this.row = row;
+ this.blobAsDescriptor = blobAsDescriptor;
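+ // catalogContext may be null (plain blob-bytes path); the factory is only consulted when blobAsDescriptor is true.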
+ this.uriReaderFactory = new UriReaderFactory(catalogContext);
}
@Override
@@ -131,7 +147,14 @@ public Variant getVariant(int pos) {
@Override
public Blob getBlob(int pos) {
- return new BlobData(row.getBinary(pos));
+ if (blobAsDescriptor) {
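+ // The binary value holds a serialized BlobDescriptor; resolve the blob through its URI instead of copying bytes.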
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos));
+ UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
+ return Blob.fromDescriptor(uriReader, blobDescriptor);
+ } else {
+ return new BlobData(row.getBinary(pos));
+ }
}
@Override
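
To make the descriptor path concrete, a round-trip sketch using only API that appears in this change (the URI, the length, and the catalogContext variable are illustrative):

    // Writer side: describe an externally stored blob and serialize the descriptor.
    BlobDescriptor descriptor = new BlobDescriptor("file:///tmp/external_blob", 0, 1024);
    byte[] serialized = descriptor.serialize(); // these bytes go into the BYTES column

    // Reader side: what getBlob(pos) does when 'blob-as-descriptor' is enabled.
    BlobDescriptor restored = BlobDescriptor.deserialize(serialized);
    UriReaderFactory factory = new UriReaderFactory(catalogContext);
    Blob blob = Blob.fromDescriptor(factory.create(restored.uri()), restored);
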
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 43a3e6897109..602d3a59a5b7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -330,10 +330,14 @@ private boolean buildForPostponeBucketCompaction(
partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+ boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream<InternalRow> partitioned =
FlinkStreamPartitioner.partition(
FlinkSinkBuilder.mapToInternalRow(
- sourcePair.getLeft(), realTable.rowType()),
+ sourcePair.getLeft(),
+ realTable.rowType(),
+ blobAsDescriptor,
+ table.catalogEnvironment().catalogContext()),
new RowDataChannelComputer(realTable.schema(), false),
null);
FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 4f40a5c5bcb7..af3d407d95de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
@@ -204,7 +205,13 @@ public FlinkSinkBuilder clusteringIfPossible(
public DataStreamSink<?> build() {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
- DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
+ boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ DataStream<InternalRow> input =
+ mapToInternalRow(
+ this.input,
+ table.rowType(),
+ blobAsDescriptor,
+ table.catalogEnvironment().catalogContext());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
SingleOutputStreamOperator<InternalRow> newInput =
input.forward()
@@ -234,9 +241,16 @@ public DataStreamSink<?> build() {
}
public static DataStream<InternalRow> mapToInternalRow(
- DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
+ DataStream<RowData> input,
+ org.apache.paimon.types.RowType rowType,
+ boolean blobAsDescriptor,
+ CatalogContext catalogContext) {
SingleOutputStreamOperator<InternalRow> result =
- input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
+ input.map(
+ (MapFunction<RowData, InternalRow>)
+ r ->
+ new FlinkRowWrapper(
+ r, blobAsDescriptor, catalogContext))
.returns(
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
rowType));
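
Call sites now pass the flag and the catalog context explicitly. Sketched for a hypothetical caller holding a FileStoreTable named table and a DataStream<RowData> named source:

    DataStream<InternalRow> rows =
        FlinkSinkBuilder.mapToInternalRow(
            source,
            table.rowType(),
            table.coreOptions().blobAsDescriptor(),
            table.catalogEnvironment().catalogContext());
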
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 412968468b89..4e4a178b5cac 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -18,21 +18,33 @@
package org.apache.paimon.flink;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
+
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
-import java.util.Collections;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import static org.assertj.core.api.Assertions.assertThat;
/** Test write and read table with blob type. */
public class BlobTableITCase extends CatalogITCaseBase {
+ private static final Random RANDOM = new Random();
+ @TempDir public Path warehouse;
+
@Override
protected List<String> ddl() {
- return Collections.singletonList(
- "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')");
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')",
+ "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')");
}
@Test
@@ -44,4 +56,38 @@ public void testBasic() {
Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111}));
assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2);
}
+
+ @Test
+ public void testWriteBlobAsDescriptor() throws Exception {
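+ // Stage 1 MiB of random bytes in an external file that the descriptor will point to.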
+ byte[] blobData = new byte[1024 * 1024];
+ RANDOM.nextBytes(blobData);
+ FileIO fileIO = new LocalFileIO();
+ String uri = "file://" + warehouse.toString() + "/external_blob";
+ try (OutputStream outputStream =
+ fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) {
+ outputStream.write(blobData);
+ }
+
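+ // Reference the staged file with a descriptor and insert its serialized form as a hex literal.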
+ BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length);
+ batchSql(
+ "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', X'"
+ + bytesToHex(blobDescriptor.serialize())
+ + "')");
+ assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
+ .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
+ }
+
+ private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+ public static String bytesToHex(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for (int j = 0; j < bytes.length; j++) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+ hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
}