Skip to content

Commit 5012a79

Browse files
committed
[core][flink][spark] Support ARRAY<BLOB> blob files
1 parent 773f942 commit 5012a79

24 files changed

Lines changed: 612 additions & 59 deletions

File tree

paimon-api/src/main/java/org/apache/paimon/types/BlobType.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,21 @@ public static List<DataField> fieldsInBlobFile(RowType rowType, Set<String> desc
103103
rowType.getFields()
104104
.forEach(
105105
field -> {
106-
DataTypeRoot type = field.type().getTypeRoot();
107-
if (type == DataTypeRoot.BLOB
106+
if (isBlobFileField(field.type())
108107
&& !descriptorFields.contains(field.name())) {
109108
result.add(field);
110109
}
111110
});
112111
return result;
113112
}
113+
114+
public static boolean isBlobFileField(DataType type) {
115+
if (type.getTypeRoot() == DataTypeRoot.BLOB) {
116+
return true;
117+
}
118+
if (type.getTypeRoot() == DataTypeRoot.ARRAY) {
119+
return ((ArrayType) type).getElementType().getTypeRoot() == DataTypeRoot.BLOB;
120+
}
121+
return false;
122+
}
114123
}

paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ static Class<?> getDataClass(DataType type) {
153153
return InternalMap.class;
154154
case ROW:
155155
return InternalRow.class;
156+
case BLOB:
157+
return Blob.class;
156158
default:
157159
throw new IllegalArgumentException("Illegal type: " + type);
158160
}

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import static java.util.Comparator.comparingLong;
5555
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
5656
import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
57-
import static org.apache.paimon.types.DataTypeRoot.BLOB;
57+
import static org.apache.paimon.types.BlobType.isBlobFileField;
5858
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
5959
import static org.apache.paimon.utils.Preconditions.checkArgument;
6060

@@ -100,7 +100,7 @@ public DataEvolutionCompactCoordinator(
100100
? table.rowType().getFields().stream()
101101
.filter(
102102
field ->
103-
field.type().is(BLOB)
103+
isBlobFileField(field.type())
104104
&& !blobInlineFields.contains(
105105
field.name()))
106106
.map(DataField::id)

paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ public void releaseBatch() {
188188
}
189189

190190
private boolean isPlaceHolder(InternalRow row) {
191+
if (row instanceof GenericRow) {
192+
return ((GenericRow) row).getField(blobIndex) == BlobPlaceholder.INSTANCE;
193+
}
191194
return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == BlobPlaceholder.INSTANCE;
192195
}
193196

paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.data.BlobConsumer;
2323
import org.apache.paimon.types.DataField;
24-
import org.apache.paimon.types.DataTypeRoot;
2524
import org.apache.paimon.types.RowType;
2625

2726
import javax.annotation.Nullable;
2827

2928
import java.util.Set;
3029

31-
import static org.apache.paimon.types.DataTypeRoot.BLOB;
30+
import static org.apache.paimon.types.BlobType.isBlobFileField;
3231

3332
/** Context for blob file. */
3433
public class BlobFileContext {
@@ -53,7 +52,7 @@ private BlobFileContext(
5352

5453
@Nullable
5554
public static BlobFileContext create(RowType rowType, CoreOptions options) {
56-
if (rowType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
55+
if (rowType.getFieldTypes().stream().noneMatch(BlobFileContext::hasBlobField)) {
5756
return null;
5857
}
5958
Set<String> descriptorFields = options.blobDescriptorField();
@@ -62,8 +61,7 @@ public static BlobFileContext create(RowType rowType, CoreOptions options) {
6261
String externalStoragePath = options.blobExternalStoragePath();
6362
boolean requireBlobFile = false;
6463
for (DataField field : rowType.getFields()) {
65-
DataTypeRoot type = field.type().getTypeRoot();
66-
if (type == DataTypeRoot.BLOB
64+
if (isBlobFileField(field.type())
6765
&& (!inlineFields.contains(field.name())
6866
|| externalStorageField.contains(field.name()))) {
6967
requireBlobFile = true;
@@ -83,12 +81,16 @@ public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
8381
}
8482

8583
public BlobFileContext withWriteType(RowType writeType) {
86-
if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
84+
if (writeType.getFieldTypes().stream().noneMatch(BlobFileContext::hasBlobField)) {
8785
return null;
8886
}
8987
return this;
9088
}
9189

90+
private static boolean hasBlobField(org.apache.paimon.types.DataType type) {
91+
return isBlobFileField(type);
92+
}
93+
9294
public Set<String> blobDescriptorFields() {
9395
return blobDescriptorFields;
9496
}

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.paimon.table.source.DataSplit;
4747
import org.apache.paimon.table.source.Split;
4848
import org.apache.paimon.types.DataField;
49-
import org.apache.paimon.types.DataTypeRoot;
5049
import org.apache.paimon.types.RowType;
5150
import org.apache.paimon.utils.FileStorePathFactory;
5251
import org.apache.paimon.utils.FormatReaderMapping;
@@ -76,6 +75,7 @@
7675
import static java.util.Comparator.comparingLong;
7776
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
7877
import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
78+
import static org.apache.paimon.types.BlobType.isBlobFileField;
7979
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
8080
import static org.apache.paimon.utils.Preconditions.checkArgument;
8181
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -419,7 +419,7 @@ private RecordReader<InternalRow> sequentialReadFiles(
419419

420420
private static int findBlobFieldIndex(RowType rowType) {
421421
for (int i = 0; i < rowType.getFieldCount(); i++) {
422-
if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
422+
if (isBlobFileField(rowType.getTypeAt(i))) {
423423
return i;
424424
}
425425
}

paimon-core/src/main/java/org/apache/paimon/schema/ColumnDirectiveUtils.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,18 +234,33 @@ private static DataType convertType(
234234
return new VectorType(sourceType.isNullable(), directive.vectorDim(), elementType);
235235
} else {
236236
DataTypeRoot root = sourceType.getTypeRoot();
237+
if (root == DataTypeRoot.ARRAY) {
238+
DataType elementType = ((ArrayType) sourceType).getElementType();
239+
Preconditions.checkArgument(
240+
isBlobSourceRoot(elementType.getTypeRoot()),
241+
"Column %s declared with a BLOB directive must be of BYTES, BINARY, "
242+
+ "BLOB, ARRAY<BYTES>, ARRAY<BINARY> or ARRAY<BLOB> type, but was %s.",
243+
fieldName,
244+
sourceType);
245+
return new ArrayType(
246+
sourceType.isNullable(), new BlobType(elementType.isNullable()));
247+
}
237248
Preconditions.checkArgument(
238-
root == DataTypeRoot.VARBINARY
239-
|| root == DataTypeRoot.BINARY
240-
|| root == DataTypeRoot.BLOB,
241-
"Column %s declared with a BLOB directive must be of BYTES, "
242-
+ "BINARY or BLOB type, but was %s.",
249+
isBlobSourceRoot(root),
250+
"Column %s declared with a BLOB directive must be of BYTES, BINARY, "
251+
+ "BLOB, ARRAY<BYTES>, ARRAY<BINARY> or ARRAY<BLOB> type, but was %s.",
243252
fieldName,
244253
sourceType);
245254
return new BlobType(sourceType.isNullable());
246255
}
247256
}
248257

258+
private static boolean isBlobSourceRoot(DataTypeRoot root) {
259+
return root == DataTypeRoot.VARBINARY
260+
|| root == DataTypeRoot.BINARY
261+
|| root == DataTypeRoot.BLOB;
262+
}
263+
249264
/**
250265
* Append {@code fieldName} to the comma-separated option identified by {@code optionKey}. If
251266
* the canonical key is empty but a fallback key holds the value (e.g. legacy {@code
@@ -295,19 +310,19 @@ public static void modifyFieldOptions(
295310
new ConfigOption[] {CoreOptions.VECTOR_FIELD};
296311

297312
/**
298-
* Remove directive-managed options when a BLOB or VECTOR column is dropped. Only acts on BLOB
299-
* or VECTOR type columns; other types are ignored.
313+
* Remove directive-managed options when a BLOB or VECTOR column is dropped. Only acts on BLOB,
314+
* {@code ARRAY<BLOB>} or VECTOR type columns; other types are ignored.
300315
*/
301316
public static void removeDroppedDirectiveOptions(
302-
String fieldName, DataTypeRoot typeRoot, Map<String, String> options) {
303-
if (typeRoot == DataTypeRoot.BLOB) {
317+
String fieldName, DataType type, Map<String, String> options) {
318+
if (BlobType.isBlobFileField(type)) {
304319
for (ConfigOption<String> option : BLOB_OPTIONS) {
305320
removeFromCsvOption(option.key(), fieldName, options);
306321
for (FallbackKey fk : option.fallbackKeys()) {
307322
removeFromCsvOption(fk.getKey(), fieldName, options);
308323
}
309324
}
310-
} else if (typeRoot == DataTypeRoot.VECTOR) {
325+
} else if (type.getTypeRoot() == DataTypeRoot.VECTOR) {
311326
for (ConfigOption<String> option : VECTOR_OPTIONS) {
312327
removeFromCsvOption(option.key(), fieldName, options);
313328
for (FallbackKey fk : option.fallbackKeys()) {

paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.paimon.types.DataField;
4444
import org.apache.paimon.types.DataType;
4545
import org.apache.paimon.types.DataTypeCasts;
46-
import org.apache.paimon.types.DataTypeRoot;
4746
import org.apache.paimon.types.MapType;
4847
import org.apache.paimon.types.ReassignFieldId;
4948
import org.apache.paimon.types.RowType;
@@ -104,6 +103,7 @@
104103
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
105104
import static org.apache.paimon.schema.ColumnDirectiveUtils.applyAddColumnDirective;
106105
import static org.apache.paimon.schema.ColumnDirectiveUtils.applyDirectives;
106+
import static org.apache.paimon.types.BlobType.isBlobFileField;
107107
import static org.apache.paimon.utils.DefaultValueUtils.validateDefaultValue;
108108
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
109109
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -465,7 +465,7 @@ protected void updateLastColumn(
465465
.ifPresent(
466466
f ->
467467
ColumnDirectiveUtils.removeDroppedDirectiveOptions(
468-
dropName, f.type().getTypeRoot(), newOptions));
468+
dropName, f.type(), newOptions));
469469
}
470470
new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) {
471471
@Override
@@ -964,7 +964,7 @@ private static void assertNotRenamingBlobColumn(List<DataField> fields, String[]
964964
}
965965
String fieldName = fieldNames[0];
966966
for (DataField field : fields) {
967-
if (field.name().equals(fieldName) && field.type().is(DataTypeRoot.BLOB)) {
967+
if (field.name().equals(fieldName) && isBlobFileField(field.type())) {
968968
throw new UnsupportedOperationException(
969969
String.format("Cannot rename BLOB column: [%s]", fieldName));
970970
}
@@ -981,8 +981,8 @@ private static void assertNotChangingBlobColumnType(
981981
if (!field.name().equals(fieldName)) {
982982
continue;
983983
}
984-
boolean wasBlob = field.type().is(DataTypeRoot.BLOB);
985-
boolean willBeBlob = newType.is(DataTypeRoot.BLOB);
984+
boolean wasBlob = isBlobFileField(field.type());
985+
boolean willBeBlob = isBlobFileField(newType);
986986
if (wasBlob || willBeBlob) {
987987
throw new UnsupportedOperationException(
988988
String.format(

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
8787
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
8888
import static org.apache.paimon.types.BlobType.fieldNamesInBlobFile;
89+
import static org.apache.paimon.types.BlobType.isBlobFileField;
8990
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
9091
import static org.apache.paimon.types.DataTypeRoot.MAP;
9192
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
@@ -849,13 +850,13 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
849850
List<DataField> fields = schema.fields();
850851
List<String> blobNames =
851852
fields.stream()
852-
.filter(field -> field.type().is(DataTypeRoot.BLOB))
853+
.filter(field -> isBlobFileField(field.type()))
853854
.map(DataField::name)
854855
.collect(Collectors.toList());
855856
if (!blobNames.isEmpty()) {
856857
checkArgument(
857858
options.dataEvolutionEnabled(),
858-
"Data evolution config must enabled for table with BLOB type column.");
859+
"Data evolution config must enabled for table with BLOB or ARRAY<BLOB> type column.");
859860
checkArgument(
860861
fields.size() > blobNames.size(),
861862
"Table with BLOB type column must have other normal columns.");
@@ -885,7 +886,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
885886
private static void validateBlobFields(RowType rowType, CoreOptions options) {
886887
Set<String> blobFieldNames =
887888
rowType.getFields().stream()
888-
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
889+
.filter(field -> isBlobFileField(field.type()))
889890
.map(DataField::name)
890891
.collect(Collectors.toCollection(HashSet::new));
891892
Set<String> configured =
@@ -894,7 +895,7 @@ private static void validateBlobFields(RowType rowType, CoreOptions options) {
894895
for (String field : configured) {
895896
checkArgument(
896897
blobFieldNames.contains(field),
897-
"Field '%s' in '%s' must be a BLOB field in table schema.",
898+
"Field '%s' in '%s' must be a BLOB field or ARRAY<BLOB> field in table schema.",
898899
field,
899900
CoreOptions.BLOB_FIELD.key());
900901
}

paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.apache.paimon.data.BlobPlaceholder;
3030
import org.apache.paimon.data.BlobView;
3131
import org.apache.paimon.data.BlobViewStruct;
32+
import org.apache.paimon.data.GenericArray;
3233
import org.apache.paimon.data.GenericRow;
34+
import org.apache.paimon.data.InternalArray;
3335
import org.apache.paimon.data.InternalRow;
3436
import org.apache.paimon.data.serializer.InternalRowSerializer;
3537
import org.apache.paimon.fs.FileIO;
@@ -154,6 +156,56 @@ public void testBasic() throws Exception {
154156
assertThat(integer.get()).isEqualTo(1000);
155157
}
156158

159+
@Test
160+
public void testArrayBlobField() throws Exception {
161+
Schema.Builder schemaBuilder = Schema.newBuilder();
162+
schemaBuilder.column("f0", DataTypes.INT());
163+
schemaBuilder.column("f1", DataTypes.ARRAY(DataTypes.BLOB()));
164+
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
165+
schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
166+
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
167+
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
168+
catalog.createTable(identifier(), schemaBuilder.build(), true);
169+
170+
byte[] blob0 = "blob-0".getBytes();
171+
byte[] blob1 = "blob-1".getBytes();
172+
byte[] blob2 = "blob-2".getBytes();
173+
writeDataDefault(
174+
Arrays.asList(
175+
GenericRow.of(
176+
0,
177+
new GenericArray(
178+
new Object[] {
179+
new BlobData(blob0), null, new BlobData(blob1)
180+
})),
181+
GenericRow.of(1, null),
182+
GenericRow.of(2, new GenericArray(new Object[] {new BlobData(blob2)}))));
183+
184+
FileStoreTable table = getTableDefault();
185+
List<DataFileMeta> filesMetas =
186+
table.store().newScan().plan().files().stream()
187+
.map(ManifestEntry::file)
188+
.collect(Collectors.toList());
189+
List<DataEvolutionSplitRead.FieldBunch> fieldGroups =
190+
DataEvolutionSplitRead.splitFieldBunches(filesMetas, file -> table.rowType());
191+
assertThat(fieldGroups.size()).isEqualTo(2);
192+
assertThat(countFilesWithSuffix(table.fileIO(), table.location(), ".blob")).isEqualTo(1);
193+
194+
Map<Integer, InternalArray> actual = new HashMap<>();
195+
readDefault(row -> actual.put(row.getInt(0), row.getArray(1)));
196+
197+
assertThat(actual.size()).isEqualTo(3);
198+
InternalArray first = actual.get(0);
199+
assertThat(first.size()).isEqualTo(3);
200+
assertThat(first.getBlob(0).toData()).isEqualTo(blob0);
201+
assertThat(first.isNullAt(1)).isTrue();
202+
assertThat(first.getBlob(2).toData()).isEqualTo(blob1);
203+
assertThat(actual.get(1)).isNull();
204+
InternalArray third = actual.get(2);
205+
assertThat(third.size()).isEqualTo(1);
206+
assertThat(third.getBlob(0).toData()).isEqualTo(blob2);
207+
}
208+
157209
@Test
158210
public void testUpdateBlobColumn() throws Exception {
159211
createTableDefault();

0 commit comments

Comments
 (0)