docs/layouts/shortcodes/generated/core_configuration.html (8 changes: 7 additions & 1 deletion)
@@ -51,7 +51,13 @@
         <td>Whether to create underlying storage when reading and writing the table.</td>
     </tr>
     <tr>
-        <td><h5>blob.field</h5></td>
+        <td><h5>blob-as-descriptor</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Write blob field using blob descriptor rather than blob bytes.</td>
+    </tr>
+    <tr>
+        <td><h5>blob-field</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>Specify the blob field.</td>
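For orientation, both options are ordinary table properties. A minimal sketch of setting them through the Flink SQL test helper used later in this PR (batchSql), reusing the test's own table and column names:

    // Hedged sketch: declare a blob column and opt into descriptor-based writes.
    // Table/column names are borrowed from BlobTableITCase below; no new API here.
    batchSql(
            "CREATE TABLE blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ("
                    + "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', "
                    + "'blob-field'='picture', 'blob-as-descriptor'='true')");

With blob-as-descriptor enabled, the BYTES column is expected to carry serialized BlobDescriptor bytes pointing at externally stored data rather than the blob content itself; the FlinkRowWrapper change below shows how those bytes are resolved on read.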
paimon-api/src/main/java/org/apache/paimon/CoreOptions.java (13 changes: 12 additions & 1 deletion)
@@ -1990,11 +1990,18 @@ public InlineElement getDescription() {
                     .withDescription("Format table file path only contain partition value.");
 
     public static final ConfigOption<String> BLOB_FIELD =
-            key("blob.field")
+            key("blob-field")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Specify the blob field.");
 
+    public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
+            key("blob-as-descriptor")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Write blob field using blob descriptor rather than blob bytes.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -3065,6 +3072,10 @@ public boolean formatTablePartitionOnlyValueInPath() {
         return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
     }
 
+    public boolean blobAsDescriptor() {
+        return options.get(BLOB_AS_DESCRIPTOR);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
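As a quick check, here is a minimal, self-contained sketch of how the new option is read back through CoreOptions; the map-based constructor and the blobAsDescriptor() accessor are both in the hunk above, while the concrete values ("picture", "true") are only illustrative:

    import org.apache.paimon.CoreOptions;

    import java.util.HashMap;
    import java.util.Map;

    public class BlobOptionsSketch {
        public static void main(String[] args) {
            // Illustrative values; 'picture' is just the column name used by the test below.
            Map<String, String> conf = new HashMap<>();
            conf.put("blob-field", "picture");
            conf.put("blob-as-descriptor", "true");

            CoreOptions coreOptions = new CoreOptions(conf);
            System.out.println(coreOptions.blobAsDescriptor()); // prints true
        }
    }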
.../org/apache/paimon/flink/FlinkRowWrapper.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@
 import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
@@ -42,9 +46,20 @@
 public class FlinkRowWrapper implements InternalRow {
 
     private final org.apache.flink.table.data.RowData row;
+    private final boolean blobAsDescriptor;
+    private final UriReaderFactory uriReaderFactory;
 
     public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
+        this(row, false, null);
+    }
+
+    public FlinkRowWrapper(
+            org.apache.flink.table.data.RowData row,
+            boolean blobAsDescriptor,
+            CatalogContext catalogContext) {
         this.row = row;
+        this.blobAsDescriptor = blobAsDescriptor;
+        this.uriReaderFactory = new UriReaderFactory(catalogContext);
     }
 
     @Override
@@ -131,7 +146,13 @@ public Variant getVariant(int pos) {
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(row.getBinary(pos));
+        if (blobAsDescriptor) {
+            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos));
+            UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
+            return Blob.fromDescriptor(uriReader, blobDescriptor);
+        } else {
+            return new BlobData(row.getBinary(pos));
+        }
     }
 
     @Override
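To make the descriptor path concrete, here is a hedged sketch of what getBlob now does when blobAsDescriptor is true. The descriptor values and the CatalogContext construction are assumptions for illustration only, not part of this PR:

    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.data.Blob;
    import org.apache.paimon.data.BlobDescriptor;
    import org.apache.paimon.flink.FlinkRowWrapper;
    import org.apache.paimon.options.Options;

    import org.apache.flink.table.data.GenericRowData;

    public class BlobDescriptorReadSketch {
        public static void main(String[] args) {
            // Assumed descriptor values: an external file of 1024 bytes starting at offset 0.
            BlobDescriptor descriptor = new BlobDescriptor("file:///tmp/external_blob", 0, 1024);

            // The blob column carries the serialized descriptor, not the blob content.
            GenericRowData flinkRow = GenericRowData.of(descriptor.serialize());

            // Assumption: an empty CatalogContext is enough to resolve a local file:// URI.
            CatalogContext context = CatalogContext.create(new Options());
            FlinkRowWrapper wrapper = new FlinkRowWrapper(flinkRow, true, context);

            // getBlob deserializes the descriptor and wires it to a UriReader for its URI,
            // so the content comes from the external file rather than from the row bytes.
            Blob blob = wrapper.getBlob(0);
            System.out.println(blob != null);
        }
    }

When blobAsDescriptor is false the behavior is unchanged: the column bytes are wrapped in a BlobData as before.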
@@ -330,10 +330,14 @@ private boolean buildForPostponeBucketCompaction(
                         partitionSpec,
                         options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
+        boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
         DataStream<InternalRow> partitioned =
                 FlinkStreamPartitioner.partition(
                         FlinkSinkBuilder.mapToInternalRow(
-                                sourcePair.getLeft(), realTable.rowType()),
+                                sourcePair.getLeft(),
+                                realTable.rowType(),
+                                blobAsDescriptor,
+                                table.catalogEnvironment().catalogContext()),
                         new RowDataChannelComputer(realTable.schema(), false),
                         null);
         FixedBucketSink sink = new FixedBucketSink(realTable, null, null);
.../FlinkSinkBuilder.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkRowWrapper;
@@ -204,7 +205,13 @@ public FlinkSinkBuilder clusteringIfPossible(
     public DataStreamSink<?> build() {
         setParallelismIfAdaptiveConflict();
         input = trySortInput(input);
-        DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
+        boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+        DataStream<InternalRow> input =
+                mapToInternalRow(
+                        this.input,
+                        table.rowType(),
+                        blobAsDescriptor,
+                        table.catalogEnvironment().catalogContext());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
             SingleOutputStreamOperator<InternalRow> newInput =
                     input.forward()
@@ -234,9 +241,16 @@ public DataStreamSink<?> build() {
     }
 
     public static DataStream<InternalRow> mapToInternalRow(
-            DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
+            DataStream<RowData> input,
+            org.apache.paimon.types.RowType rowType,
+            boolean blobAsDescriptor,
+            CatalogContext catalogContext) {
         SingleOutputStreamOperator<InternalRow> result =
-                input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
+                input.map(
+                                (MapFunction<RowData, InternalRow>)
+                                        r ->
+                                                new FlinkRowWrapper(
+                                                        r, blobAsDescriptor, catalogContext))
                         .returns(
                                 org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
                                         rowType));
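The static helper now needs the two extra arguments at every call site. A call-shape sketch, mirroring the postpone-bucket call site above, assuming a DataStream<RowData> named rowDataStream and a FileStoreTable named table are already in scope:

    // Sketch of the updated call; the boolean and CatalogContext simply come from the table,
    // presumably so the per-record FlinkRowWrapper can resolve blob-descriptor URIs on read.
    DataStream<InternalRow> rows =
            FlinkSinkBuilder.mapToInternalRow(
                    rowDataStream,
                    table.rowType(),
                    table.coreOptions().blobAsDescriptor(),
                    table.catalogEnvironment().catalogContext());

Callers that never enable blob-as-descriptor lose nothing: the single-argument FlinkRowWrapper constructor now delegates with (row, false, null), so the default path is unchanged.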
.../org/apache/paimon/flink/BlobTableITCase.java
@@ -18,21 +18,33 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
+
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.util.Collections;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test write and read table with blob type. */
 public class BlobTableITCase extends CatalogITCaseBase {
 
+    private static final Random RANDOM = new Random();
+    @TempDir public Path warehouse;
+
     @Override
     protected List<String> ddl() {
-        return Collections.singletonList(
-                "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')");
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')",
+                "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')");
     }
 
     @Test
@@ -44,4 +56,36 @@ public void testBasic() {
                         Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111}));
         assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2);
     }
+
+    @Test
+    public void testWriteBlobAsDescriptor() throws Exception {
+        byte[] blobData = new byte[1024 * 1024];
+        RANDOM.nextBytes(blobData);
+        FileIO fileIO = new LocalFileIO();
+        String uri = "file://" + warehouse.toString() + "/external_blob";
+        try (OutputStream outputStream =
+                fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) {
+            outputStream.write(blobData);
+        }
+
+        BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length);
+        batchSql(
+                "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', X'"
+                        + bytesToHex(blobDescriptor.serialize())
+                        + "')");
+        assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
+                .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
+    }
+
+    private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+    public static String bytesToHex(byte[] bytes) {
+        char[] hexChars = new char[bytes.length * 2];
+        for (int j = 0; j < bytes.length; j++) {
+            int v = bytes[j] & 0xFF;
+            hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+            hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+        }
+        return new String(hexChars);
+    }
 }
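A quick sanity check of the hex helper, using the same "Hello" bytes as the existing testBasic case; the expected output follows directly from the nibble arithmetic in bytesToHex:

    // 72=0x48, 101=0x65, 108=0x6C, 111=0x6F -> "48656C6C6F" (upper-case per HEX_ARRAY).
    System.out.println(bytesToHex(new byte[] {72, 101, 108, 108, 111})); // prints 48656C6C6F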