diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java index c0f885c72325..e72cd2cd6b54 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java @@ -103,12 +103,21 @@ public static List fieldsInBlobFile(RowType rowType, Set desc rowType.getFields() .forEach( field -> { - DataTypeRoot type = field.type().getTypeRoot(); - if (type == DataTypeRoot.BLOB + if (isBlobFileField(field.type()) && !descriptorFields.contains(field.name())) { result.add(field); } }); return result; } + + public static boolean isBlobFileField(DataType type) { + if (type.getTypeRoot() == DataTypeRoot.BLOB) { + return true; + } + if (type.getTypeRoot() == DataTypeRoot.ARRAY) { + return ((ArrayType) type).getElementType().getTypeRoot() == DataTypeRoot.BLOB; + } + return false; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobArrayPlaceholder.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobArrayPlaceholder.java new file mode 100644 index 000000000000..2807a963b89e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobArrayPlaceholder.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.data.variant.Variant; + +import java.io.Serializable; + +/** + * Placeholder for an array blob field in data-evolution blob files. It should never be exposed to + * users. + */ +public final class BlobArrayPlaceholder implements InternalArray, Serializable { + + private static final long serialVersionUID = 1L; + + public static final BlobArrayPlaceholder INSTANCE = new BlobArrayPlaceholder(); + + private BlobArrayPlaceholder() {} + + private Object readResolve() { + return INSTANCE; + } + + private static UnsupportedOperationException unsupported() { + return new UnsupportedOperationException( + "Should never call this method for placeholder blob array."); + } + + @Override + public int size() { + throw unsupported(); + } + + @Override + public boolean isNullAt(int pos) { + throw unsupported(); + } + + @Override + public boolean getBoolean(int pos) { + throw unsupported(); + } + + @Override + public byte getByte(int pos) { + throw unsupported(); + } + + @Override + public short getShort(int pos) { + throw unsupported(); + } + + @Override + public int getInt(int pos) { + throw unsupported(); + } + + @Override + public long getLong(int pos) { + throw unsupported(); + } + + @Override + public float getFloat(int pos) { + throw unsupported(); + } + + @Override + public double getDouble(int pos) { + throw unsupported(); + } + + @Override + public BinaryString getString(int pos) { + throw unsupported(); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + throw unsupported(); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + throw unsupported(); + } + + @Override + public byte[] getBinary(int pos) { + throw unsupported(); + } + + @Override + public Variant getVariant(int pos) { + throw unsupported(); + } + + @Override + public Blob getBlob(int pos) { + throw unsupported(); + } + + @Override + public InternalArray getArray(int pos) { + throw unsupported(); + } + + @Override + public InternalVector getVector(int pos) { + throw unsupported(); + } + + @Override + public InternalMap getMap(int pos) { + throw unsupported(); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw unsupported(); + } + + @Override + public boolean[] toBooleanArray() { + throw unsupported(); + } + + @Override + public byte[] toByteArray() { + throw unsupported(); + } + + @Override + public short[] toShortArray() { + throw unsupported(); + } + + @Override + public int[] toIntArray() { + throw unsupported(); + } + + @Override + public long[] toLongArray() { + throw unsupported(); + } + + @Override + public float[] toFloatArray() { + throw unsupported(); + } + + @Override + public double[] toDoubleArray() { + throw unsupported(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java index 3bbb85f49963..e4aa7e335cd6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java @@ -153,6 +153,8 @@ static Class getDataClass(DataType type) { return InternalMap.class; case ROW: return InternalRow.class; + case BLOB: + return Blob.class; default: throw new IllegalArgumentException("Illegal type: " + type); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java index 3c1496b85183..1d4c102d0237 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java @@ -54,7 +54,7 @@ import static java.util.Comparator.comparingLong; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId; -import static org.apache.paimon.types.DataTypeRoot.BLOB; +import static org.apache.paimon.types.BlobType.isBlobFileField; import static org.apache.paimon.types.VectorType.isVectorStoreFile; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -100,7 +100,7 @@ public DataEvolutionCompactCoordinator( ? table.rowType().getFields().stream() .filter( field -> - field.type().is(BLOB) + isBlobFileField(field.type()) && !blobInlineFields.contains( field.name())) .map(DataField::id) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index 86f555748f4b..845d1d3125e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -19,11 +19,13 @@ package org.apache.paimon.operation; import org.apache.paimon.append.ForceSingleBatchReader; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobPlaceholder; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Range; @@ -55,7 +57,8 @@ public class BlobFallbackRecordReader implements RecordReader { private final List> groupReaders = new ArrayList<>(); - private final int blobIndex; + private final Object blobPlaceholder; + private final InternalRow.FieldGetter blobGetter; private boolean returned; BlobFallbackRecordReader( @@ -64,7 +67,9 @@ public class BlobFallbackRecordReader implements RecordReader { List rowRanges, RowType readRowType, int blobIndex) { - this.blobIndex = blobIndex; + this.blobPlaceholder = blobPlaceholder(readRowType, blobIndex); + this.blobGetter = + InternalRow.createFieldGetter(readRowType.getTypeAt(blobIndex), blobIndex); checkArgument(!files.isEmpty(), "Blob bunch should not be empty."); long firstRowId = Long.MAX_VALUE; @@ -188,7 +193,13 @@ public void releaseBatch() { } private boolean isPlaceHolder(InternalRow row) { - return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == BlobPlaceholder.INSTANCE; + return blobGetter.getFieldOrNull(row) == blobPlaceholder; + } + + private static Object blobPlaceholder(RowType rowType, int blobIndex) { + return rowType.getTypeAt(blobIndex).getTypeRoot() == DataTypeRoot.ARRAY + ? BlobArrayPlaceholder.INSTANCE + : BlobPlaceholder.INSTANCE; } @Override @@ -263,6 +274,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader rowRanges; private final RowType readRowType; private final int blobIndex; + private final Object blobPlaceholder; private final long lastRowId; private RecordReader currentReader; @@ -287,6 +299,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader t.is(BLOB))) { + if (rowType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) { return null; } Set descriptorFields = options.blobDescriptorField(); @@ -62,8 +60,7 @@ public static BlobFileContext create(RowType rowType, CoreOptions options) { String externalStoragePath = options.blobExternalStoragePath(); boolean requireBlobFile = false; for (DataField field : rowType.getFields()) { - DataTypeRoot type = field.type().getTypeRoot(); - if (type == DataTypeRoot.BLOB + if (BlobType.isBlobFileField(field.type()) && (!inlineFields.contains(field.name()) || externalStorageField.contains(field.name()))) { requireBlobFile = true; @@ -83,7 +80,7 @@ public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) { } public BlobFileContext withWriteType(RowType writeType) { - if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) { + if (writeType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) { return null; } return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 145fdc9ad4af..91cccfd3ede9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -46,7 +46,6 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FormatReaderMapping; @@ -76,6 +75,7 @@ import static java.util.Comparator.comparingLong; import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking; +import static org.apache.paimon.types.BlobType.isBlobFileField; import static org.apache.paimon.types.VectorType.isVectorStoreFile; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -419,7 +419,7 @@ private RecordReader sequentialReadFiles( private static int findBlobFieldIndex(RowType rowType) { for (int i = 0; i < rowType.getFieldCount(); i++) { - if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) { + if (isBlobFileField(rowType.getTypeAt(i))) { return i; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/ColumnDirectiveUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/ColumnDirectiveUtils.java index d523bbb37047..587ab9c25768 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/ColumnDirectiveUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/ColumnDirectiveUtils.java @@ -234,18 +234,41 @@ private static DataType convertType( return new VectorType(sourceType.isNullable(), directive.vectorDim(), elementType); } else { DataTypeRoot root = sourceType.getTypeRoot(); + if (root == DataTypeRoot.ARRAY) { + Preconditions.checkArgument( + CoreOptions.BLOB_FIELD.key().equals(directive.optionKey()), + "Column %s declared with '%s' must be of BYTES, BINARY or BLOB type, but was %s. " + + "ARRAY is only supported by '%s'.", + fieldName, + directive.optionKey(), + sourceType, + CoreOptions.BLOB_FIELD.key()); + DataType elementType = ((ArrayType) sourceType).getElementType(); + Preconditions.checkArgument( + isBlobSourceRoot(elementType.getTypeRoot()), + "Column %s declared with a BLOB directive must be of BYTES, BINARY, " + + "BLOB, ARRAY, ARRAY or ARRAY type, but was %s.", + fieldName, + sourceType); + return new ArrayType( + sourceType.isNullable(), new BlobType(elementType.isNullable())); + } Preconditions.checkArgument( - root == DataTypeRoot.VARBINARY - || root == DataTypeRoot.BINARY - || root == DataTypeRoot.BLOB, - "Column %s declared with a BLOB directive must be of BYTES, " - + "BINARY or BLOB type, but was %s.", + isBlobSourceRoot(root), + "Column %s declared with a BLOB directive must be of BYTES, BINARY, " + + "BLOB, ARRAY, ARRAY or ARRAY type, but was %s.", fieldName, sourceType); return new BlobType(sourceType.isNullable()); } } + private static boolean isBlobSourceRoot(DataTypeRoot root) { + return root == DataTypeRoot.VARBINARY + || root == DataTypeRoot.BINARY + || root == DataTypeRoot.BLOB; + } + /** * Append {@code fieldName} to the comma-separated option identified by {@code optionKey}. If * the canonical key is empty but a fallback key holds the value (e.g. legacy {@code @@ -295,19 +318,19 @@ public static void modifyFieldOptions( new ConfigOption[] {CoreOptions.VECTOR_FIELD}; /** - * Remove directive-managed options when a BLOB or VECTOR column is dropped. Only acts on BLOB - * or VECTOR type columns; other types are ignored. + * Remove directive-managed options when a BLOB or VECTOR column is dropped. Only acts on BLOB, + * {@code ARRAY} or VECTOR type columns; other types are ignored. */ public static void removeDroppedDirectiveOptions( - String fieldName, DataTypeRoot typeRoot, Map options) { - if (typeRoot == DataTypeRoot.BLOB) { + String fieldName, DataType type, Map options) { + if (BlobType.isBlobFileField(type)) { for (ConfigOption option : BLOB_OPTIONS) { removeFromCsvOption(option.key(), fieldName, options); for (FallbackKey fk : option.fallbackKeys()) { removeFromCsvOption(fk.getKey(), fieldName, options); } } - } else if (typeRoot == DataTypeRoot.VECTOR) { + } else if (type.getTypeRoot() == DataTypeRoot.VECTOR) { for (ConfigOption option : VECTOR_OPTIONS) { removeFromCsvOption(option.key(), fieldName, options); for (FallbackKey fk : option.fallbackKeys()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index bfe3b7de84d7..7f782c43de63 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -43,7 +43,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; -import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.MapType; import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; @@ -104,6 +103,7 @@ import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP; import static org.apache.paimon.schema.ColumnDirectiveUtils.applyAddColumnDirective; import static org.apache.paimon.schema.ColumnDirectiveUtils.applyDirectives; +import static org.apache.paimon.types.BlobType.isBlobFileField; import static org.apache.paimon.utils.DefaultValueUtils.validateDefaultValue; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -465,7 +465,7 @@ protected void updateLastColumn( .ifPresent( f -> ColumnDirectiveUtils.removeDroppedDirectiveOptions( - dropName, f.type().getTypeRoot(), newOptions)); + dropName, f.type(), newOptions)); } new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) { @Override @@ -964,7 +964,7 @@ private static void assertNotRenamingBlobColumn(List fields, String[] } String fieldName = fieldNames[0]; for (DataField field : fields) { - if (field.name().equals(fieldName) && field.type().is(DataTypeRoot.BLOB)) { + if (field.name().equals(fieldName) && isBlobFileField(field.type())) { throw new UnsupportedOperationException( String.format("Cannot rename BLOB column: [%s]", fieldName)); } @@ -981,8 +981,8 @@ private static void assertNotChangingBlobColumnType( if (!field.name().equals(fieldName)) { continue; } - boolean wasBlob = field.type().is(DataTypeRoot.BLOB); - boolean willBeBlob = newType.is(DataTypeRoot.BLOB); + boolean wasBlob = isBlobFileField(field.type()); + boolean willBeBlob = isBlobFileField(newType); if (wasBlob || willBeBlob) { throw new UnsupportedOperationException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 7733b6a080b5..f57d32c760c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -86,6 +86,7 @@ import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES; import static org.apache.paimon.types.BlobType.fieldNamesInBlobFile; +import static org.apache.paimon.types.BlobType.isBlobFileField; import static org.apache.paimon.types.DataTypeRoot.ARRAY; import static org.apache.paimon.types.DataTypeRoot.MAP; import static org.apache.paimon.types.DataTypeRoot.MULTISET; @@ -849,19 +850,19 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) List fields = schema.fields(); List blobNames = fields.stream() - .filter(field -> field.type().is(DataTypeRoot.BLOB)) + .filter(field -> isBlobFileField(field.type())) .map(DataField::name) .collect(Collectors.toList()); if (!blobNames.isEmpty()) { checkArgument( options.dataEvolutionEnabled(), - "Data evolution config must enabled for table with BLOB type column."); + "Data evolution config must enabled for table with BLOB or ARRAY type column."); checkArgument( fields.size() > blobNames.size(), - "Table with BLOB type column must have other normal columns."); + "Table with BLOB or ARRAY type column must have other normal columns."); checkArgument( blobNames.stream().noneMatch(schema.partitionKeys()::contains), - "The BLOB type column can not be part of partition keys."); + "The BLOB or ARRAY type column can not be part of partition keys."); } FileFormat vectorFileFormat = vectorFileFormat(options); @@ -885,7 +886,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) private static void validateBlobFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() - .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) + .filter(field -> isBlobFileField(field.type())) .map(DataField::name) .collect(Collectors.toCollection(HashSet::new)); Set configured = @@ -894,7 +895,7 @@ private static void validateBlobFields(RowType rowType, CoreOptions options) { for (String field : configured) { checkArgument( blobFieldNames.contains(field), - "Field '%s' in '%s' must be a BLOB field in table schema.", + "Field '%s' in '%s' must be a BLOB field or ARRAY field in table schema.", field, CoreOptions.BLOB_FIELD.key()); } @@ -910,9 +911,11 @@ private static Set validateBlobDescriptorFields(RowType rowType, CoreOpt for (String field : configured) { checkArgument( blobFieldNames.contains(field), - "Field '%s' in '%s' must be a BLOB field in table schema.", + "Field '%s' in '%s' must be a BLOB field in table schema. " + + "ARRAY is only supported by '%s'.", field, - CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); + CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), + CoreOptions.BLOB_FIELD.key()); } return configured; } @@ -928,9 +931,11 @@ private static Set validateBlobViewFields( for (String field : configured) { checkArgument( blobFieldNames.contains(field), - "Field '%s' in '%s' must be a BLOB field in table schema.", + "Field '%s' in '%s' must be a BLOB field in table schema. " + + "ARRAY is only supported by '%s'.", field, - CoreOptions.BLOB_VIEW_FIELD.key()); + CoreOptions.BLOB_VIEW_FIELD.key(), + CoreOptions.BLOB_FIELD.key()); checkArgument( !blobDescriptorFields.contains(field), "Field '%s' in '%s' can not also be in '%s'.", @@ -952,9 +957,11 @@ private static void validateBlobExternalStorageFields( for (String field : configured) { checkArgument( blobFieldNames.contains(field), - "Field '%s' in '%s' must be a BLOB field in table schema.", + "Field '%s' in '%s' must be a BLOB field in table schema. " + + "ARRAY is only supported by '%s'.", field, - CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()); + CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), + CoreOptions.BLOB_FIELD.key()); checkArgument( blobDescriptorFields.contains(field), "Field '%s' in '%s' must also be in '%s'.", diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 3f12ce039fcb..8dad4401af4a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -24,12 +24,15 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobData; import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.BlobPlaceholder; import org.apache.paimon.data.BlobView; import org.apache.paimon.data.BlobViewStruct; +import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.fs.FileIO; @@ -154,6 +157,98 @@ public void testBasic() throws Exception { assertThat(integer.get()).isEqualTo(1000); } + @Test + public void testArrayBlobField() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.ARRAY(DataTypes.BLOB())); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB"); + schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + catalog.createTable(identifier(), schemaBuilder.build(), true); + + byte[] blob0 = "blob-0".getBytes(); + byte[] blob1 = "blob-1".getBytes(); + byte[] blob2 = "blob-2".getBytes(); + writeDataDefault( + Arrays.asList( + GenericRow.of( + 0, + new GenericArray( + new Object[] { + new BlobData(blob0), null, new BlobData(blob1) + })), + GenericRow.of(1, null), + GenericRow.of(2, new GenericArray(new Object[] {new BlobData(blob2)})))); + + FileStoreTable table = getTableDefault(); + List filesMetas = + table.store().newScan().plan().files().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList()); + List fieldGroups = + DataEvolutionSplitRead.splitFieldBunches(filesMetas, file -> table.rowType()); + assertThat(fieldGroups.size()).isEqualTo(2); + assertThat(countFilesWithSuffix(table.fileIO(), table.location(), ".blob")).isEqualTo(1); + + assertArrayBlobRows(blob0, blob1, null, blob2); + + byte[] updatedBlob1 = "updated-blob-1".getBytes(); + RowType blobWriteType = table.schema().logicalRowType().project("f1"); + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = builder.newWrite().withWriteType(blobWriteType); + BatchTableCommit commit = builder.newCommit()) { + write.write(GenericRow.of(BlobArrayPlaceholder.INSTANCE)); + write.write(GenericRow.of(new GenericArray(new Object[] {new BlobData(updatedBlob1)}))); + write.write(GenericRow.of(BlobArrayPlaceholder.INSTANCE)); + + List commitMessages = write.prepareCommit(); + assignFirstRowId(commitMessages, 0L); + commit.commit(commitMessages); + } + + assertArrayBlobRows(blob0, blob1, updatedBlob1, blob2); + + DataEvolutionCompactCoordinator coordinator = + new DataEvolutionCompactCoordinator(table, true, false); + List tasks = coordinator.plan(); + assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue(); + + List compactMessages = new ArrayList<>(); + for (DataEvolutionCompactTask task : tasks) { + compactMessages.add(task.doCompact(table, commitUser)); + } + commitDefault(compactMessages); + + assertArrayBlobRows(blob0, blob1, updatedBlob1, blob2); + } + + private void assertArrayBlobRows(byte[] blob0, byte[] blob1, byte[] updatedBlob1, byte[] blob2) + throws Exception { + Map actual = new HashMap<>(); + readDefault(row -> actual.put(row.getInt(0), row.getArray(1))); + + assertThat(actual.size()).isEqualTo(3); + InternalArray first = actual.get(0); + assertThat(first.size()).isEqualTo(3); + assertThat(first.getBlob(0).toData()).isEqualTo(blob0); + assertThat(first.isNullAt(1)).isTrue(); + assertThat(first.getBlob(2).toData()).isEqualTo(blob1); + + if (updatedBlob1 == null) { + assertThat(actual.get(1)).isNull(); + } else { + InternalArray second = actual.get(1); + assertThat(second.size()).isEqualTo(1); + assertThat(second.getBlob(0).toData()).isEqualTo(updatedBlob1); + } + + InternalArray third = actual.get(2); + assertThat(third.size()).isEqualTo(1); + assertThat(third.getBlob(0).toData()).isEqualTo(blob2); + } + @Test public void testUpdateBlobColumn() throws Exception { createTableDefault(); @@ -594,6 +689,48 @@ public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws Exception "blob_view_without_blob_field", CoreOptions.BLOB_VIEW_FIELD.key()); } + @Test + public void testBlobInlineFieldRejectsArrayBlob() { + assertArrayBlobInlineOptionRejected(CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); + assertArrayBlobInlineOptionRejected(CoreOptions.BLOB_VIEW_FIELD.key()); + assertArrayBlobInlineOptionRejected(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()); + } + + private void assertArrayBlobInlineOptionRejected(String optionKey) { + assertThatThrownBy( + () -> { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.ARRAY(DataTypes.BLOB())); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(optionKey, "f1"); + if (CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(optionKey)) { + schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2"); + schemaBuilder.option( + CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key(), + "/tmp/target"); + } + catalog.createTable( + identifier( + "array_blob_inline_reject_" + + optionKey + .replace('-', '_') + .replace('.', '_')), + schemaBuilder.build(), + true); + }) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Field 'f1' in '" + + optionKey + + "' must be a BLOB field in table schema. ARRAY is only supported by '" + + CoreOptions.BLOB_FIELD.key() + + "'."); + } + private void assertCreateBlobInlineFieldWithoutBlobField(String tableName, String optionKey) throws Exception { Schema.Builder schemaBuilder = Schema.newBuilder(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java index dcea0a6ff130..48c6950f3ab5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java @@ -18,9 +18,12 @@ package org.apache.paimon.operation; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobData; import org.apache.paimon.data.BlobPlaceholder; +import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; @@ -59,6 +62,14 @@ public class BlobFallbackRecordReaderTest { new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), new DataField( 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT()))); + private static final RowType READ_ARRAY_ROW_TYPE = + new RowType( + Arrays.asList( + new DataField( + BLOB_INDEX, BLOB_FIELD, DataTypes.ARRAY(DataTypes.BLOB())), + new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()), + new DataField( + 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT()))); @Test public void testBlobSequenceGroupReaderWithRowRanges() throws Exception { @@ -168,6 +179,39 @@ public void testBlobFallbackRecordReaderWithRowRanges() throws Exception { .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 2L); } + @Test + public void testArrayBlobFallbackRecordReader() throws Exception { + DataFileMeta newFile = blobFile("new-array-file", 0, 3, 2); + DataFileMeta oldFile = blobFile("old-array-file", 0, 5, 1); + Set placeholderRows = placeholderRows(newFile, 1); + + try (RecordReader reader = + new BlobFallbackRecordReader( + Arrays.asList(newFile, oldFile), + file -> oneRowPerBatchReader(arrayFileRows(file, placeholderRows)), + null, + READ_ARRAY_ROW_TYPE, + BLOB_INDEX)) { + List rowIds = new ArrayList<>(); + List sequenceNumbers = new ArrayList<>(); + + RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + InternalRow row; + while ((row = batch.next()) != null) { + InternalArray array = row.getArray(BLOB_INDEX); + assertThat(array).isNotSameAs(BlobArrayPlaceholder.INSTANCE); + rowIds.add(row.getLong(1)); + sequenceNumbers.add(row.getLong(2)); + } + batch.releaseBatch(); + } + + assertThat(rowIds).containsExactly(0L, 1L, 2L, 3L, 4L); + assertThat(sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L); + } + } + private static ReadResult readFallback( List files, List rowRanges, Set placeholderRows) throws Exception { @@ -255,6 +299,19 @@ private static List fileRows( return rows; } + private static List arrayFileRows(DataFileMeta file, Set placeholderRows) { + List rows = new ArrayList<>(); + long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1; + for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) { + rows.add( + arrayBlobRow( + rowId, + file.maxSequenceNumber(), + placeholderRows.contains(rowKey(file, rowId)))); + } + return rows; + } + private static boolean selected(long rowId, List rowRanges) { if (rowRanges == null) { return true; @@ -324,6 +381,18 @@ private static InternalRow blobRow(long rowId, long sequenceNumber, boolean plac return row; } + private static InternalRow arrayBlobRow(long rowId, long sequenceNumber, boolean placeholder) { + GenericRow row = new GenericRow(3); + row.setField( + BLOB_INDEX, + placeholder + ? BlobArrayPlaceholder.INSTANCE + : new GenericArray(new Object[] {new BlobData(new byte[] {(byte) rowId})})); + row.setField(1, rowId); + row.setField(2, sequenceNumber); + return row; + } + private static RecordReader oneRowPerBatchReader(List rows) { return new RecordReader() { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/ColumnDirectiveUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/ColumnDirectiveUtilsTest.java index 2d8494af3b11..63e7a6b199ad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/ColumnDirectiveUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/ColumnDirectiveUtilsTest.java @@ -19,6 +19,8 @@ package org.apache.paimon.schema; import org.apache.paimon.CoreOptions; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DataTypes; @@ -177,6 +179,24 @@ public void testBlobDirectiveWithBlobSourceType() { assertThat(result.type().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB); } + @Test + public void testBlobDirectiveWithArraySourceType() { + Map opts = new HashMap<>(); + ColumnDirectiveUtils.ConvertedColumn result = + ColumnDirectiveUtils.applyAddColumnDirective( + "__BLOB_FIELD", + "images", + new ArrayType(false, DataTypes.BYTES().copy(false)), + opts); + + assertThat(result).isNotNull(); + assertThat(result.type().getTypeRoot()).isEqualTo(DataTypeRoot.ARRAY); + assertThat(result.type().isNullable()).isFalse(); + BlobType elementType = (BlobType) ((ArrayType) result.type()).getElementType(); + assertThat(elementType.isNullable()).isFalse(); + assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "images"); + } + // -- applyAddColumnDirective error cases -- @Test @@ -186,7 +206,39 @@ public void testBlobDirectiveRejectsNonBinaryType() { ColumnDirectiveUtils.applyAddColumnDirective( "__BLOB_FIELD", "col", DataTypes.INT(), new HashMap<>())) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("must be of BYTES, BINARY or BLOB type"); + .hasMessageContaining( + "must be of BYTES, BINARY, BLOB, ARRAY, ARRAY or ARRAY type"); + } + + @Test + public void testInlineBlobDirectivesRejectArraySourceType() { + assertThatThrownBy( + () -> + ColumnDirectiveUtils.applyAddColumnDirective( + "__BLOB_DESCRIPTOR_FIELD", + "images", + DataTypes.ARRAY(DataTypes.BYTES()), + new HashMap<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ARRAY is only supported by 'blob-field'"); + assertThatThrownBy( + () -> + ColumnDirectiveUtils.applyAddColumnDirective( + "__BLOB_VIEW_FIELD", + "images", + DataTypes.ARRAY(DataTypes.BYTES()), + new HashMap<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ARRAY is only supported by 'blob-field'"); + assertThatThrownBy( + () -> + ColumnDirectiveUtils.applyAddColumnDirective( + "__BLOB_EXTERNAL_STORAGE_FIELD", + "images", + DataTypes.ARRAY(DataTypes.BYTES()), + new HashMap<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ARRAY is only supported by 'blob-field'"); } @Test @@ -335,7 +387,7 @@ public void testRemoveDroppedBlobOptions() { opts.put("blob.stored-descriptor-fields", "b,legacy"); opts.put(CoreOptions.VECTOR_FIELD.key(), "v"); - ColumnDirectiveUtils.removeDroppedDirectiveOptions("b", DataTypeRoot.BLOB, opts); + ColumnDirectiveUtils.removeDroppedDirectiveOptions("b", DataTypes.BLOB(), opts); assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a"); assertThat(opts).containsEntry(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "c"); @@ -345,6 +397,17 @@ public void testRemoveDroppedBlobOptions() { assertThat(opts).containsEntry(CoreOptions.VECTOR_FIELD.key(), "v"); } + @Test + public void testRemoveDroppedArrayBlobOptions() { + Map opts = new HashMap<>(); + opts.put(CoreOptions.BLOB_FIELD.key(), "images,other"); + + ColumnDirectiveUtils.removeDroppedDirectiveOptions( + "images", DataTypes.ARRAY(DataTypes.BLOB()), opts); + + assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "other"); + } + @Test public void testRemoveDroppedVectorOptions() { Map opts = new HashMap<>(); @@ -352,7 +415,8 @@ public void testRemoveDroppedVectorOptions() { opts.put(CoreOptions.VECTOR_FIELD.key(), "emb,emb2"); opts.put("field.emb.vector-dim", "128"); - ColumnDirectiveUtils.removeDroppedDirectiveOptions("emb", DataTypeRoot.VECTOR, opts); + ColumnDirectiveUtils.removeDroppedDirectiveOptions( + "emb", DataTypes.VECTOR(128, DataTypes.FLOAT()), opts); assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a"); assertThat(opts).containsEntry(CoreOptions.VECTOR_FIELD.key(), "emb2"); @@ -365,7 +429,7 @@ public void testRemoveDroppedNonDirectiveTypeIsNoop() { opts.put(CoreOptions.BLOB_FIELD.key(), "a"); opts.put(CoreOptions.VECTOR_FIELD.key(), "v"); - ColumnDirectiveUtils.removeDroppedDirectiveOptions("x", DataTypeRoot.INTEGER, opts); + ColumnDirectiveUtils.removeDroppedDirectiveOptions("x", DataTypes.INT(), opts); assertThat(opts).containsEntry(CoreOptions.BLOB_FIELD.key(), "a"); assertThat(opts).containsEntry(CoreOptions.VECTOR_FIELD.key(), "v"); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index f40fec52fb11..3527a0a79f06 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -146,13 +146,15 @@ public void testBlobTableSchema() { options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); assertThatThrownBy(() -> validateBlobSchema(options, emptyList())) - .hasMessage("Data evolution config must enabled for table with BLOB type column."); + .hasMessage( + "Data evolution config must enabled for table with BLOB or ARRAY type column."); options.clear(); options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); assertThatThrownBy(() -> validateBlobSchema(options, singletonList("f2"))) - .hasMessage("The BLOB type column can not be part of partition keys."); + .hasMessage( + "The BLOB or ARRAY type column can not be part of partition keys."); assertThatThrownBy( () -> { @@ -166,7 +168,8 @@ public void testBlobTableSchema() { options, "")); }) - .hasMessage("Table with BLOB type column must have other normal columns."); + .hasMessage( + "Table with BLOB or ARRAY type column must have other normal columns."); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index cae85c238621..e2608e15c030 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -1092,7 +1092,8 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { Map options = new HashMap<>(catalogTable.getOptions()); List blobFields = CoreOptions.blobField(options); - Set blobTypeFields = blobTypeFields(options); + Set blobFileFields = new HashSet<>(CoreOptions.blobField(options)); + Set blobInlineFields = blobInlineFields(options); if (!blobFields.isEmpty()) { checkArgument( options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()), @@ -1125,26 +1126,31 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { field.getName(), field.getType(), options, - blobTypeFields), + blobFileFields, + blobInlineFields), columnComments.get(field.getName()))); return schemaBuilder.build(); } - private static Set blobTypeFields(Map options) { - Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options)); - blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField()); - blobTypeFields.addAll(CoreOptions.blobViewField(options)); - return blobTypeFields; + private static Set blobInlineFields(Map options) { + Set blobInlineFields = + new HashSet<>(new CoreOptions(options).blobDescriptorField()); + blobInlineFields.addAll(CoreOptions.blobViewField(options)); + return blobInlineFields; } private static org.apache.paimon.types.DataType resolveDataType( String fieldName, org.apache.flink.table.types.logical.LogicalType logicalType, Map options, - Set blobTypeFields) { - if (blobTypeFields.contains(fieldName)) { - return toBlobType(logicalType); + Set blobFileFields, + Set blobInlineFields) { + if (blobInlineFields.contains(fieldName)) { + return toBlobType(logicalType, false); + } + if (blobFileFields.contains(fieldName)) { + return toBlobType(logicalType, true); } Set vectorFields = CoreOptions.vectorField(options); if (vectorFields.contains(fieldName)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java index 0447a0c6f904..3c51514bf30d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java @@ -19,11 +19,16 @@ package org.apache.paimon.flink; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; @@ -36,15 +41,23 @@ import org.apache.flink.types.variant.BinaryVariant; import org.apache.flink.types.variant.Variant; +import javax.annotation.Nullable; + import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind; /** Convert to Flink row data. */ public class FlinkRowData implements RowData { protected InternalRow row; + @Nullable protected final RowType rowType; public FlinkRowData(InternalRow row) { + this(row, null); + } + + public FlinkRowData(InternalRow row, @Nullable RowType rowType) { this.row = row; + this.rowType = rowType; } public FlinkRowData replace(InternalRow row) { @@ -134,7 +147,7 @@ public byte[] getBinary(int pos) { @Override public ArrayData getArray(int pos) { - return new FlinkArrayData(row.getArray(pos)); + return new FlinkArrayData(row.getArray(pos), arrayElementType(typeAt(pos))); } @Override @@ -144,7 +157,10 @@ public MapData getMap(int pos) { @Override public RowData getRow(int pos, int numFields) { - return new FlinkRowData(row.getRow(pos, numFields)); + DataType type = typeAt(pos); + return new FlinkRowData( + row.getRow(pos, numFields), + type != null && type.getTypeRoot() == DataTypeRoot.ROW ? (RowType) type : null); } public Variant getVariant(int pos) { @@ -152,12 +168,30 @@ public Variant getVariant(int pos) { return new BinaryVariant(variant.value(), variant.metadata()); } + @Nullable + private DataType typeAt(int pos) { + return rowType == null ? null : rowType.getTypeAt(pos); + } + + @Nullable + private static DataType arrayElementType(@Nullable DataType type) { + return type != null && type.getTypeRoot() == DataTypeRoot.ARRAY + ? ((ArrayType) type).getElementType() + : null; + } + private static class FlinkArrayData implements ArrayData { private final InternalArray array; + @Nullable private final DataType elementType; private FlinkArrayData(InternalArray array) { + this(array, null); + } + + private FlinkArrayData(InternalArray array, @Nullable DataType elementType) { this.array = array; + this.elementType = elementType; } @Override @@ -232,12 +266,19 @@ public Variant getVariant(int pos) { @Override public byte[] getBinary(int pos) { + if (elementType != null && elementType.getTypeRoot() == DataTypeRoot.BLOB) { + if (array.isNullAt(pos)) { + return null; + } + Blob blob = array.getBlob(pos); + return blob == null ? null : blob.toData(); + } return array.getBinary(pos); } @Override public ArrayData getArray(int pos) { - return new FlinkArrayData(array.getArray(pos)); + return new FlinkArrayData(array.getArray(pos), arrayElementType(elementType)); } @Override @@ -247,7 +288,11 @@ public MapData getMap(int pos) { @Override public RowData getRow(int pos, int numFields) { - return new FlinkRowData(array.getRow(pos, numFields)); + return new FlinkRowData( + array.getRow(pos, numFields), + elementType != null && elementType.getTypeRoot() == DataTypeRoot.ROW + ? (RowType) elementType + : null); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java index bbb84fd77cfd..36926a1901c9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java @@ -21,6 +21,9 @@ import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobView; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; import java.util.Set; @@ -32,7 +35,15 @@ public class FlinkRowDataWithBlob extends FlinkRowData { public FlinkRowDataWithBlob( InternalRow row, Set blobFields, boolean blobAsDescriptor) { - super(row); + this(row, null, blobFields, blobAsDescriptor); + } + + public FlinkRowDataWithBlob( + InternalRow row, + @Nullable RowType rowType, + Set blobFields, + boolean blobAsDescriptor) { + super(row, rowType); this.blobFields = blobFields; this.blobAsDescriptor = blobAsDescriptor; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java index 556dbd95ff31..6aa6bccdbf56 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java @@ -46,11 +46,35 @@ public static LogicalType toLogicalType(DataType dataType) { return dataType.accept(DataTypeToLogicalType.INSTANCE); } - public static BlobType toBlobType(LogicalType logicalType) { + public static DataType toBlobType(LogicalType logicalType) { + return toBlobType(logicalType, true); + } + + public static DataType toBlobType(LogicalType logicalType, boolean allowArray) { + if (logicalType instanceof org.apache.flink.table.types.logical.ArrayType) { + checkArgument( + allowArray, + "ARRAY is only supported by '" + CoreOptions.BLOB_FIELD.key() + "'."); + LogicalType elementType = + ((org.apache.flink.table.types.logical.ArrayType) logicalType).getElementType(); + checkArgument( + isBinaryType(elementType), + "Expected BinaryType, VarBinaryType or ArrayType with BinaryType or " + + "VarBinaryType element, but got: " + + logicalType); + return new org.apache.paimon.types.ArrayType( + logicalType.isNullable(), new BlobType(elementType.isNullable())); + } checkArgument( - logicalType instanceof BinaryType || logicalType instanceof VarBinaryType, - "Expected BinaryType or VarBinaryType, but got: " + logicalType); - return new BlobType(); + isBinaryType(logicalType), + "Expected BinaryType, VarBinaryType or ArrayType with BinaryType or " + + "VarBinaryType element, but got: " + + logicalType); + return new BlobType(logicalType.isNullable()); + } + + private static boolean isBinaryType(LogicalType logicalType) { + return logicalType instanceof BinaryType || logicalType instanceof VarBinaryType; } public static VectorType toVectorType( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java index 40c89113803c..4bf7f4ccc5e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java @@ -37,6 +37,7 @@ import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeCasts; @@ -499,10 +500,10 @@ private void checkSchema(Table source) { throw new IllegalStateException( "Column not found in target table: " + flinkColumn.getName()); } - if (targetField.type().getTypeRoot() == DataTypeRoot.BLOB + if (BlobType.isBlobFileField(targetField.type()) && !updatableBlobFields.contains(flinkColumn.getName())) { throw new IllegalStateException( - "Should not append/update raw-data BLOB column '" + "Should not append/update raw-data BLOB or ARRAY column '" + flinkColumn.getName() + "' through MERGE INTO. " + "Only descriptor-based BLOB columns (configured via '" diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java index b49b9adb9476..da62817d1678 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java @@ -268,9 +268,11 @@ private class FileStoreRecordIterator implements BulkFormat.RecordIterator recordAndPosition = new MutableRecordAndPosition<>(); + @Nullable private final RowType rowType; private final Set blobFields; private FileStoreRecordIterator(@Nullable RowType rowType) { + this.rowType = rowType; this.blobFields = rowType == null ? Collections.emptySet() : blobFieldIndex(rowType); } @@ -308,8 +310,8 @@ public RecordAndPosition next() { recordAndPosition.setNext( blobFields.isEmpty() - ? new FlinkRowData(row) - : new FlinkRowDataWithBlob(row, blobFields, blobAsDescriptor)); + ? new FlinkRowData(row, rowType) + : new FlinkRowDataWithBlob(row, rowType, blobFields, blobAsDescriptor)); currentNumRead++; if (limiter != null) { limiter.increment(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index e89a24132f4b..5c7632020cad 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -33,6 +33,7 @@ import org.apache.paimon.utils.UriReader; import org.apache.paimon.utils.UriReaderFactory; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -63,6 +64,7 @@ protected List ddl() { "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-compaction.enabled'='true', 'blob-field'='picture')", "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')", "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-compaction.enabled'='true', 'blob-field'='pic1,pic2')", + "CREATE TABLE IF NOT EXISTS array_blob_table (id INT, data STRING, pictures ARRAY) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-compaction.enabled'='true', 'blob-field'='pictures')", String.format( "CREATE TABLE IF NOT EXISTS copy_blob_table (id INT, data STRING, picture BYTES)" + " WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true'," @@ -109,6 +111,54 @@ public void testMultipleBlobs() { .isEqualTo(3); } + @Test + public void testArrayBlobField() throws Exception { + org.apache.paimon.types.ArrayType arrayType = + (org.apache.paimon.types.ArrayType) + paimonTable("array_blob_table").rowType().getTypeAt(2); + assertThat(arrayType.getElementType().getTypeRoot()).isEqualTo(DataTypeRoot.BLOB); + + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.of( + 1, + "paimon", + new byte[][] { + new byte[] {72, 101, 108, 108, 111}, + null, + new byte[] {89, 69} + }), + Row.of(2, "one", new byte[][] {new byte[] {65, 66, 67}}), + Row.of(3, "null-array", null))); + tEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE array_blob_source " + + "(id INT, data STRING, pictures ARRAY) " + + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", + dataId)) + .await(); + batchSql("INSERT INTO array_blob_table SELECT * FROM array_blob_source"); + + assertThat( + batchSql( + "SELECT id, CARDINALITY(pictures), pictures[1], pictures[2], pictures[3] " + + "FROM array_blob_table ORDER BY id")) + .containsExactly( + Row.of( + 1, + 3, + new byte[] {72, 101, 108, 108, 111}, + null, + new byte[] {89, 69}), + Row.of(2, 1, new byte[] {65, 66, 67}, null, null), + Row.of(3, null, null, null, null)); + assertThat( + batchSql( + "SELECT COUNT(*) > 0 FROM `array_blob_table$files` WHERE file_path LIKE '%%.blob'")) + .containsExactly(Row.of(true)); + } + @Test public void testBlobCompaction() throws Exception { for (int i = 1; i <= 10; i++) { @@ -433,6 +483,34 @@ public void testBlobInlineFieldCanDeclareBlobWithoutBlobField() throws Exception "blob_view_without_blob_field", "blob-view-field"); } + @Test + public void testBlobInlineFieldRejectsArrayBlob() { + assertArrayBlobInlineFieldRejected( + "array_blob_descriptor_reject", "'blob-descriptor-field'='pictures'"); + assertArrayBlobInlineFieldRejected( + "array_blob_view_reject", "'blob-view-field'='pictures'"); + assertArrayBlobInlineFieldRejected( + "array_blob_external_reject", + String.format( + "'blob-descriptor-field'='pictures'," + + " 'blob-external-storage-field'='pictures'," + + " 'blob-external-storage-path'='%s'", + warehouse.resolve("array-blob-external-reject"))); + } + + private void assertArrayBlobInlineFieldRejected(String tableName, String blobOptions) { + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "CREATE TABLE %s (id INT, pictures ARRAY)" + + " WITH ('row-tracking.enabled'='true'," + + " 'data-evolution.enabled'='true', %s)", + tableName, blobOptions)) + .await()) + .hasRootCauseMessage("ARRAY is only supported by 'blob-field'."); + } + private void assertSecondaryBlobFieldCanDeclareBlobWithoutBlobField( String tableName, String optionKey) throws Exception { tEnv.executeSql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java index 06b08d240182..8c938e41ef10 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java @@ -24,6 +24,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -717,8 +718,9 @@ public void testUpdateRawBlobColumnThrowsError() throws Exception { Throwable t = Assertions.assertThrows(IllegalStateException.class, () -> action.run()); Assertions.assertTrue( - t.getMessage().contains("raw-data BLOB column"), - "Expected error about raw-data BLOB column but got: " + t.getMessage()); + t.getMessage().contains("raw-data BLOB or ARRAY column"), + "Expected error about raw-data BLOB or ARRAY column but got: " + + t.getMessage()); } @Test @@ -773,6 +775,84 @@ public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws Excepti testBatchRead("SELECT id, name FROM RAW_BLOB_SPLIT_T ORDER BY id", expected); } + @Test + public void testUpdateNonBlobColumnOnRawArrayBlobTableWithSplitFiles() throws Exception { + sEnv.executeSql( + buildDdl( + "RAW_ARRAY_BLOB_SPLIT_T", + Arrays.asList("id INT", "name STRING", "pictures ARRAY"), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put(ROW_TRACKING_ENABLED.key(), "true"); + put(DATA_EVOLUTION_ENABLED.key(), "true"); + put("blob-field", "pictures"); + put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b"); + put("sink.parallelism", "1"); + } + })); + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.of( + 1, + "name1", + new byte[][] { + new byte[] {72, 101, 108, 108, 111}, + new byte[] {89, 69} + }), + Row.of(2, "name2", new byte[][] {new byte[] {65, 66, 67}}), + Row.of(3, "name3", null))); + sEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE RAW_ARRAY_BLOB_SPLIT_SOURCE " + + "(id INT, name STRING, pictures ARRAY) " + + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", + dataId)) + .await(); + sEnv.executeSql( + "INSERT INTO RAW_ARRAY_BLOB_SPLIT_T " + + "SELECT * FROM RAW_ARRAY_BLOB_SPLIT_SOURCE") + .await(); + testBatchRead( + "SELECT COUNT(*) > 1 FROM `RAW_ARRAY_BLOB_SPLIT_T$files` " + + "WHERE file_path LIKE '%.blob'", + Collections.singletonList(changelogRow("+I", true))); + + sEnv.executeSql( + buildDdl( + "RAW_ARRAY_BLOB_SPLIT_S", + Arrays.asList("id INT", "name STRING"), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyMap())); + insertInto("RAW_ARRAY_BLOB_SPLIT_S", "(1, 'updated_name1')"); + + builder(warehouse, database, "RAW_ARRAY_BLOB_SPLIT_T") + .withMergeCondition("RAW_ARRAY_BLOB_SPLIT_T.id=RAW_ARRAY_BLOB_SPLIT_S.id") + .withMatchedUpdateSet("RAW_ARRAY_BLOB_SPLIT_T.name=RAW_ARRAY_BLOB_SPLIT_S.name") + .withSourceTable("RAW_ARRAY_BLOB_SPLIT_S") + .withSinkParallelism(1) + .build() + .run(); + + List expected = + Arrays.asList( + changelogRow( + "+I", + 1, + "updated_name1", + new byte[] {72, 101, 108, 108, 111}, + new byte[] {89, 69}), + changelogRow("+I", 2, "name2", new byte[] {65, 66, 67}, null), + changelogRow("+I", 3, "name3", null, null)); + testBatchRead( + "SELECT id, name, pictures[1], pictures[2] " + + "FROM RAW_ARRAY_BLOB_SPLIT_T ORDER BY id", + expected); + } + @Test public void testUpdateNonBlobColumnOnDescriptorBlobTableSucceeds() throws Exception { // Create a table with descriptor BLOB column. diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java index c65f76eeaa9d..38073fada2ea 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java @@ -33,6 +33,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IOUtils; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Optional; +import static org.apache.paimon.types.BlobType.isBlobFileField; import static org.apache.paimon.utils.Preconditions.checkArgument; /** {@link FileFormat} for blob file. */ @@ -87,8 +89,8 @@ public FormatWriterFactory createWriterFactory(RowType type) { public void validateDataFields(RowType rowType) { checkArgument(rowType.getFieldCount() == 1, "BlobFileFormat only support one field."); checkArgument( - rowType.getField(0).type().getTypeRoot() == DataTypeRoot.BLOB, - "BlobFileFormat only support blob type."); + isBlobFileField(rowType.getField(0).type()), + "BlobFileFormat only support blob type or array of blob type."); } @Override @@ -117,6 +119,7 @@ private static class BlobFormatReaderFactory implements FormatReaderFactory { private final boolean blobAsDescriptor; private final int fieldCount; private final int blobIndex; + private final DataType blobFieldType; public BlobFormatReaderFactory(boolean blobAsDescriptor, RowType projectedRowType) { this.blobAsDescriptor = blobAsDescriptor; @@ -125,6 +128,7 @@ public BlobFormatReaderFactory(boolean blobAsDescriptor, RowType projectedRowTyp Preconditions.checkState( this.blobIndex >= 0, "Read type of a blob format does not contain any blob field."); + this.blobFieldType = projectedRowType.getTypeAt(this.blobIndex); } @Override @@ -137,18 +141,26 @@ public FileRecordReader createReader(Context context) throws IOExce in = fileIO.newInputStream(filePath); fileMeta = new BlobFileMeta(in, context.fileSize(), context.selection()); } finally { - if (blobAsDescriptor) { + if (blobAsDescriptor && blobFieldType.getTypeRoot() != DataTypeRoot.ARRAY) { IOUtils.closeQuietly(in); in = null; } } - return new BlobFormatReader(fileIO, filePath, fileMeta, in, fieldCount, blobIndex); + return new BlobFormatReader( + fileIO, + filePath, + fileMeta, + in, + fieldCount, + blobIndex, + blobFieldType, + blobAsDescriptor); } private static int findBlobFieldIndex(RowType rowType) { for (int i = 0; i < rowType.getFieldCount(); i++) { - if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) { + if (isBlobFileField(rowType.getTypeAt(i))) { return i; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java index 73222ea575c9..5f3bf55452d6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java @@ -19,7 +19,9 @@ package org.apache.paimon.format.blob; import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobPlaceholder; +import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; @@ -27,11 +29,16 @@ import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.utils.DeltaVarintCompressor; import org.apache.paimon.utils.IOUtils; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; /** {@link FileRecordReader} for blob file. */ public class BlobFormatReader implements FileRecordReader { @@ -43,6 +50,9 @@ public class BlobFormatReader implements FileRecordReader { private final @Nullable SeekableInputStream in; private final int fieldCount; private final int blobIndex; + private final DataType blobFieldType; + private final Object blobPlaceholder; + private final boolean blobAsDescriptor; private boolean returned; @@ -52,7 +62,9 @@ public BlobFormatReader( BlobFileMeta fileMeta, @Nullable SeekableInputStream in, int fieldCount, - int blobIndex) { + int blobIndex, + DataType blobFieldType, + boolean blobAsDescriptor) { this.fileIO = fileIO; this.filePath = filePath; this.filePathString = filePath.toString(); @@ -60,6 +72,12 @@ public BlobFormatReader( this.in = in; this.fieldCount = fieldCount; this.blobIndex = blobIndex; + this.blobFieldType = blobFieldType; + this.blobPlaceholder = + blobFieldType.getTypeRoot() == DataTypeRoot.ARRAY + ? BlobArrayPlaceholder.INSTANCE + : BlobPlaceholder.INSTANCE; + this.blobAsDescriptor = blobAsDescriptor; this.returned = false; } @@ -92,23 +110,23 @@ public InternalRow next() { return null; } - Blob blob; + Object field; if (fileMeta.isNull(currentPosition)) { - blob = null; + field = null; } else if (fileMeta.isPlaceHolder(currentPosition)) { - blob = BlobPlaceholder.INSTANCE; + field = blobPlaceholder; } else { long offset = fileMeta.blobOffset(currentPosition) + 4; long length = fileMeta.blobLength(currentPosition) - 16; - if (in != null) { - blob = Blob.fromData(readInlineBlob(in, offset, length)); + if (blobFieldType.getTypeRoot() == DataTypeRoot.ARRAY) { + field = readBlobArray(in, offset, length); } else { - blob = Blob.fromFile(fileIO, filePathString, offset, length); + field = readBlob(in, offset, length); } } currentPosition++; GenericRow row = new GenericRow(fieldCount); - row.setField(blobIndex, blob); + row.setField(blobIndex, field); return row; } @@ -117,6 +135,13 @@ public void releaseBatch() {} }; } + private Blob readBlob(@Nullable SeekableInputStream in, long position, long length) { + if (in != null && !blobAsDescriptor) { + return Blob.fromData(readInlineBlob(in, position, length)); + } + return Blob.fromFile(fileIO, filePathString, position, length); + } + @Override public void close() throws IOException { IOUtils.closeQuietly(in); @@ -132,4 +157,66 @@ private static byte[] readInlineBlob(SeekableInputStream in, long position, long } return blobData; } + + private GenericArray readBlobArray(SeekableInputStream in, long position, long length) { + if (in == null) { + throw new IllegalStateException("Input stream must be available for ARRAY."); + } + + try { + byte[] header = new byte[9]; + in.seek(position); + IOUtils.readFully(in, header); + ByteBuffer headerBuffer = littleEndianBuffer(header); + int magic = headerBuffer.getInt(); + if (magic != BlobFormatWriter.ARRAY_MAGIC_NUMBER) { + throw new IllegalArgumentException( + "Invalid ARRAY payload magic number: " + magic); + } + byte version = headerBuffer.get(); + if (version > BlobFormatWriter.ARRAY_VERSION) { + throw new UnsupportedOperationException( + "Unsupported ARRAY payload version: " + version); + } + int elementCount = headerBuffer.getInt(); + + long elementIndexLengthPosition = position + length - Integer.BYTES; + byte[] indexLengthBytes = new byte[Integer.BYTES]; + in.seek(elementIndexLengthPosition); + IOUtils.readFully(in, indexLengthBytes); + int elementIndexLength = littleEndianBuffer(indexLengthBytes).getInt(); + + byte[] elementIndexBytes = new byte[elementIndexLength]; + in.seek(elementIndexLengthPosition - elementIndexLength); + IOUtils.readFully(in, elementIndexBytes); + long[] elementLengths = DeltaVarintCompressor.decompress(elementIndexBytes); + if (elementLengths.length != elementCount) { + throw new IllegalArgumentException( + "ARRAY element count does not match element index length."); + } + + Object[] blobs = new Object[elementCount]; + long elementOffset = position + header.length; + for (int i = 0; i < elementCount; i++) { + long elementLength = elementLengths[i]; + if (elementLength == BlobFormatWriter.ARRAY_NULL_ELEMENT_LENGTH) { + blobs[i] = null; + } else if (blobAsDescriptor) { + blobs[i] = Blob.fromFile(fileIO, filePathString, elementOffset, elementLength); + elementOffset += elementLength; + } else { + blobs[i] = Blob.fromData(readInlineBlob(in, elementOffset, elementLength)); + elementOffset += elementLength; + } + } + + return new GenericArray(blobs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static ByteBuffer littleEndianBuffer(byte[] bytes) { + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java index f97158c17186..a02926787b1e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java @@ -19,15 +19,19 @@ package org.apache.paimon.format.blob; import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobConsumer; import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.BlobPlaceholder; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileAwareFormatWriter; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.DeltaVarintCompressor; import org.apache.paimon.utils.LongArrayList; @@ -47,12 +51,17 @@ public class BlobFormatWriter implements FileAwareFormatWriter { public static final byte VERSION = 1; public static final int MAGIC_NUMBER = 1481511375; public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER); + public static final byte ARRAY_VERSION = 1; + public static final int ARRAY_MAGIC_NUMBER = 1094861634; + public static final byte[] ARRAY_MAGIC_NUMBER_BYTES = intToLittleEndian(ARRAY_MAGIC_NUMBER); public static final long NULL_LENGTH = -1L; public static final long PLACE_HOLDER_LENGTH = -2L; + public static final long ARRAY_NULL_ELEMENT_LENGTH = -1L; private final PositionOutputStream out; @Nullable private final BlobConsumer writeConsumer; private final String blobFieldName; + private final DataType blobFieldType; private final CRC32 crc32; private final byte[] tmpBuffer; private final LongArrayList lengths; @@ -65,6 +74,7 @@ public BlobFormatWriter( this.writeConsumer = writeConsumer; checkArgument(type.getFieldCount() == 1, "BlobFormatWriter only support one field."); this.blobFieldName = type.getFieldNames().get(0); + this.blobFieldType = type.getTypeAt(0); this.crc32 = new CRC32(); this.tmpBuffer = new byte[4096]; this.lengths = new LongArrayList(16); @@ -90,27 +100,34 @@ public void addElement(InternalRow element) throws IOException { } return; } + + if (blobFieldType.getTypeRoot() == DataTypeRoot.ARRAY) { + InternalArray array = element.getArray(0); + if (array == BlobArrayPlaceholder.INSTANCE) { + lengths.add(PLACE_HOLDER_LENGTH); + } else { + addBlobArray(array); + } + return; + } + Blob blob = element.getBlob(0); if (blob == BlobPlaceholder.INSTANCE) { lengths.add(PLACE_HOLDER_LENGTH); return; } + addBlob(blob); + } + + private void addBlob(Blob blob) throws IOException { long previousPos = out.getPos(); crc32.reset(); write(MAGIC_NUMBER_BYTES); - long blobPos = out.getPos(); - try (SeekableInputStream in = blob.newInputStream()) { - int bytesRead = in.read(tmpBuffer); - while (bytesRead >= 0) { - write(tmpBuffer, bytesRead); - bytesRead = in.read(tmpBuffer); - } - } + BlobDescriptor descriptor = writeBlobData(blob); - long blobLength = out.getPos() - blobPos; long binLength = out.getPos() - previousPos + 12; lengths.add(binLength); byte[] lenBytes = longToLittleEndian(binLength); @@ -119,7 +136,6 @@ public void addElement(InternalRow element) throws IOException { out.write(intToLittleEndian(crcValue)); if (writeConsumer != null) { - BlobDescriptor descriptor = new BlobDescriptor(pathString, blobPos, blobLength); boolean flush = writeConsumer.accept(blobFieldName, descriptor); if (flush) { out.flush(); @@ -127,6 +143,60 @@ public void addElement(InternalRow element) throws IOException { } } + private void addBlobArray(InternalArray array) throws IOException { + long previousPos = out.getPos(); + crc32.reset(); + + write(MAGIC_NUMBER_BYTES); + + write(ARRAY_MAGIC_NUMBER_BYTES); + write(new byte[] {ARRAY_VERSION}); + write(intToLittleEndian(array.size())); + + long[] elementLengths = new long[array.size()]; + boolean flush = false; + for (int i = 0; i < array.size(); i++) { + if (array.isNullAt(i)) { + elementLengths[i] = ARRAY_NULL_ELEMENT_LENGTH; + continue; + } + + Blob blob = array.getBlob(i); + BlobDescriptor descriptor = writeBlobData(blob); + elementLengths[i] = descriptor.length(); + if (writeConsumer != null) { + flush |= writeConsumer.accept(blobFieldName, descriptor); + } + } + + byte[] elementIndexBytes = DeltaVarintCompressor.compress(elementLengths); + write(elementIndexBytes); + write(intToLittleEndian(elementIndexBytes.length)); + + long binLength = out.getPos() - previousPos + 12; + lengths.add(binLength); + write(longToLittleEndian(binLength)); + int crcValue = (int) crc32.getValue(); + out.write(intToLittleEndian(crcValue)); + + if (flush) { + out.flush(); + } + } + + private BlobDescriptor writeBlobData(Blob blob) throws IOException { + long blobPos = out.getPos(); + try (SeekableInputStream in = blob.newInputStream()) { + int bytesRead = in.read(tmpBuffer); + while (bytesRead >= 0) { + write(tmpBuffer, bytesRead); + bytesRead = in.read(tmpBuffer); + } + } + + return new BlobDescriptor(pathString, blobPos, out.getPos() - blobPos); + } + private void write(byte[] bytes) throws IOException { write(bytes, bytes.length); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java index 6c7542bfcb05..d54a9b299249 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java @@ -19,10 +19,13 @@ package org.apache.paimon.format.blob; import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobArrayPlaceholder; import org.apache.paimon.data.BlobData; import org.apache.paimon.data.BlobPlaceholder; import org.apache.paimon.data.BlobRef; +import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; @@ -34,6 +37,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.RoaringBitmap32; import org.junit.jupiter.api.BeforeEach; @@ -74,6 +78,39 @@ public void testReadBlobInlineBytes() throws IOException { innerTest(false); } + @Test + public void testArrayBlobAsDescriptor() throws IOException { + innerTestArray(true); + } + + @Test + public void testReadArrayBlobInlineBytes() throws IOException { + innerTestArray(false); + } + + @Test + public void testWriteArrayBlobPlaceholderWithProjectedRow() throws IOException { + BlobFileFormat format = new BlobFileFormat(); + RowType rowType = RowType.of(DataTypes.ARRAY(DataTypes.BLOB())); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { + FormatWriter formatWriter = format.createWriterFactory(rowType).create(out, null); + ProjectedRow projectedRow = ProjectedRow.from(new int[] {1}); + formatWriter.addElement( + projectedRow.replaceRow(GenericRow.of(0, BlobArrayPlaceholder.INSTANCE))); + formatWriter.close(); + } + + FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + List rows = new ArrayList<>(); + readerFactory.createReader(context).forEachRemaining(rows::add); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).getArray(0)).isSameAs(BlobArrayPlaceholder.INSTANCE); + } + private void innerTest(boolean blobAsDescriptor) throws IOException { BlobFileFormat format = new BlobFileFormat(blobAsDescriptor); RowType rowType = RowType.of(DataTypes.BLOB()); @@ -184,4 +221,82 @@ public void testReadWithProjectedRowTypeContainingExtraFields() throws IOExcepti assertThat(rows.get(0).getBlob(1).toData()).isEqualTo("hello".getBytes()); assertThat(rows.get(1).getBlob(1).toData()).isEqualTo("world".getBytes()); } + + private void innerTestArray(boolean blobAsDescriptor) throws IOException { + BlobFileFormat format = new BlobFileFormat(blobAsDescriptor); + RowType rowType = RowType.of(DataTypes.ARRAY(DataTypes.BLOB())); + + GenericArray first = + new GenericArray( + new Object[] { + new BlobData("hello".getBytes()), null, new BlobData("world".getBytes()) + }); + GenericArray empty = new GenericArray(new Object[0]); + List arrays = Arrays.asList(first, null, BlobArrayPlaceholder.INSTANCE, empty); + + try (PositionOutputStream out = fileIO.newOutputStream(file, false)) { + FormatWriter formatWriter = format.createWriterFactory(rowType).create(out, null); + for (Object array : arrays) { + formatWriter.addElement(GenericRow.of(array)); + } + formatWriter.close(); + } + + FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null); + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + List result = new ArrayList<>(); + readerFactory + .createReader(context) + .forEachRemaining( + row -> { + if (row.isNullAt(0)) { + result.add(null); + } else { + addArrayBlobResult(row, blobAsDescriptor, result); + } + }); + + assertThat(result).hasSize(4); + assertThat((byte[]) ((Object[]) result.get(0))[0]).isEqualTo("hello".getBytes()); + assertThat(((Object[]) result.get(0))[1]).isNull(); + assertThat((byte[]) ((Object[]) result.get(0))[2]).isEqualTo("world".getBytes()); + assertThat(result.get(1)).isNull(); + assertThat(result.get(2)).isSameAs(BlobArrayPlaceholder.INSTANCE); + assertThat((Object[]) result.get(3)).isEmpty(); + + RoaringBitmap32 selection = new RoaringBitmap32(); + selection.add(0); + context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection); + result.clear(); + readerFactory + .createReader(context) + .forEachRemaining(row -> result.add(row.getArray(0).getBlob(0).toData())); + assertThat(result).hasSize(1); + assertThat((byte[]) result.get(0)).isEqualTo("hello".getBytes()); + } + + private static void addArrayBlobResult( + InternalRow row, boolean blobAsDescriptor, List result) { + InternalArray array = row.getArray(0); + if (array == BlobArrayPlaceholder.INSTANCE) { + result.add(BlobArrayPlaceholder.INSTANCE); + return; + } + Object[] bytes = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + if (array.isNullAt(i)) { + bytes[i] = null; + } else { + Blob blob = array.getBlob(i); + if (blobAsDescriptor) { + assertThat(blob).isInstanceOf(BlobRef.class); + } else { + assertThat(blob).isInstanceOf(BlobData.class); + } + bytes[i] = blob.toData(); + } + } + result.add(bytes); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java index 5c8026f461df..6b278ee17eaf 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java @@ -19,6 +19,7 @@ package org.apache.paimon.spark; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; @@ -67,6 +68,8 @@ public static Object fromPaimon(Object o, DataType type) { return fromPaimon((InternalMap) o, type); case ROW: return fromPaimon((InternalRow) o, (RowType) type); + case BLOB: + return ((Blob) o).toData(); default: return o; } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 165be98980d9..dc251723d868 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -76,6 +76,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat; import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2; import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -607,13 +608,10 @@ private Schema toInitialSchema( for (StructField field : schema.fields()) { String name = field.name(); DataType type; - if (blobFields.contains(name) - || blobDescriptorFields.contains(name) - || blobViewFields.contains(name)) { - checkArgument( - field.dataType() instanceof org.apache.spark.sql.types.BinaryType, - "The type of blob field must be binary"); - type = new BlobType(); + if (blobDescriptorFields.contains(name) || blobViewFields.contains(name)) { + type = toBlobType(field, false); + } else if (blobFields.contains(name)) { + type = toBlobType(field, true); } else if (vectorFields.contains(field.name())) { Preconditions.checkArgument( field.dataType() instanceof ArrayType, @@ -645,6 +643,26 @@ private Schema toInitialSchema( return schemaBuilder.build(); } + private static DataType toBlobType(StructField field, boolean allowArray) { + org.apache.spark.sql.types.DataType sparkType = field.dataType(); + if (sparkType instanceof BinaryType) { + return new BlobType(field.nullable()); + } + if (sparkType instanceof ArrayType) { + checkArgument( + allowArray, + "ARRAY is only supported by '" + CoreOptions.BLOB_FIELD.key() + "'."); + ArrayType arrayType = (ArrayType) sparkType; + checkArgument( + arrayType.elementType() instanceof BinaryType, + "The element type of array blob field must be binary"); + return new org.apache.paimon.types.ArrayType( + field.nullable(), new BlobType(arrayType.containsNull())); + } + throw new IllegalArgumentException( + "The type of blob field must be binary or array of binary"); + } + private void validateAlterProperty(String alterKey) { if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) { throw new UnsupportedOperationException("Alter primary key is not supported"); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index c49368d1bf38..3ae92161921b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -24,8 +24,8 @@ import org.apache.paimon.spark.write.{DataEvolutionTableDataWrite, WriteHelper, import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink._ import org.apache.paimon.table.source.DataSplit +import org.apache.paimon.types.BlobType import org.apache.paimon.types.DataType -import org.apache.paimon.types.DataTypeRoot.BLOB import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.paimon.utils.SerializationUtils @@ -52,10 +52,10 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se val options = new CoreOptions(table.schema().options()) val updatableBlobFields = options.updatableBlobFields() val hasRawDataBlob = writeType.getFields.asScala.exists( - f => f.`type`().is(BLOB) && !updatableBlobFields.contains(f.name())) + f => BlobType.isBlobFileField(f.`type`()) && !updatableBlobFields.contains(f.name())) if (hasRawDataBlob) { throw new UnsupportedOperationException( - "DataEvolution does not support writing partial columns with raw-data BLOB type. " + + "DataEvolution does not support writing partial columns with raw-data BLOB or ARRAY type. " + "Only descriptor-based BLOB columns (configured via '" + CoreOptions.BLOB_DESCRIPTOR_FIELD.key() + "' or '" + CoreOptions.BLOB_VIEW_FIELD.key() + "' or '" + diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala index 790d273d0c93..ff91dfd8c23e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkArrayData.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark.data import org.apache.paimon.data.InternalArray import org.apache.paimon.spark.DataConverter -import org.apache.paimon.types.{ArrayType => PaimonArrayType, BigIntType, DataType => PaimonDataType, DataTypeChecks, RowType} +import org.apache.paimon.types.{ArrayType => PaimonArrayType, BigIntType, BlobType, DataType => PaimonDataType, DataTypeChecks, RowType} import org.apache.paimon.utils.InternalRowUtils import org.apache.spark.sql.catalyst.InternalRow @@ -91,7 +91,15 @@ abstract class AbstractSparkArrayData extends SparkArrayData { override def getUTF8String(ordinal: Int): UTF8String = DataConverter.fromPaimon(paimonArray.getString(ordinal)) - override def getBinary(ordinal: Int): Array[Byte] = paimonArray.getBinary(ordinal) + override def getBinary(ordinal: Int): Array[Byte] = elementType match { + case _: BlobType => + if (paimonArray.isNullAt(ordinal)) { + null + } else { + paimonArray.getBlob(ordinal).toData + } + case _ => paimonArray.getBinary(ordinal) + } override def getInterval(ordinal: Int): CalendarInterval = throw new UnsupportedOperationException() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 9c912e027343..e0c8caee7726 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -77,6 +77,73 @@ class BlobTestBase extends PaimonSparkTestBase { } } + test("Blob: test array blob") { + withTable("t") { + sql("CREATE TABLE t (id INT, data STRING, pictures ARRAY) TBLPROPERTIES (" + + "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='pictures')") + sql( + "INSERT INTO t VALUES " + + "(1, 'paimon', array(X'48656C6C6F', CAST(NULL AS BINARY), X'5945')), " + + "(2, 'one', array(X'414243')), " + + "(3, 'null-array', CAST(NULL AS ARRAY))") + + val pictures = sql("SELECT pictures FROM t WHERE id = 1") + .collect()(0) + .getSeq[Array[Byte]](0) + assert(pictures.size == 3) + assert(util.Arrays.equals(pictures.head, Array[Byte](72, 101, 108, 108, 111))) + assert(pictures(1) == null) + assert(util.Arrays.equals(pictures(2), Array[Byte](89, 69))) + + val first = + sql("SELECT id, size(pictures), pictures[0], pictures[1], pictures[2] FROM t WHERE id = 1") + .collect()(0) + assert(first.getInt(0) == 1) + assert(first.getInt(1) == 3) + assert(util.Arrays.equals(first.getAs[Array[Byte]](2), Array[Byte](72, 101, 108, 108, 111))) + assert(first.isNullAt(3)) + assert(util.Arrays.equals(first.getAs[Array[Byte]](4), Array[Byte](89, 69))) + + val second = sql("SELECT id, size(pictures), pictures[0] FROM t WHERE id = 2").collect()(0) + assert(second.getInt(0) == 2) + assert(second.getInt(1) == 1) + assert(util.Arrays.equals(second.getAs[Array[Byte]](2), Array[Byte](65, 66, 67))) + + checkAnswer( + sql("SELECT id, pictures IS NULL FROM t WHERE id = 3"), + Seq(Row(3, true)) + ) + checkAnswer( + sql("SELECT COUNT(*) > 0 FROM `t$files` WHERE file_path LIKE '%.blob'"), + Seq(Row(true)) + ) + } + } + + test("Blob: array blob inline fields are rejected") { + Seq( + ("array_blob_descriptor_reject", "'blob-descriptor-field'='pictures'"), + ("array_blob_view_reject", "'blob-view-field'='pictures'"), + ( + "array_blob_external_reject", + s"'blob-descriptor-field'='pictures', " + + s"'blob-external-storage-field'='pictures', " + + s"'blob-external-storage-path'='${tempDBDir.getCanonicalPath}/array_blob_external'") + ).foreach { + case (tableName, blobOptions) => + withTable(tableName) { + val error = intercept[Exception] { + sql( + s"CREATE TABLE $tableName (id INT, pictures ARRAY) TBLPROPERTIES (" + + s"'row-tracking.enabled'='true', 'data-evolution.enabled'='true', $blobOptions)") + } + assert( + containsMessage(error, "ARRAY is only supported by 'blob-field'."), + throwableMessages(error)) + } + } + } + test("Blob: test write blob descriptor") { withTable("t") { val blobData = new Array[Byte](1024 * 1024) @@ -480,6 +547,54 @@ class BlobTestBase extends PaimonSparkTestBase { } } + test("Blob: merge-into updates non-blob column on raw array blob table with split files") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, name STRING, pictures ARRAY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='pictures', 'blob.target-file-size'='1 b')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', array(X'48656C6C6F', X'5945')), " + + "(2, 'name2', array(X'414243')), " + + "(3, 'name3', CAST(NULL AS ARRAY))") + + checkAnswer( + sql("SELECT COUNT(*) > 1 FROM `t$files` WHERE file_path LIKE '%.blob'"), + Seq(Row(true)) + ) + + sql("CREATE TABLE s (id INT, name STRING)") + sql("INSERT INTO s VALUES (1, 'updated_name1')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.name = s.name + |""".stripMargin) + + checkAnswer( + sql("SELECT id, name FROM t ORDER BY id"), + Seq(Row(1, "updated_name1"), Row(2, "name2"), Row(3, "name3")) + ) + + val first = sql("SELECT name, pictures[0], pictures[1] FROM t WHERE id = 1").collect()(0) + assert(first.getString(0) == "updated_name1") + assert(util.Arrays.equals(first.getAs[Array[Byte]](1), Array[Byte](72, 101, 108, 108, 111))) + assert(util.Arrays.equals(first.getAs[Array[Byte]](2), Array[Byte](89, 69))) + + val second = sql("SELECT pictures[0], pictures[1] FROM t WHERE id = 2").collect()(0) + assert(util.Arrays.equals(second.getAs[Array[Byte]](0), Array[Byte](65, 66, 67))) + assert(second.isNullAt(1)) + + checkAnswer( + sql("SELECT pictures IS NULL FROM t WHERE id = 3"), + Seq(Row(true)) + ) + } + } + test("Blob: self merge reads raw blob column to update non-blob column") { withTable("t") { sql( @@ -575,6 +690,30 @@ class BlobTestBase extends PaimonSparkTestBase { } } + private def containsMessage(error: Throwable, expected: String): Boolean = { + var current = error + while (current != null) { + if (current.getMessage != null && current.getMessage.contains(expected)) { + return true + } + current = current.getCause + } + false + } + + private def throwableMessages(error: Throwable): String = { + val builder = new StringBuilder + var current = error + while (current != null) { + if (builder.nonEmpty) { + builder.append(" -> ") + } + builder.append(current.getClass.getName).append(": ").append(current.getMessage) + current = current.getCause + } + builder.toString() + } + private val HEX_ARRAY = "0123456789ABCDEF".toCharArray def bytesToHex(bytes: Array[Byte]): String = {