2222import org .apache .paimon .codegen .CodeGenUtils ;
2323import org .apache .paimon .codegen .RecordComparator ;
2424import org .apache .paimon .data .BinaryRow ;
25- import org .apache .paimon .io .DataFileMeta ;
2625import org .apache .paimon .manifest .PartitionEntry ;
26+ import org .apache .paimon .partition .PartitionPredicate ;
27+ import org .apache .paimon .predicate .PartitionPredicateVisitor ;
2728import org .apache .paimon .predicate .Predicate ;
2829import org .apache .paimon .table .source .ChainSplit ;
2930import org .apache .paimon .table .source .DataFilePlan ;
@@ -87,6 +88,9 @@ public class ChainTableStreamScan implements StreamDataTableScan {
8788 /** Comparator for chain partition keys only. */
8889 private final RecordComparator chainPartitionComparator ;
8990
91+ /** Partition keys of the table, used to reject partition filters in streaming mode. */
92+ private final List <String > partitionKeys ;
93+
9094 /**
9195 * Checkpoint state: the next delta snapshot id to read. Null before Phase 1 completes; non-null
9296 * once Phase 1 is done or after a stateful restore.
@@ -122,6 +126,7 @@ public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
122126 this .chainPartitionComparator =
123127 CodeGenUtils .newRecordComparator (
124128 partitionProjector .chainPartitionType ().getFieldTypes ());
129+ this .partitionKeys = chainGroupReadTable .schema ().partitionKeys ();
125130 }
126131
127132 @ Override
@@ -174,45 +179,48 @@ private TableScan.Plan planStarting() {
174179 deltaSplitsByPartition = Collections .emptyMap ();
175180 }
176181
177- // 2. Read all snapshot branch data, grouped by partition.
178- // Reuse batchScan.mainScan which has predicates/shard already applied.
179- Map <BinaryRow , List <DataSplit >> snapshotSplitsByPartition =
180- chainGroupReadTable .wrapped .snapshotManager ().latestSnapshotId () != null
181- ? groupByPartition (batchScan .mainScan )
182- : Collections .emptyMap ();
183-
184- // 3. Find the latest snapshot partition per group (based on chain partition keys).
185- // Only output the latest snapshot partition and delta partitions after it.
182+ // 2. List snapshot partitions (lightweight — partition metadata only, no file I/O).
183+ // Find the latest chain partition per group, then scan only those partitions for files.
184+ // This avoids reading file manifests for hundreds of historical partitions that will be
185+ // discarded (only the latest per group is kept).
186186 Map <Object , BinaryRow > latestChainPartitionPerGroup = new HashMap <>();
187- for (BinaryRow partition : snapshotSplitsByPartition .keySet ()) {
188- Object groupKey = toGroupKey (partition );
189- BinaryRow existingLatest = latestChainPartitionPerGroup .get (groupKey );
190- if (existingLatest == null
191- || chainPartitionComparator .compare (
192- partitionProjector .extractChainPartition (partition ),
193- partitionProjector .extractChainPartition (existingLatest ))
194- > 0 ) {
195- latestChainPartitionPerGroup .put (groupKey , partition );
187+ if (chainGroupReadTable .wrapped .snapshotManager ().latestSnapshotId () != null ) {
188+ DataTableScan partitionListingScan = chainGroupReadTable .wrapped .newScan ();
189+ applyPredicatesAndShard (partitionListingScan );
190+ for (BinaryRow partition : partitionListingScan .listPartitions ()) {
191+ Object groupKey = toGroupKey (partition );
192+ BinaryRow existingLatest = latestChainPartitionPerGroup .get (groupKey );
193+ if (existingLatest == null
194+ || chainPartitionComparator .compare (
195+ partitionProjector .extractChainPartition (partition ),
196+ partitionProjector .extractChainPartition (existingLatest ))
197+ > 0 ) {
198+ latestChainPartitionPerGroup .put (groupKey , partition );
199+ }
196200 }
197201 }
198202
203+ // 3. Scan file splits for latest snapshot partitions only.
204+ List <BinaryRow > latestPartitions = new ArrayList <>(latestChainPartitionPerGroup .values ());
205+ Map <BinaryRow , List <DataSplit >> snapshotSplitsByPartition ;
206+ if (!latestPartitions .isEmpty ()) {
207+ DataTableScan snapshotScan = chainGroupReadTable .wrapped .newScan ();
208+ snapshotScan .withPartitionFilter (latestPartitions );
209+ applyPredicatesAndShard (snapshotScan );
210+ snapshotSplitsByPartition = groupByPartition (snapshotScan );
211+ } else {
212+ snapshotSplitsByPartition = Collections .emptyMap ();
213+ }
214+
199215 // 4. Build ChainSplits:
200- // - For snapshot partitions: only include if chain key == latest for that group.
201- // - For delta partitions: include if (a) chain key > latest for that group, or
216+ // - Snapshot partitions are already filtered to latest per group.
217+ // - Delta partitions: include if (a) chain key > latest for that group, or
202218 // (b) no snapshot exists for that group.
203219 List <Split > allSplits = new ArrayList <>();
204220
205221 for (Map .Entry <BinaryRow , List <DataSplit >> entry : snapshotSplitsByPartition .entrySet ()) {
206- BinaryRow partition = entry .getKey ();
207- Object groupKey = toGroupKey (partition );
208- BinaryRow latestPartition = latestChainPartitionPerGroup .get (groupKey );
209- if (chainPartitionComparator .compare (
210- partitionProjector .extractChainPartition (partition ),
211- partitionProjector .extractChainPartition (latestPartition ))
212- == 0 ) {
213- for (DataSplit ds : entry .getValue ()) {
214- allSplits .add (dataSplitToChainSplit (ds , snapshotBranch ));
215- }
222+ for (DataSplit ds : entry .getValue ()) {
223+ allSplits .add (ChainSplit .from (ds , snapshotBranch ));
216224 }
217225 }
218226
@@ -229,7 +237,7 @@ private TableScan.Plan planStarting() {
229237 partitionProjector .extractChainPartition (latestPartition ))
230238 > 0 ) {
231239 for (DataSplit ds : entry .getValue ()) {
232- allSplits .add (dataSplitToChainSplit (ds , deltaBranch ));
240+ allSplits .add (ChainSplit . from (ds , deltaBranch ));
233241 }
234242 }
235243 }
@@ -294,32 +302,56 @@ private Object toGroupKey(BinaryRow fullPartition) {
294302 return partitionProjector .extractGroupPartition (fullPartition );
295303 }
296304
297- /**
298- * Converts a {@link DataSplit} to a {@link ChainSplit} where all files belong to the given
299- * branch. The partition value is preserved as-is (no rewriting).
300- */
301- private static ChainSplit dataSplitToChainSplit (DataSplit dataSplit , String branch ) {
302- HashMap <String , String > fileBranchMapping = new HashMap <>();
303- HashMap <String , String > fileBucketPathMapping = new HashMap <>();
304- for (DataFileMeta file : dataSplit .dataFiles ()) {
305- fileBranchMapping .put (file .fileName (), branch );
306- fileBucketPathMapping .put (file .fileName (), dataSplit .bucketPath ());
307- }
308- return new ChainSplit (
309- dataSplit .partition (),
310- dataSplit .dataFiles (),
311- fileBranchMapping ,
312- fileBucketPathMapping );
313- }
314-
315305 @ Override
316306 public InnerTableScan withFilter (Predicate predicate ) {
307+ if (predicate == null ) {
308+ return this ;
309+ }
310+ if (!partitionKeys .isEmpty ()
311+ && predicate .visit (new PartitionPredicateVisitor (partitionKeys ))) {
312+ throw new UnsupportedOperationException (
313+ "Partition filter is not supported in chain table streaming read. "
314+ + "The chain table streaming scan determines which partitions to read "
315+ + "based on the chain-merge logic across snapshot and delta branches. "
316+ + "Applying a partition filter would interfere with this logic. "
317+ + "If you need to read a specific partition, use batch mode instead." );
318+ }
317319 predicates .add (predicate );
318320 batchScan .withFilter (predicate );
319321 deltaStreamScan .withFilter (predicate );
320322 return this ;
321323 }
322324
325+ @ Override
326+ public InnerTableScan withPartitionFilter (Map <String , String > partitionSpec ) {
327+ throw new UnsupportedOperationException (
328+ "Partition filter is not supported in chain table streaming read." );
329+ }
330+
331+ @ Override
332+ public InnerTableScan withPartitionFilter (List <BinaryRow > partitions ) {
333+ throw new UnsupportedOperationException (
334+ "Partition filter is not supported in chain table streaming read." );
335+ }
336+
337+ @ Override
338+ public InnerTableScan withPartitionFilter (PartitionPredicate partitionPredicate ) {
339+ if (partitionPredicate != null ) {
340+ throw new UnsupportedOperationException (
341+ "Partition filter is not supported in chain table streaming read." );
342+ }
343+ return this ;
344+ }
345+
346+ @ Override
347+ public InnerTableScan withPartitionFilter (Predicate predicate ) {
348+ if (predicate != null ) {
349+ throw new UnsupportedOperationException (
350+ "Partition filter is not supported in chain table streaming read." );
351+ }
352+ return this ;
353+ }
354+
323355 @ Override
324356 public DataTableScan withShard (int indexOfThisSubtask , int numberOfParallelSubtasks ) {
325357 shardIndex = indexOfThisSubtask ;
@@ -345,6 +377,9 @@ private void applyPredicatesAndShard(DataTableScan scan) {
345377 @ Nullable
346378 @ Override
347379 public Long checkpoint () {
380+ if (startingDone ) {
381+ return deltaStreamScan .checkpoint ();
382+ }
348383 return nextDeltaSnapshotId ;
349384 }
350385
@@ -363,6 +398,8 @@ public void restore(@Nullable Long nextSnapshotId) {
363398 if (nextSnapshotId != null ) {
364399 startingDone = true ;
365400 deltaStreamScan .restore (nextSnapshotId );
401+ } else {
402+ startingDone = false ;
366403 }
367404 }
368405
0 commit comments