Skip to content

Commit fe9b975

Browse files
authored
[core] When scan.primary-branch is set, allow primary branch to be an append only table (#7614)
In `scan.fallback-branch`, we allow main branch to be append only, and fallback branch to have primary key. So in `scan.primary-branch`, we should allow main branch to have primary key, and primary branch to be append only.
1 parent 1e1c9f4 commit fe9b975

2 files changed

Lines changed: 42 additions & 24 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -224,30 +224,6 @@ protected void validateSchema() {
224224
mainRowType,
225225
otherBranch,
226226
otherRowType);
227-
228-
List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
229-
List<String> otherPrimaryKeys = other.schema().primaryKeys();
230-
if (!mainPrimaryKeys.isEmpty()) {
231-
if (otherPrimaryKeys.isEmpty()) {
232-
throw new IllegalArgumentException(
233-
"Branch "
234-
+ mainBranch
235-
+ " has primary keys while branch "
236-
+ otherBranch
237-
+ " does not. This is not allowed.");
238-
}
239-
Preconditions.checkArgument(
240-
mainPrimaryKeys.equals(otherPrimaryKeys),
241-
"Branch %s and %s both have primary keys but are not the same.\n"
242-
+ "Primary keys of %s are %s.\n"
243-
+ "Primary keys of %s are %s.",
244-
mainBranch,
245-
otherBranch,
246-
mainBranch,
247-
mainPrimaryKeys,
248-
otherBranch,
249-
otherPrimaryKeys);
250-
}
251227
}
252228

253229
private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,48 @@ public void testRenameBranch() throws Exception {
923923
.hasMessageContaining("Branch");
924924
}
925925

926+
@Test
927+
public void testPrimaryBranchBatchRead() throws Exception {
928+
// Create non-PK table, then create branch, then ALTER main to add PKs.
929+
// This results in main = PK table, branch = non-PK table.
930+
sql(
931+
"CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) "
932+
+ "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
933+
934+
sql("CALL sys.create_branch('default.t', 'nb')");
935+
sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
936+
sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )");
937+
938+
// Insert into non-PK branch (primary, has priority)
939+
sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
940+
// Insert overlapping partition into PK main (fallback)
941+
sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
942+
943+
// pt=1 exists in primary branch → read from branch
944+
assertThat(collectResult("SELECT v, k FROM t"))
945+
.containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
946+
assertThat(collectResult("SELECT v, k FROM `t$branch_nb`"))
947+
.containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
948+
949+
// Insert pt=2 into primary branch, pt=3 only into main
950+
sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')");
951+
sql("INSERT INTO t VALUES (3, 10, 'horse')");
952+
953+
// pt=1,2 from primary branch; pt=3 from main (fallback)
954+
assertThat(collectResult("SELECT v, k FROM t"))
955+
.containsExactlyInAnyOrder(
956+
"+I[cat, 20]",
957+
"+I[dog, 30]",
958+
"+I[tiger, 10]",
959+
"+I[wolf, 20]",
960+
"+I[horse, 10]");
961+
962+
// Unset scan.primary-branch, main table should show its own data
963+
sql("ALTER TABLE t RESET ( 'scan.primary-branch' )");
964+
assertThat(collectResult("SELECT v, k FROM t"))
965+
.containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]");
966+
}
967+
926968
private List<String> collectResult(String sql) throws Exception {
927969
List<String> result = new ArrayList<>();
928970
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

0 commit comments

Comments
 (0)