diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f2e90e5dcef1..ab96b1ba54fd 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -51,7 +51,13 @@ Whether to create underlying storage when reading and writing the table. -
            <td><h5>blob.field</h5></td>
+            <td><h5>blob-as-descriptor</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Write blob field using blob descriptor rather than blob bytes.</td>
+        </tr>
+        <tr>
+            <td><h5>blob-field</h5></td>
(none) String Specify the blob field. diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e2046da9262a..823804c9a17b 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1990,11 +1990,18 @@ public InlineElement getDescription() { .withDescription("Format table file path only contain partition value."); public static final ConfigOption BLOB_FIELD = - key("blob.field") + key("blob-field") .stringType() .noDefaultValue() .withDescription("Specify the blob field."); + public static final ConfigOption BLOB_AS_DESCRIPTOR = + key("blob-as-descriptor") + .booleanType() + .defaultValue(false) + .withDescription( + "Write blob field using blob descriptor rather than blob bytes."); + private final Options options; public CoreOptions(Map options) { @@ -3065,6 +3072,10 @@ public boolean formatTablePartitionOnlyValueInPath() { return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH); } + public boolean blobAsDescriptor() { + return options.get(BLOB_AS_DESCRIPTOR); + } + /** Specifies the merge engine for table with primary key. 
*/ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index e9dc77e371b5..1f632212d506 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -18,9 +18,11 @@ package org.apache.paimon.flink; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -29,6 +31,8 @@ import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.UriReader; +import org.apache.paimon.utils.UriReaderFactory; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -42,9 +46,20 @@ public class FlinkRowWrapper implements InternalRow { private final org.apache.flink.table.data.RowData row; + private final boolean blobAsDescriptor; + private final UriReaderFactory uriReaderFactory; public FlinkRowWrapper(org.apache.flink.table.data.RowData row) { + this(row, false, null); + } + + public FlinkRowWrapper( + org.apache.flink.table.data.RowData row, + boolean blobAsDescriptor, + CatalogContext catalogContext) { this.row = row; + this.blobAsDescriptor = blobAsDescriptor; + this.uriReaderFactory = new UriReaderFactory(catalogContext); } @Override @@ -131,7 +146,13 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - return 
new BlobData(row.getBinary(pos)); + if (blobAsDescriptor) { + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos)); + UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); + return Blob.fromDescriptor(uriReader, blobDescriptor); + } else { + return new BlobData(row.getBinary(pos)); + } } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 43a3e6897109..602d3a59a5b7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -330,10 +330,14 @@ private boolean buildForPostponeBucketCompaction( partitionSpec, options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); + boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream partitioned = FlinkStreamPartitioner.partition( FlinkSinkBuilder.mapToInternalRow( - sourcePair.getLeft(), realTable.rowType()), + sourcePair.getLeft(), + realTable.rowType(), + blobAsDescriptor, + table.catalogEnvironment().catalogContext()), new RowDataChannelComputer(realTable.schema(), false), null); FixedBucketSink sink = new FixedBucketSink(realTable, null, null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 4f40a5c5bcb7..af3d407d95de 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.PartitionSinkStrategy; import 
org.apache.paimon.annotation.Public; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.FlinkRowWrapper; @@ -204,7 +205,13 @@ public FlinkSinkBuilder clusteringIfPossible( public DataStreamSink build() { setParallelismIfAdaptiveConflict(); input = trySortInput(input); - DataStream input = mapToInternalRow(this.input, table.rowType()); + boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); + DataStream input = + mapToInternalRow( + this.input, + table.rowType(), + blobAsDescriptor, + table.catalogEnvironment().catalogContext()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { SingleOutputStreamOperator newInput = input.forward() @@ -234,9 +241,16 @@ public DataStreamSink build() { } public static DataStream mapToInternalRow( - DataStream input, org.apache.paimon.types.RowType rowType) { + DataStream input, + org.apache.paimon.types.RowType rowType, + boolean blobAsDescriptor, + CatalogContext catalogContext) { SingleOutputStreamOperator result = - input.map((MapFunction) FlinkRowWrapper::new) + input.map( + (MapFunction) + r -> + new FlinkRowWrapper( + r, blobAsDescriptor, catalogContext)) .returns( org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType( rowType)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index 412968468b89..4e4a178b5cac 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -18,21 +18,33 @@ package org.apache.paimon.flink; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.local.LocalFileIO; + import 
org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; -import java.util.Collections; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.Arrays; import java.util.List; +import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; /** Test write and read table with blob type. */ public class BlobTableITCase extends CatalogITCaseBase { + private static final Random RANDOM = new Random(); + @TempDir public Path warehouse; + @Override protected List ddl() { - return Collections.singletonList( - "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')"); + return Arrays.asList( + "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')", + "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')"); } @Test @@ -44,4 +56,36 @@ public void testBasic() { Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111})); assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2); } + + @Test + public void testWriteBlobAsDescriptor() throws Exception { + byte[] blobData = new byte[1024 * 1024]; + RANDOM.nextBytes(blobData); + FileIO fileIO = new LocalFileIO(); + String uri = "file://" + warehouse.toString() + "/external_blob"; + try (OutputStream outputStream = + fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) { + outputStream.write(blobData); + } + + BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length); + batchSql( + "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', X'" + + bytesToHex(blobDescriptor.serialize()) + + "')"); + 
assertThat(batchSql("SELECT * FROM blob_table_descriptor")) + .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData)); + } + + private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + + public static String bytesToHex(byte[] bytes) { + char[] hexChars = new char[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } }