8888public class GenericIndexTopoBuilder {
8989
9090 private static final Logger LOG = LoggerFactory .getLogger (GenericIndexTopoBuilder .class );
    /**
     * Sentinel value meaning "no rows have been indexed yet": passing this as {@code
     * maxIndexedRowId} requests a full (non-incremental) index build in which no shards are
     * skipped.
     */
    public static final long NO_MAX_INDEXED_ROW_ID = -1L;
9192
9293 public static void buildIndexAndExecute (
9394 StreamExecutionEnvironment env ,
@@ -97,6 +98,25 @@ public static void buildIndexAndExecute(
9798 PartitionPredicate partitionPredicate ,
9899 Options userOptions )
99100 throws Exception {
101+ buildIndexAndExecute (
102+ env ,
103+ table ,
104+ indexColumn ,
105+ indexType ,
106+ partitionPredicate ,
107+ userOptions ,
108+ NO_MAX_INDEXED_ROW_ID );
109+ }
110+
111+ public static void buildIndexAndExecute (
112+ StreamExecutionEnvironment env ,
113+ FileStoreTable table ,
114+ String indexColumn ,
115+ String indexType ,
116+ PartitionPredicate partitionPredicate ,
117+ Options userOptions ,
118+ long maxIndexedRowId )
119+ throws Exception {
100120 boolean hasIndexToBuild =
101121 buildIndex (
102122 env ,
@@ -105,14 +125,35 @@ public static void buildIndexAndExecute(
105125 indexColumn ,
106126 indexType ,
107127 partitionPredicate ,
108- userOptions );
128+ userOptions ,
129+ maxIndexedRowId );
109130 if (hasIndexToBuild ) {
110131 env .execute ("Create " + indexType + " global index for table: " + table .name ());
111132 } else {
112133 LOG .info ("No index to build, nothing to do." );
113134 }
114135 }
115136
137+ public static boolean buildIndex (
138+ StreamExecutionEnvironment env ,
139+ Supplier <GenericGlobalIndexBuilder > indexBuilderSupplier ,
140+ FileStoreTable table ,
141+ String indexColumn ,
142+ String indexType ,
143+ PartitionPredicate partitionPredicate ,
144+ Options userOptions )
145+ throws Exception {
146+ return buildIndex (
147+ env ,
148+ indexBuilderSupplier ,
149+ table ,
150+ indexColumn ,
151+ indexType ,
152+ partitionPredicate ,
153+ userOptions ,
154+ NO_MAX_INDEXED_ROW_ID );
155+ }
156+
116157 /**
117158 * Builds a generic global index topology using a {@link GenericGlobalIndexBuilder} supplier.
118159 *
@@ -126,7 +167,8 @@ public static boolean buildIndex(
126167 String indexColumn ,
127168 String indexType ,
128169 PartitionPredicate partitionPredicate ,
129- Options userOptions )
170+ Options userOptions ,
171+ long maxIndexedRowId )
130172 throws Exception {
131173 GenericGlobalIndexBuilder indexBuilder = indexBuilderSupplier .get ();
132174 if (partitionPredicate != null ) {
@@ -135,14 +177,47 @@ public static boolean buildIndex(
135177
136178 List <ManifestEntry > entries = indexBuilder .scan ();
137179 List <IndexManifestEntry > deletedIndexEntries = indexBuilder .deletedIndexEntries ();
180+
181+ return buildTopology (
182+ env ,
183+ table ,
184+ indexColumn ,
185+ indexType ,
186+ userOptions ,
187+ entries ,
188+ deletedIndexEntries ,
189+ maxIndexedRowId );
190+ }
191+
192+ /**
193+ * Builds the Flink topology for global index creation from pre-scanned entries. Supports both
194+ * full builds ({@code maxIndexedRowId = NO_MAX_INDEXED_ROW_ID}) and incremental builds where
195+ * rows up to {@code maxIndexedRowId} are skipped.
196+ *
197+ * @param maxIndexedRowId the maximum row ID already indexed; use {@link #NO_MAX_INDEXED_ROW_ID}
198+ * for a full build
199+ * @return {@code true} if a Flink topology was built, {@code false} if nothing to index
200+ */
201+ private static boolean buildTopology (
202+ StreamExecutionEnvironment env ,
203+ FileStoreTable table ,
204+ String indexColumn ,
205+ String indexType ,
206+ Options userOptions ,
207+ List <ManifestEntry > entries ,
208+ List <IndexManifestEntry > deletedIndexEntries ,
209+ long maxIndexedRowId )
210+ throws Exception {
138211 long totalRowCount = entries .stream ().mapToLong (e -> e .file ().rowCount ()).sum ();
139212 LOG .info (
140- "Scanned {} files ({} rows) across {} partitions for {} index on column '{}'." ,
213+ "Scanned {} files ({} rows) across {} partitions for {} index on column '{}'"
214+ + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." : "." ),
141215 entries .size (),
142216 totalRowCount ,
143217 entries .stream ().map (ManifestEntry ::partition ).distinct ().count (),
144218 indexType ,
145- indexColumn );
219+ indexColumn ,
220+ maxIndexedRowId );
146221
147222 RowType rowType = table .rowType ();
148223 DataField indexField = rowType .getField (indexColumn );
@@ -160,7 +235,8 @@ public static boolean buildIndex(
160235 "Option 'global-index.row-count-per-shard' must be greater than 0." );
161236
162237 // Compute shard tasks at file level from the provided entries
163- List <ShardTask > shardTasks = computeShardTasks (table , entries , rowsPerShard );
238+ List <ShardTask > shardTasks =
239+ computeShardTasks (table , entries , rowsPerShard , maxIndexedRowId );
164240 if (shardTasks .isEmpty ()) {
165241 LOG .info ("No shard tasks generated, nothing to index." );
166242 return false ;
@@ -216,13 +292,33 @@ public static boolean buildIndex(
216292 return true ;
217293 }
218294
295+ /**
296+ * Compute shard tasks for a full build (no rows to skip).
297+ *
298+ * @see #computeShardTasks(FileStoreTable, List, long, long)
299+ */
300+ static List <ShardTask > computeShardTasks (
301+ FileStoreTable table , List <ManifestEntry > entries , long rowsPerShard ) {
302+ return computeShardTasks (table , entries , rowsPerShard , NO_MAX_INDEXED_ROW_ID );
303+ }
304+
219305 /**
220306 * Compute shard tasks at file level from the given manifest entries. Each shard only contains
221307 * the files whose row ID ranges overlap with its shard range. A file spanning multiple shard
222308 * boundaries is included in each overlapping shard.
309+ *
310+ * <p>When {@code maxIndexedRowId >= 0}, each shard's effective start is advanced past {@code
311+ * maxIndexedRowId}, skipping fully-indexed shards entirely. This enables incremental index
312+ * building where only new (un-indexed) rows are processed.
313+ *
314+ * @param maxIndexedRowId the maximum row ID already indexed; use {@link #NO_MAX_INDEXED_ROW_ID}
315+ * for a full build
223316 */
224317 static List <ShardTask > computeShardTasks (
225- FileStoreTable table , List <ManifestEntry > entries , long rowsPerShard ) {
318+ FileStoreTable table ,
319+ List <ManifestEntry > entries ,
320+ long rowsPerShard ,
321+ long maxIndexedRowId ) {
226322 // Group by partition (bucket is always 0 for unaware-bucket tables)
227323 Map <BinaryRow , List <ManifestEntry >> entriesByPartition =
228324 entries .stream ().collect (Collectors .groupingBy (ManifestEntry ::partition ));
@@ -266,6 +362,15 @@ static List<ShardTask> computeShardTasks(
266362 continue ;
267363 }
268364
365+ // For incremental builds, advance past already-indexed rows
366+ long effectiveStart =
367+ maxIndexedRowId >= 0
368+ ? Math .max (shardStart , maxIndexedRowId + 1 )
369+ : shardStart ;
370+ if (effectiveStart > shardEnd ) {
371+ continue ; // entire shard already indexed
372+ }
373+
269374 shardFiles .sort (Comparator .comparingLong (DataFileMeta ::nonNullFirstRowId ));
270375
271376 // Group contiguous files; gaps produce separate tasks
@@ -286,7 +391,7 @@ static List<ShardTask> computeShardTasks(
286391 tasks .add (
287392 createShardTask (
288393 currentGroup ,
289- shardStart ,
394+ effectiveStart ,
290395 shardEnd ,
291396 partition ,
292397 partBucketPath ));
@@ -298,7 +403,11 @@ static List<ShardTask> computeShardTasks(
298403 if (!currentGroup .isEmpty ()) {
299404 tasks .add (
300405 createShardTask (
301- currentGroup , shardStart , shardEnd , partition , partBucketPath ));
406+ currentGroup ,
407+ effectiveStart ,
408+ shardEnd ,
409+ partition ,
410+ partBucketPath ));
302411 }
303412 }
304413 }
@@ -307,16 +416,16 @@ static List<ShardTask> computeShardTasks(
307416
308417 private static ShardTask createShardTask (
309418 List <DataFileMeta > files ,
310- long shardStart ,
419+ long effectiveStart ,
311420 long shardEnd ,
312421 BinaryRow partition ,
313422 String bucketPath ) {
314423 long groupMinRowId = files .get (0 ).nonNullFirstRowId ();
315424 long groupMaxRowId =
316425 files .stream ().mapToLong (f -> f .nonNullRowIdRange ().to ).max ().getAsLong ();
317426
318- // Clamp to shard boundaries
319- long rangeFrom = Math .max (groupMinRowId , shardStart );
427+ // Clamp to effective boundaries
428+ long rangeFrom = Math .max (groupMinRowId , effectiveStart );
320429 long rangeTo = Math .min (groupMaxRowId , shardEnd );
321430
322431 DataSplit dataSplit =
0 commit comments