Skip to content

Commit ae5b7be

Browse files
authored
[core] Fix: blob meta should contain a filter to match normal data file meta (#6412)
1 parent 27e9224 commit ae5b7be

2 files changed

Lines changed: 32 additions & 4 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public static List<List<DataFileMeta>> split(List<DataFileMeta> files) {
9797
f2.maxSequenceNumber(), f1.maxSequenceNumber());
9898
}));
9999

100+
files = filterBlob(files);
101+
100102
// Split files by firstRowId
101103
long lastRowId = -1;
102104
long checkRowIdStart = 0;
@@ -128,4 +130,26 @@ public static List<List<DataFileMeta>> split(List<DataFileMeta> files) {
128130

129131
return splitByRowId;
130132
}
133+
134+
private static List<DataFileMeta> filterBlob(List<DataFileMeta> files) {
135+
List<DataFileMeta> result = new ArrayList<>();
136+
long rowIdStart = -1;
137+
long rowIdEnd = -1;
138+
for (DataFileMeta file : files) {
139+
if (file.firstRowId() == null) {
140+
result.add(file);
141+
continue;
142+
}
143+
if (!isBlobFile(file.fileName())) {
144+
rowIdStart = file.firstRowId();
145+
rowIdEnd = file.firstRowId() + file.rowCount();
146+
result.add(file);
147+
} else {
148+
if (file.firstRowId() >= rowIdStart && file.firstRowId() < rowIdEnd) {
149+
result.add(file);
150+
}
151+
}
152+
}
153+
return result;
154+
}
131155
}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,14 @@ class BlobTestBase extends PaimonSparkTestBase {
6565

6666
sql(
6767
"CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')")
68-
sql("INSERT INTO t VALUES (1, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')")
69-
68+
sql(
69+
"INSERT INTO t VALUES (1, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
70+
+ "(5, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
71+
+ "(2, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
72+
+ "(3, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
73+
+ "(4, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')")
7074
val newDescriptorBytes =
71-
sql("SELECT picture FROM t").collect()(0).get(0).asInstanceOf[Array[Byte]]
75+
sql("SELECT picture FROM t WHERE id = 1").collect()(0).get(0).asInstanceOf[Array[Byte]]
7276
val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
7377
val options = new Options()
7478
options.set("warehouse", tempDBDir.toString)
@@ -79,7 +83,7 @@ class BlobTestBase extends PaimonSparkTestBase {
7983

8084
sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
8185
checkAnswer(
82-
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
86+
sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = 1"),
8387
Seq(Row(1, "paimon", blobData, 0, 1))
8488
)
8589
}

0 commit comments

Comments
 (0)