Commit 72600f9

[core] Fix blob read failure after alter table and compaction (#7618)
Reading a blob table fails after ALTER TABLE SET followed by compaction with: `java.lang.IllegalArgumentException: All files in this bunch should have the same schema id.` This PR fixes the issue by skipping the schemaId check for blob files and by disallowing renames of BLOB columns.
1 parent f424d26 commit 72600f9

10 files changed

Lines changed: 473 additions & 8 deletions
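
Root cause, as the fix below implies: with data evolution enabled, blob columns live in separate blob files that keep the schema id they were written with, so after a schema change a read bunch can legitimately mix a freshly compacted data file (new schema id) with older blob files (old schema id); the bunch-level schema-id check then rejected a valid mix. A condensed, hedged sketch of the triggering sequence (identifiers are illustrative; the full repro is the new testBlobWriteAlterCompact E2E test below):

// Condensed repro sketch; see testBlobWriteAlterCompact below for the complete test.
// 1. Write a batch into a table with a BLOB column (files get schemaId=0).
// 2. Any ALTER TABLE ... SET bumps the schema id to 1:
catalog.alterTable(
        identifier,
        org.apache.paimon.schema.SchemaChange.setOption("snapshot.num-retained.min", "5"),
        false);
// 3. Write another batch (schemaId=1) and run data-evolution compaction.
// 4. Reading the table then threw:
//    java.lang.IllegalArgumentException: All files in this bunch should have the same schema id.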

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java

Lines changed: 5 additions & 3 deletions
@@ -548,9 +548,11 @@ void add(DataFileMeta file) {
                 }
             }
             if (!files.isEmpty()) {
-                checkArgument(
-                        file.schemaId() == files.get(0).schemaId(),
-                        "All files in this bunch should have the same schema id.");
+                if (!isBlobFile(file.fileName())) {
+                    checkArgument(
+                            file.schemaId() == files.get(0).schemaId(),
+                            "All files in this bunch should have the same schema id.");
+                }
                 checkArgument(
                         file.writeCols().equals(files.get(0).writeCols()),
                         "All files in this bunch should have the same write columns.");

paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java

Lines changed: 15 additions & 0 deletions
@@ -42,6 +42,7 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
@@ -404,6 +405,7 @@ protected void updateLastColumn(
         } else if (change instanceof RenameColumn) {
             RenameColumn rename = (RenameColumn) change;
             assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename");
+            assertNotRenamingBlobColumn(newFields, rename.fieldNames());
             new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
                 @Override
                 protected void updateLastColumn(
@@ -908,6 +910,19 @@ private static void assertNotUpdatingPrimaryKeys(
         }
     }

+    private static void assertNotRenamingBlobColumn(List<DataField> fields, String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            return;
+        }
+        String fieldName = fieldNames[0];
+        for (DataField field : fields) {
+            if (field.name().equals(fieldName) && field.type().is(DataTypeRoot.BLOB)) {
+                throw new UnsupportedOperationException(
+                        String.format("Cannot rename BLOB column: [%s]", fieldName));
+            }
+        }
+    }
+
     private abstract static class NestedColumnModifier {

         private final String[] updateFieldNames;
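
With the guard in place, renaming a top-level BLOB column is rejected at schema-change time. Note the early return when fieldNames has more than one element: only top-level columns are checked, so nested field renames bypass the guard. A hedged usage sketch using the same Catalog.alterTable call the new test uses, assuming a table whose top-level column f2 is of type BLOB:

// Assumed setup: "identifier" names a table with a top-level BLOB column "f2".
try {
    catalog.alterTable(
            identifier,
            org.apache.paimon.schema.SchemaChange.renameColumn("f2", "f2_renamed"),
            false);
} catch (UnsupportedOperationException e) {
    // e.getMessage() -> "Cannot rename BLOB column: [f2]"
}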

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java

Lines changed: 80 additions & 0 deletions
@@ -791,6 +791,86 @@ public void testJavaWriteCompressedTextAppendTable() throws Exception {
         }
     }

+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBlobWriteAlterCompact() throws Exception {
+        Identifier identifier = identifier("blob_alter_compact_test");
+        catalog.dropTable(identifier, true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.BLOB())
+                        .option("target-file-size", "100 MB")
+                        .option("row-tracking.enabled", "true")
+                        .option("data-evolution.enabled", "true")
+                        .option("compaction.min.file-num", "2")
+                        .option("bucket", "-1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+
+        byte[] blobBytes = new byte[1024];
+        new java.util.Random(42).nextBytes(blobBytes);
+
+        // Batch 1: write with schemaId=0
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        StreamTableWrite write =
+                table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            1,
+                            BinaryString.fromString("batch1"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // ALTER TABLE SET -> schemaId becomes 1
+        catalog.alterTable(
+                identifier,
+                org.apache.paimon.schema.SchemaChange.setOption("snapshot.num-retained.min", "5"),
+                false);
+
+        // Batch 2: write with schemaId=1
+        table = (FileStoreTable) catalog.getTable(identifier);
+        write = table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("batch2"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // Compact
+        table = (FileStoreTable) catalog.getTable(identifier);
+        org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator coordinator =
+                new org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator(
+                        table, false, false);
+        List<org.apache.paimon.append.dataevolution.DataEvolutionCompactTask> tasks =
+                coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<org.apache.paimon.table.sink.CommitMessage> compactMessages = new ArrayList<>();
+        for (org.apache.paimon.append.dataevolution.DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        StreamTableCommit compactCommit = table.newCommit(commitUser);
+        compactCommit.commit(2, compactMessages);
+        compactCommit.close();
+
+        FileStoreTable readTable = (FileStoreTable) catalog.getTable(identifier);
+        List<Split> splits = new ArrayList<>(readTable.newSnapshotReader().read().dataSplits());
+        TableRead read = readTable.newRead();
+        RowType rowType = readTable.rowType();
+        List<String> res = getResult(read, splits, row -> internalRowToString(row, rowType));
+        assertThat(res).hasSize(200);
+    }
+
     // Helper method from TableTestBase
     protected Identifier identifier(String tableName) {
         return new Identifier(database, tableName);
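
The test's final read goes split by split through the compacted table; getResult and internalRowToString are helpers from the test base. A minimal sketch of what such a read helper might look like, assuming Paimon's public TableRead/RecordReader API (the real TableTestBase helper may differ):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;

public class ReadHelper {
    // Collect every row from every split as a string, in the spirit of getResult.
    static List<String> readAllRows(
            TableRead read, List<Split> splits, Function<InternalRow, String> toString)
            throws Exception {
        List<String> rows = new ArrayList<>();
        for (Split split : splits) {
            // createReader opens a RecordReader over the files of one split.
            try (RecordReader<InternalRow> reader = read.createReader(split)) {
                reader.forEachRemaining(row -> rows.add(toString.apply(row)));
            }
        }
        return rows; // testBlobWriteAlterCompact expects 200 rows (2 batches x 100).
    }
}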
