1818
1919package org .apache .paimon .flink ;
2020
21+ import org .apache .paimon .CoreOptions ;
22+ import org .apache .paimon .data .BinaryRow ;
23+ import org .apache .paimon .data .BinaryString ;
2124import org .apache .paimon .flink .sink .FlinkSinkBuilder ;
25+ import org .apache .paimon .partition .PartitionPredicate ;
26+ import org .apache .paimon .predicate .Predicate ;
27+ import org .apache .paimon .predicate .PredicateBuilder ;
2228import org .apache .paimon .table .ChainTableStreamScan ;
2329import org .apache .paimon .table .FileStoreTable ;
30+ import org .apache .paimon .table .source .DataTableScan ;
31+ import org .apache .paimon .table .source .TableScan ;
2432import org .apache .paimon .utils .BlockingIterator ;
2533
2634import org .apache .flink .api .common .JobID ;
@@ -1533,13 +1541,13 @@ public void testRestoreScanAll() throws Exception {
15331541 ChainTableStreamScan scan = (ChainTableStreamScan ) table .newStreamScan ();
15341542
15351543 // Phase 1: starting
1536- org . apache . paimon . table . source . TableScan .Plan plan1 = scan .plan ();
1544+ TableScan .Plan plan1 = scan .plan ();
15371545 assertThat (plan1 .splits ()).as ("Phase 1 should produce splits" ).isNotEmpty ();
15381546 Long checkpoint = scan .checkpoint ();
15391547 assertThat (checkpoint ).as ("Checkpoint should be non-null after Phase 1" ).isNotNull ();
15401548
15411549 // Phase 2: no new data → empty plan
1542- org . apache . paimon . table . source . TableScan .Plan plan2 = scan .plan ();
1550+ TableScan .Plan plan2 = scan .plan ();
15431551 assertThat (plan2 .splits ()).as ("Phase 2 with no new data should be empty" ).isEmpty ();
15441552
15451553 // restore(id, scanAll=true): should reset to starting, preserve delta position
@@ -1549,7 +1557,7 @@ public void testRestoreScanAll() throws Exception {
15491557 .isEqualTo (checkpoint );
15501558
15511559 // Starting should run again
1552- org . apache . paimon . table . source . TableScan .Plan plan3 = scan .plan ();
1560+ TableScan .Plan plan3 = scan .plan ();
15531561 assertThat (plan3 .splits ())
15541562 .as ("Starting should produce splits again after restore(id, true)" )
15551563 .isNotEmpty ();
@@ -1611,7 +1619,7 @@ public void testRestoreNullScanAll() throws Exception {
16111619 .isNull ();
16121620
16131621 // Starting should run again
1614- org . apache . paimon . table . source . TableScan .Plan plan = scan .plan ();
1622+ TableScan .Plan plan = scan .plan ();
16151623 assertThat (plan .splits ()).as ("Starting should produce splits" ).isNotEmpty ();
16161624 assertThat (scan .checkpoint ()).as ("Checkpoint should be set after new starting" ).isNotNull ();
16171625 }
@@ -1779,12 +1787,12 @@ public void testWithShardForwarding() throws Exception {
17791787 // Shard 0 of 2: should get a subset of data
17801788 ChainTableStreamScan scan0 = (ChainTableStreamScan ) table .newStreamScan ();
17811789 scan0 .withShard (0 , 2 );
1782- org . apache . paimon . table . source . TableScan .Plan plan0 = scan0 .plan ();
1790+ TableScan .Plan plan0 = scan0 .plan ();
17831791
17841792 // Shard 1 of 2: should get the other subset
17851793 ChainTableStreamScan scan1 = (ChainTableStreamScan ) table .newStreamScan ();
17861794 scan1 .withShard (1 , 2 );
1787- org . apache . paimon . table . source . TableScan .Plan plan1 = scan1 .plan ();
1795+ TableScan .Plan plan1 = scan1 .plan ();
17881796
17891797 // Together both shards should produce non-empty results
17901798 // (exact split depends on bucket hashing, but total should cover all data)
@@ -1831,15 +1839,15 @@ public void testStreamingReadBothBranchesEmpty() throws Exception {
18311839 // Both branches are empty — Phase 1 should produce no splits
18321840 FileStoreTable table = paimonTable ("chain_both_empty" );
18331841 ChainTableStreamScan scan = (ChainTableStreamScan ) table .newStreamScan ();
1834- org . apache . paimon . table . source . TableScan .Plan plan1 = scan .plan ();
1842+ TableScan .Plan plan1 = scan .plan ();
18351843 assertThat (plan1 .splits ()).as ("Phase 1 with both branches empty should be empty" ).isEmpty ();
18361844
18371845 // Phase 2: write new delta data and verify it streams through
18381846 sql (
18391847 "INSERT INTO `chain_both_empty$branch_delta` PARTITION (dt = '20250808')"
18401848 + " VALUES (1, 1, 'v1'), (2, 1, 'v2')" );
18411849
1842- org . apache . paimon . table . source . TableScan .Plan plan2 = scan .plan ();
1850+ TableScan .Plan plan2 = scan .plan ();
18431851 assertThat (plan2 .splits ()).as ("Phase 2 should pick up new delta data" ).isNotEmpty ();
18441852 }
18451853
@@ -1888,7 +1896,7 @@ public void testStreamingReadDeltaOverwriteInPhase2() throws Exception {
18881896 ChainTableStreamScan scan = (ChainTableStreamScan ) table .newStreamScan ();
18891897
18901898 // Phase 1: read initial delta data
1891- org . apache . paimon . table . source . TableScan .Plan plan1 = scan .plan ();
1899+ TableScan .Plan plan1 = scan .plan ();
18921900 assertThat (plan1 .splits ()).as ("Phase 1 should produce splits" ).isNotEmpty ();
18931901
18941902 // Phase 2: OVERWRITE the same partition on delta branch.
@@ -1900,7 +1908,7 @@ public void testStreamingReadDeltaOverwriteInPhase2() throws Exception {
19001908
19011909 // Verify scan.plan() does not throw after OVERWRITE
19021910 for (int i = 0 ; i < 3 ; i ++) {
1903- org . apache . paimon . table . source . TableScan .Plan planN = scan .plan ();
1911+ TableScan .Plan planN = scan .plan ();
19041912 assertThat (planN ).as ("plan() should not return null after OVERWRITE" ).isNotNull ();
19051913 }
19061914 }
@@ -1952,7 +1960,7 @@ public void testStreamingReadRestoreAfterNewData() throws Exception {
19521960 ChainTableStreamScan scan = (ChainTableStreamScan ) table .newStreamScan ();
19531961
19541962 // Phase 1: snapshot at dt=20250807 (latest), delta at dt=20250808
1955- org . apache . paimon . table . source . TableScan .Plan plan1 = scan .plan ();
1963+ TableScan .Plan plan1 = scan .plan ();
19561964 int phase1Size = plan1 .splits ().size ();
19571965 assertThat (phase1Size ).as ("Phase 1 should produce splits" ).isGreaterThan (0 );
19581966
@@ -1970,7 +1978,7 @@ public void testStreamingReadRestoreAfterNewData() throws Exception {
19701978 // - Delta dt=20250808 excluded (older than latest snapshot dt=20250809)
19711979 // - Delta dt=20250810 included (newer than dt=20250809)
19721980 scan .restore (null );
1973- org . apache . paimon . table . source . TableScan .Plan plan2 = scan .plan ();
1981+ TableScan .Plan plan2 = scan .plan ();
19741982 assertThat (plan2 .splits ())
19751983 .as ("Restore(null) should re-run Phase 1 with current data" )
19761984 .isNotEmpty ();
@@ -1997,12 +2005,10 @@ public void testStreamingReadRejectsPartitionFilter() throws Exception {
19972005 ChainTableStreamScan scan = (ChainTableStreamScan ) table .newStreamScan ();
19982006
19992007 // dt is the 4th field (index 3) in the schema: t1(0), t2(1), t3(2), dt(3)
2000- org .apache .paimon .predicate .PredicateBuilder builder =
2001- new org .apache .paimon .predicate .PredicateBuilder (table .rowType ());
2008+ PredicateBuilder builder = new PredicateBuilder (table .rowType ());
20022009
20032010 // Partition-only filter should be rejected
2004- org .apache .paimon .predicate .Predicate partitionFilter =
2005- builder .equal (3 , org .apache .paimon .data .BinaryString .fromString ("20250808" ));
2011+ Predicate partitionFilter = builder .equal (3 , BinaryString .fromString ("20250808" ));
20062012 assertThatThrownBy (() -> scan .withFilter (partitionFilter ))
20072013 .isInstanceOf (UnsupportedOperationException .class )
20082014 .hasMessageContaining ("Partition filter is not supported" );
@@ -2110,16 +2116,103 @@ public void testStreamingReadRejectsWithPartitionFilter() throws Exception {
21102116 .hasMessageContaining ("Partition filter is not supported" );
21112117
21122118 // withPartitionFilter(PartitionPredicate) should be rejected
2113- org .apache .paimon .predicate .PredicateBuilder ppBuilder =
2114- new org .apache .paimon .predicate .PredicateBuilder (
2115- table .schema ().logicalPartitionType ());
2116- org .apache .paimon .partition .PartitionPredicate pp =
2117- org .apache .paimon .partition .PartitionPredicate .fromPredicate (
2119+ PredicateBuilder ppBuilder = new PredicateBuilder (table .schema ().logicalPartitionType ());
2120+ PartitionPredicate pp =
2121+ PartitionPredicate .fromPredicate (
21182122 table .schema ().logicalPartitionType (),
2119- ppBuilder .equal (
2120- 0 , org .apache .paimon .data .BinaryString .fromString ("20250808" )));
2123+ ppBuilder .equal (0 , BinaryString .fromString ("20250808" )));
21212124 assertThatThrownBy (() -> scan .withPartitionFilter (pp ))
21222125 .isInstanceOf (UnsupportedOperationException .class )
21232126 .hasMessageContaining ("Partition filter is not supported" );
21242127 }
2128+
2129+ /**
2130+ * Tests that chain table streaming rejects checkpoint-align mode at job construction time, not
2131+ * at runtime. ChainSplit has no snapshotId and cannot participate in snapshot-aligned
2132+ * checkpoint grouping.
2133+ */
2134+ @ Test
2135+ public void testStreamingReadRejectsCheckpointAlign () throws Exception {
2136+ createChainTable ("chain_align" );
2137+ setupChainTableBranches ("chain_align" );
2138+
2139+ sql (
2140+ "INSERT INTO `chain_align$branch_delta` PARTITION (dt = '20250808')"
2141+ + " VALUES (1, 1, 'v1')" );
2142+
2143+ // Setting checkpoint-align.enabled on a chain table streaming read should throw
2144+ // at job construction time, not at runtime when ChainSplits are encountered.
2145+ assertThatThrownBy (
2146+ () ->
2147+ sEnv .executeSql (
2148+ "SELECT * FROM chain_align "
2149+ + "/*+ OPTIONS('source.checkpoint-align.enabled' = 'true') */" ))
2150+ .isInstanceOf (UnsupportedOperationException .class )
2151+ .hasMessageContaining (
2152+ "Chain table streaming is not compatible with checkpoint-align" );
2153+ }
2154+
2155+ /**
2156+ * Tests that primary-key predicates do NOT affect partition discovery in chain table streaming
2157+ * Phase 1. This is the scenario from JingsongLi's review comment:
2158+ *
2159+ * <p>"if the latest snapshot partition no longer has k=1 but an older delta partition still
2160+ * does, SELECT ... WHERE k=1 can make this listing miss the latest snapshot partition and then
2161+ * include the old delta row, even though that partition should be considered outdated."
2162+ *
2163+ * <p>The test creates: snapshot@20250808 with t1=1,2; snapshot@20250809 with t1=3,4 (no t1=1);
2164+ * delta@20250808 with t1=1. Then filters on t1=1 (a primary key field). Partition discovery
2165+ * must still see both snapshot partitions so the chain boundary is correct.
2166+ */
2167+ @ Test
2168+ public void testStreamingReadPKFilterDoesNotAffectPartitionDiscovery () throws Exception {
2169+ createChainTable ("chain_pk_filter" );
2170+ setupChainTableBranches ("chain_pk_filter" );
2171+
2172+ // Snapshot@20250808: has t1=1 and t1=2
2173+ sql (
2174+ "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250808')"
2175+ + " VALUES (1, 1, 'v1'), (2, 1, 'v2')" );
2176+ // Snapshot@20250809: has t1=3 and t1=4 (NO t1=1)
2177+ sql (
2178+ "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250809')"
2179+ + " VALUES (3, 1, 'v3'), (4, 1, 'v4')" );
2180+
2181+ // Delta@20250808: has t1=1
2182+ sql (
2183+ "INSERT INTO `chain_pk_filter$branch_delta` PARTITION (dt = '20250808')"
2184+ + " VALUES (1, 2, 'delta_v1')" );
2185+
2186+ // --- Part 1: Verify listPartitions() with a PK predicate ---
2187+ FileStoreTable mainTable = paimonTable ("chain_pk_filter" );
2188+ FileStoreTable snapshotTable =
2189+ mainTable .copy (
2190+ java .util .Collections .singletonMap (CoreOptions .BRANCH .key (), "snapshot" ));
2191+
2192+ DataTableScan scan = snapshotTable .newScan ();
2193+ PredicateBuilder builder = new PredicateBuilder (snapshotTable .rowType ());
2194+ // t1 is field index 0, part of primary key (dt, t1).
2195+ // Only snapshot@20250808 has t1=1.
2196+ Predicate t1Equals1 = builder .equal (0 , 1L );
2197+ scan .withFilter (t1Equals1 );
2198+
2199+ // listPartitions() must return BOTH snapshot partitions even though only
2200+ // 20250808 contains t1=1. If it returned only 20250808, the chain boundary
2201+ // would be wrong and stale data could be included.
2202+ List <BinaryRow > partitions = scan .listPartitions ();
2203+ assertThat (partitions )
2204+ .as (
2205+ "listPartitions() must return all snapshot partitions even with a PK filter. "
2206+ + "If only dt=20250808 is returned, the chain boundary is wrong." )
2207+ .hasSize (2 );
2208+
2209+ // --- Part 2: Verify filtered batch SELECT returns correct data ---
2210+ // Chain-merged batch view: snapshot@20250808(t1=1,2), snapshot@20250809(t1=3,4).
2211+ // WHERE t1 = 1 should return only the snapshot row (t1=1, dt=20250808).
2212+ List <String > filtered = collectResult ("SELECT * FROM chain_pk_filter WHERE t1 = 1" );
2213+ assertThat (filtered )
2214+ .as ("WHERE t1=1 should find the snapshot row at dt=20250808" )
2215+ .hasSize (1 )
2216+ .containsExactly ("+I[1, 1, v1, 20250808]" );
2217+ }
21252218}
0 commit comments