Skip to content

Commit 7b8dcac

Browse files
committed
[core][flink][spark] Support ARRAY<BLOB> blob files
1 parent 773f942 commit 7b8dcac

26 files changed

Lines changed: 875 additions & 77 deletions

File tree

paimon-api/src/main/java/org/apache/paimon/types/BlobType.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,21 @@ public static List<DataField> fieldsInBlobFile(RowType rowType, Set<String> desc
103103
rowType.getFields()
104104
.forEach(
105105
field -> {
106-
DataTypeRoot type = field.type().getTypeRoot();
107-
if (type == DataTypeRoot.BLOB
106+
if (isBlobFileField(field.type())
108107
&& !descriptorFields.contains(field.name())) {
109108
result.add(field);
110109
}
111110
});
112111
return result;
113112
}
113+
114+
/**
 * Returns whether the given type is handled as a blob-file field: either a {@code BLOB} itself
 * or an {@code ARRAY} whose direct element type is {@code BLOB}.
 *
 * <p>Only the immediate element type of an array is inspected, so a nested array such as
 * {@code ARRAY<ARRAY<BLOB>>} yields {@code false}.
 *
 * @param type the data type to inspect
 * @return {@code true} for {@code BLOB} or {@code ARRAY<BLOB>}, {@code false} otherwise
 */
public static boolean isBlobFileField(DataType type) {
115+
// Plain BLOB column.
if (type.getTypeRoot() == DataTypeRoot.BLOB) {
116+
return true;
117+
}
118+
// ARRAY column: qualifies only when its element type root is BLOB.
if (type.getTypeRoot() == DataTypeRoot.ARRAY) {
119+
return ((ArrayType) type).getElementType().getTypeRoot() == DataTypeRoot.BLOB;
120+
}
121+
return false;
122+
}
114123
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data;
20+
21+
import org.apache.paimon.data.variant.Variant;
22+
23+
import java.io.Serializable;
24+
25+
/**
26+
 * Placeholder for an {@code ARRAY<BLOB>} field in data-evolution blob files. It should never be
27+
 * exposed to users.
 *
 * <p>The class is a singleton (see {@link #INSTANCE}); every {@link InternalArray} accessor it
 * overrides throws {@link UnsupportedOperationException}.
28+
 */
29+
public final class BlobArrayPlaceholder implements InternalArray, Serializable {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
/** The single shared instance; the constructor is private so no other instance is created here. */
public static final BlobArrayPlaceholder INSTANCE = new BlobArrayPlaceholder();
34+
35+
private BlobArrayPlaceholder() {}
36+
37+
/** Resolves a Java-deserialized object back to {@link #INSTANCE}. */
private Object readResolve() {
38+
return INSTANCE;
39+
}
40+
41+
/** Builds the exception thrown by every accessor below. */
private static UnsupportedOperationException unsupported() {
42+
return new UnsupportedOperationException(
43+
"Should never call this method for placeholder blob array.");
44+
}
45+
46+
// All InternalArray accessors below unconditionally throw the exception from unsupported().
@Override
47+
public int size() {
48+
throw unsupported();
49+
}
50+
51+
@Override
52+
public boolean isNullAt(int pos) {
53+
throw unsupported();
54+
}
55+
56+
@Override
57+
public boolean getBoolean(int pos) {
58+
throw unsupported();
59+
}
60+
61+
@Override
62+
public byte getByte(int pos) {
63+
throw unsupported();
64+
}
65+
66+
@Override
67+
public short getShort(int pos) {
68+
throw unsupported();
69+
}
70+
71+
@Override
72+
public int getInt(int pos) {
73+
throw unsupported();
74+
}
75+
76+
@Override
77+
public long getLong(int pos) {
78+
throw unsupported();
79+
}
80+
81+
@Override
82+
public float getFloat(int pos) {
83+
throw unsupported();
84+
}
85+
86+
@Override
87+
public double getDouble(int pos) {
88+
throw unsupported();
89+
}
90+
91+
@Override
92+
public BinaryString getString(int pos) {
93+
throw unsupported();
94+
}
95+
96+
@Override
97+
public Decimal getDecimal(int pos, int precision, int scale) {
98+
throw unsupported();
99+
}
100+
101+
@Override
102+
public Timestamp getTimestamp(int pos, int precision) {
103+
throw unsupported();
104+
}
105+
106+
@Override
107+
public byte[] getBinary(int pos) {
108+
throw unsupported();
109+
}
110+
111+
@Override
112+
public Variant getVariant(int pos) {
113+
throw unsupported();
114+
}
115+
116+
@Override
117+
public Blob getBlob(int pos) {
118+
throw unsupported();
119+
}
120+
121+
@Override
122+
public InternalArray getArray(int pos) {
123+
throw unsupported();
124+
}
125+
126+
@Override
127+
public InternalVector getVector(int pos) {
128+
throw unsupported();
129+
}
130+
131+
@Override
132+
public InternalMap getMap(int pos) {
133+
throw unsupported();
134+
}
135+
136+
@Override
137+
public InternalRow getRow(int pos, int numFields) {
138+
throw unsupported();
139+
}
140+
141+
// Bulk primitive conversions are unsupported as well.
@Override
142+
public boolean[] toBooleanArray() {
143+
throw unsupported();
144+
}
145+
146+
@Override
147+
public byte[] toByteArray() {
148+
throw unsupported();
149+
}
150+
151+
@Override
152+
public short[] toShortArray() {
153+
throw unsupported();
154+
}
155+
156+
@Override
157+
public int[] toIntArray() {
158+
throw unsupported();
159+
}
160+
161+
@Override
162+
public long[] toLongArray() {
163+
throw unsupported();
164+
}
165+
166+
@Override
167+
public float[] toFloatArray() {
168+
throw unsupported();
169+
}
170+
171+
@Override
172+
public double[] toDoubleArray() {
173+
throw unsupported();
174+
}
175+
}

paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ static Class<?> getDataClass(DataType type) {
153153
return InternalMap.class;
154154
case ROW:
155155
return InternalRow.class;
156+
case BLOB:
157+
return Blob.class;
156158
default:
157159
throw new IllegalArgumentException("Illegal type: " + type);
158160
}

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import static java.util.Comparator.comparingLong;
5555
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
5656
import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
57-
import static org.apache.paimon.types.DataTypeRoot.BLOB;
57+
import static org.apache.paimon.types.BlobType.isBlobFileField;
5858
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
5959
import static org.apache.paimon.utils.Preconditions.checkArgument;
6060

@@ -100,7 +100,7 @@ public DataEvolutionCompactCoordinator(
100100
? table.rowType().getFields().stream()
101101
.filter(
102102
field ->
103-
field.type().is(BLOB)
103+
isBlobFileField(field.type())
104104
&& !blobInlineFields.contains(
105105
field.name()))
106106
.map(DataField::id)

paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
package org.apache.paimon.operation;
2020

2121
import org.apache.paimon.append.ForceSingleBatchReader;
22+
import org.apache.paimon.data.BlobArrayPlaceholder;
2223
import org.apache.paimon.data.BlobPlaceholder;
2324
import org.apache.paimon.data.GenericRow;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.io.DataFileMeta;
2627
import org.apache.paimon.reader.RecordReader;
28+
import org.apache.paimon.types.DataTypeRoot;
2729
import org.apache.paimon.types.RowType;
2830
import org.apache.paimon.utils.Preconditions;
2931
import org.apache.paimon.utils.Range;
@@ -55,7 +57,8 @@
5557
public class BlobFallbackRecordReader implements RecordReader<InternalRow> {
5658

5759
private final List<RecordReader<InternalRow>> groupReaders = new ArrayList<>();
58-
private final int blobIndex;
60+
private final Object blobPlaceholder;
61+
private final InternalRow.FieldGetter blobGetter;
5962
private boolean returned;
6063

6164
BlobFallbackRecordReader(
@@ -64,7 +67,9 @@ public class BlobFallbackRecordReader implements RecordReader<InternalRow> {
6467
List<Range> rowRanges,
6568
RowType readRowType,
6669
int blobIndex) {
67-
this.blobIndex = blobIndex;
70+
this.blobPlaceholder = blobPlaceholder(readRowType, blobIndex);
71+
this.blobGetter =
72+
InternalRow.createFieldGetter(readRowType.getTypeAt(blobIndex), blobIndex);
6873

6974
checkArgument(!files.isEmpty(), "Blob bunch should not be empty.");
7075
long firstRowId = Long.MAX_VALUE;
@@ -188,7 +193,13 @@ public void releaseBatch() {
188193
}
189194

190195
/**
 * Tells whether the blob column of {@code row} currently holds the placeholder object.
 *
 * <p>The comparison is by reference ({@code ==}) against the {@code blobPlaceholder} field,
 * which the constructor derives from the read row type and blob index.
 *
 * @param row the row whose blob column is inspected
 * @return {@code true} if the value read by {@code blobGetter} is the placeholder instance
 */
private boolean isPlaceHolder(InternalRow row) {
191-
// Old (removed) form: explicit null check plus getBlob, compared to BlobPlaceholder.INSTANCE only.
return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == BlobPlaceholder.INSTANCE;
196+
// New (added) form: reads the column through the field getter built for its declared type.
// NOTE(review): presumably getFieldOrNull returns null for a null column, so null never
// matches the placeholder - confirm against InternalRow.FieldGetter.
return blobGetter.getFieldOrNull(row) == blobPlaceholder;
197+
}
198+
199+
/**
 * Selects the placeholder singleton for the field at {@code blobIndex}: {@link
 * BlobArrayPlaceholder#INSTANCE} when the field's type root is {@code ARRAY}, otherwise {@link
 * BlobPlaceholder#INSTANCE}.
 *
 * <p>Only the type root is checked here; the array element type is not inspected.
 * NOTE(review): assumes the caller passes the index of a BLOB or ARRAY&lt;BLOB&gt; field -
 * confirm at the call sites.
 *
 * @param rowType row type containing the blob field
 * @param blobIndex position of the blob field within {@code rowType}
 * @return the placeholder instance matching the field's type root
 */
private static Object blobPlaceholder(RowType rowType, int blobIndex) {
200+
return rowType.getTypeAt(blobIndex).getTypeRoot() == DataTypeRoot.ARRAY
201+
? BlobArrayPlaceholder.INSTANCE
202+
: BlobPlaceholder.INSTANCE;
192203
}
193204

194205
@Override
@@ -263,6 +274,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader<Intern
263274
private final List<Range> rowRanges;
264275
private final RowType readRowType;
265276
private final int blobIndex;
277+
private final Object blobPlaceholder;
266278
private final long lastRowId;
267279

268280
private RecordReader<InternalRow> currentReader;
@@ -287,6 +299,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader<Intern
287299
this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges);
288300
this.readRowType = readRowType;
289301
this.blobIndex = blobIndex;
302+
this.blobPlaceholder = blobPlaceholder(readRowType, blobIndex);
290303
this.lastRowId = lastRowId;
291304

292305
this.nextFileIndex = 0;
@@ -391,7 +404,7 @@ public void releaseBatch() {
391404
private InternalRow placeHolderRow() {
392405
if (placeholderRow == null) {
393406
GenericRow row = new GenericRow(readRowType.getFieldCount());
394-
row.setField(blobIndex, BlobPlaceholder.INSTANCE);
407+
row.setField(blobIndex, blobPlaceholder);
395408
placeholderRow = row;
396409
}
397410
return placeholderRow;

paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.data.BlobConsumer;
23+
import org.apache.paimon.types.BlobType;
2324
import org.apache.paimon.types.DataField;
24-
import org.apache.paimon.types.DataTypeRoot;
2525
import org.apache.paimon.types.RowType;
2626

2727
import javax.annotation.Nullable;
2828

2929
import java.util.Set;
3030

31-
import static org.apache.paimon.types.DataTypeRoot.BLOB;
32-
3331
/** Context for blob file. */
3432
public class BlobFileContext {
3533

@@ -53,7 +51,7 @@ private BlobFileContext(
5351

5452
@Nullable
5553
public static BlobFileContext create(RowType rowType, CoreOptions options) {
56-
if (rowType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
54+
if (rowType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) {
5755
return null;
5856
}
5957
Set<String> descriptorFields = options.blobDescriptorField();
@@ -62,8 +60,7 @@ public static BlobFileContext create(RowType rowType, CoreOptions options) {
6260
String externalStoragePath = options.blobExternalStoragePath();
6361
boolean requireBlobFile = false;
6462
for (DataField field : rowType.getFields()) {
65-
DataTypeRoot type = field.type().getTypeRoot();
66-
if (type == DataTypeRoot.BLOB
63+
if (BlobType.isBlobFileField(field.type())
6764
&& (!inlineFields.contains(field.name())
6865
|| externalStorageField.contains(field.name()))) {
6966
requireBlobFile = true;
@@ -83,7 +80,7 @@ public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
8380
}
8481

8582
public BlobFileContext withWriteType(RowType writeType) {
86-
if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
83+
if (writeType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) {
8784
return null;
8885
}
8986
return this;

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.paimon.table.source.DataSplit;
4747
import org.apache.paimon.table.source.Split;
4848
import org.apache.paimon.types.DataField;
49-
import org.apache.paimon.types.DataTypeRoot;
5049
import org.apache.paimon.types.RowType;
5150
import org.apache.paimon.utils.FileStorePathFactory;
5251
import org.apache.paimon.utils.FormatReaderMapping;
@@ -76,6 +75,7 @@
7675
import static java.util.Comparator.comparingLong;
7776
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
7877
import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
78+
import static org.apache.paimon.types.BlobType.isBlobFileField;
7979
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
8080
import static org.apache.paimon.utils.Preconditions.checkArgument;
8181
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -419,7 +419,7 @@ private RecordReader<InternalRow> sequentialReadFiles(
419419

420420
private static int findBlobFieldIndex(RowType rowType) {
421421
for (int i = 0; i < rowType.getFieldCount(); i++) {
422-
if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
422+
if (isBlobFileField(rowType.getTypeAt(i))) {
423423
return i;
424424
}
425425
}

0 commit comments

Comments
 (0)