4747import java .util .List ;
4848import java .util .Map ;
4949import java .util .Optional ;
50+ import java .util .TreeMap ;
5051
5152import static java .util .stream .Collectors .toList ;
5253import static org .apache .flink .runtime .checkpoint .CheckpointIDCounter .INITIAL_CHECKPOINT_ID ;
@@ -62,6 +63,15 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
6263
6364 public static final String TABLE_GROUP_KEY = "table" ;
6465
66+ // Use a flink-cdc. prefix so these don't clash with the flink. namespace reserved by the
67+ // Iceberg Flink connector.
68+
69+ /** Snapshot summary key for the batch index; used to resume partial commits on retry. */
70+ static final String FLINK_BATCH_INDEX = "flink-cdc.batch-index" ;
71+
72+ /** Snapshot summary key for the checkpoint ID on intermediate batch commits. */
73+ static final String FLINK_CHECKPOINT_ID_PROP = "flink-cdc.checkpoint-id" ;
74+
6575 private final Catalog catalog ;
6676
6777 private final SinkCommitterMetricGroup metricGroup ;
@@ -96,74 +106,140 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
96106 if (writeResultWrappers .isEmpty ()) {
97107 return ;
98108 }
99- // all commits a same checkpoint-id
100109 long checkpointId = writeResultWrappers .get (0 ).getCheckpointId ();
101110 String newFlinkJobId = writeResultWrappers .get (0 ).getJobId ();
102111 String operatorId = writeResultWrappers .get (0 ).getOperatorId ();
103112
104- Map <TableId , List <WriteResult >> tableMap = new HashMap <>();
105- for (WriteResultWrapper writeResultWrapper : writeResultWrappers ) {
106- List <WriteResult > writeResult =
107- tableMap .getOrDefault (writeResultWrapper .getTableId (), new ArrayList <>());
108- writeResult .add (writeResultWrapper .getWriteResult ());
109- tableMap .put (writeResultWrapper .getTableId (), writeResult );
110- LOGGER .info (writeResultWrapper .buildDescription ());
113+ Map <TableId , List <WriteResultWrapper >> tableMap = new HashMap <>();
114+ for (WriteResultWrapper w : writeResultWrappers ) {
115+ tableMap .computeIfAbsent (w .getTableId (), k -> new ArrayList <>()).add (w );
111116 }
112- for (Map .Entry <TableId , List <WriteResult >> entry : tableMap .entrySet ()) {
117+
118+ for (Map .Entry <TableId , List <WriteResultWrapper >> entry : tableMap .entrySet ()) {
113119 TableId tableId = entry .getKey ();
114120
121+ // Group by batchIndex so wrappers from different subtasks for the same batch
122+ // are merged into one snapshot, not committed separately.
123+ TreeMap <Integer , List <WriteResultWrapper >> batchGroups = new TreeMap <>();
124+ for (WriteResultWrapper w : entry .getValue ()) {
125+ batchGroups .computeIfAbsent (w .getBatchIndex (), k -> new ArrayList <>()).add (w );
126+ LOGGER .info (w .buildDescription ());
127+ }
128+
115129 Table table =
116130 catalog .loadTable (
117131 TableIdentifier .of (tableId .getSchemaName (), tableId .getTableName ()));
118132
133+ int startBatchIndex = 0 ;
119134 Snapshot snapshot = table .currentSnapshot ();
120135 if (snapshot != null ) {
121136 Iterable <Snapshot > ancestors =
122137 SnapshotUtil .ancestorsOf (snapshot .snapshotId (), table ::snapshot );
123- long lastCheckpointId =
138+ long lastCommittedCheckpointId =
124139 getMaxCommittedCheckpointId (ancestors , newFlinkJobId , operatorId );
125- if (lastCheckpointId = = checkpointId ) {
140+ if (lastCommittedCheckpointId > = checkpointId ) {
126141 LOGGER .warn (
127142 "Checkpoint id {} has been committed to table {}, skipping" ,
128143 checkpointId ,
129144 tableId .identifier ());
130145 continue ;
131146 }
147+ ancestors = SnapshotUtil .ancestorsOf (snapshot .snapshotId (), table ::snapshot );
148+ startBatchIndex =
149+ getLastCommittedBatchIndex (
150+ ancestors , newFlinkJobId , operatorId , checkpointId )
151+ + 1 ;
132152 }
133153
134154 Optional <TableMetric > tableMetric = getTableMetric (tableId );
135155 tableMetric .ifPresent (TableMetric ::increaseCommitTimes );
136156
137- List <WriteResult > results = entry .getValue ();
138- List <DataFile > dataFiles =
139- results .stream ()
140- .filter (payload -> payload .dataFiles () != null )
141- .flatMap (payload -> Arrays .stream (payload .dataFiles ()))
142- .filter (dataFile -> dataFile .recordCount () > 0 )
143- .collect (toList ());
144- List <DeleteFile > deleteFiles =
145- results .stream ()
146- .filter (payload -> payload .deleteFiles () != null )
147- .flatMap (payload -> Arrays .stream (payload .deleteFiles ()))
148- .filter (deleteFile -> deleteFile .recordCount () > 0 )
149- .collect (toList ());
150- if (dataFiles .isEmpty () && deleteFiles .isEmpty ()) {
151- LOGGER .info (String .format ("Nothing to commit to table %s, skipping" , table .name ()));
152- } else {
157+ int lastNonEmptyBatchIndex = -1 ;
158+ for (Map .Entry <Integer , List <WriteResultWrapper >> g : batchGroups .entrySet ()) {
159+ List <DataFile > df = collectDataFilesFromGroup (g .getValue ());
160+ List <DeleteFile > del = collectDeleteFilesFromGroup (g .getValue ());
161+ if (!df .isEmpty () || !del .isEmpty ()) {
162+ lastNonEmptyBatchIndex = g .getKey ();
163+ }
164+ }
165+
166+ // Commit each batch as a separate snapshot so sequence numbers increase per batch.
167+ for (Map .Entry <Integer , List <WriteResultWrapper >> g : batchGroups .entrySet ()) {
168+ int batchIdx = g .getKey ();
169+ if (batchIdx < startBatchIndex ) {
170+ LOGGER .info (
171+ "Batch {} for checkpoint {} of table {} already committed, skipping" ,
172+ batchIdx ,
173+ checkpointId ,
174+ tableId .identifier ());
175+ continue ;
176+ }
177+
178+ List <DataFile > dataFiles = collectDataFilesFromGroup (g .getValue ());
179+ List <DeleteFile > deleteFiles = collectDeleteFilesFromGroup (g .getValue ());
180+
181+ if (dataFiles .isEmpty () && deleteFiles .isEmpty ()) {
182+ LOGGER .info (
183+ "Batch {} for checkpoint {} of table {} has nothing to commit, skipping" ,
184+ batchIdx ,
185+ checkpointId ,
186+ tableId .identifier ());
187+ continue ;
188+ }
189+
190+ SnapshotUpdate <?> operation ;
153191 if (deleteFiles .isEmpty ()) {
154192 AppendFiles append = table .newAppend ();
155193 dataFiles .forEach (append ::appendFile );
156- commitOperation ( append , newFlinkJobId , operatorId , checkpointId ) ;
194+ operation = append ;
157195 } else {
158196 RowDelta delta = table .newRowDelta ();
159197 dataFiles .forEach (delta ::addRows );
160198 deleteFiles .forEach (delta ::addDeletes );
161- commitOperation ( delta , newFlinkJobId , operatorId , checkpointId ) ;
199+ operation = delta ;
162200 }
201+
202+ operation .set (SinkUtil .FLINK_JOB_ID , newFlinkJobId );
203+ operation .set (SinkUtil .OPERATOR_ID , operatorId );
204+ operation .set (FLINK_BATCH_INDEX , String .valueOf (batchIdx ));
205+ operation .set (FLINK_CHECKPOINT_ID_PROP , String .valueOf (checkpointId ));
206+ if (batchIdx == lastNonEmptyBatchIndex ) {
207+ operation .set (
208+ SinkUtil .MAX_COMMITTED_CHECKPOINT_ID , String .valueOf (checkpointId ));
209+ }
210+ operation .commit ();
163211 }
164212 }
165213 }
166214
215+ private static List <DataFile > collectDataFilesFromGroup (List <WriteResultWrapper > group ) {
216+ return group .stream ()
217+ .flatMap (w -> collectDataFiles (w .getWriteResult ()).stream ())
218+ .collect (toList ());
219+ }
220+
221+ private static List <DeleteFile > collectDeleteFilesFromGroup (List <WriteResultWrapper > group ) {
222+ return group .stream ()
223+ .flatMap (w -> collectDeleteFiles (w .getWriteResult ()).stream ())
224+ .collect (toList ());
225+ }
226+
227+ private static List <DataFile > collectDataFiles (WriteResult result ) {
228+ if (result .dataFiles () == null ) {
229+ return new ArrayList <>();
230+ }
231+ return Arrays .stream (result .dataFiles ()).filter (f -> f .recordCount () > 0 ).collect (toList ());
232+ }
233+
234+ private static List <DeleteFile > collectDeleteFiles (WriteResult result ) {
235+ if (result .deleteFiles () == null ) {
236+ return new ArrayList <>();
237+ }
238+ return Arrays .stream (result .deleteFiles ())
239+ .filter (f -> f .recordCount () > 0 )
240+ .collect (toList ());
241+ }
242+
167243 private static long getMaxCommittedCheckpointId (
168244 Iterable <Snapshot > ancestors , String flinkJobId , String operatorId ) {
169245 long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1 ;
@@ -185,15 +261,35 @@ private static long getMaxCommittedCheckpointId(
185261 return lastCommittedCheckpointId ;
186262 }
187263
188- private static void commitOperation (
189- SnapshotUpdate <?> operation ,
190- String newFlinkJobId ,
191- String operatorId ,
192- long checkpointId ) {
193- operation .set (SinkUtil .MAX_COMMITTED_CHECKPOINT_ID , Long .toString (checkpointId ));
194- operation .set (SinkUtil .FLINK_JOB_ID , newFlinkJobId );
195- operation .set (SinkUtil .OPERATOR_ID , operatorId );
196- operation .commit ();
264+ /**
265+ * Returns the highest batch index already committed for the given checkpoint, or -1 if none.
266+ * Used to skip already-persisted batches on retry.
267+ */
268+ private static int getLastCommittedBatchIndex (
269+ Iterable <Snapshot > ancestors , String flinkJobId , String operatorId , long checkpointId ) {
270+ for (Snapshot ancestor : ancestors ) {
271+ Map <String , String > summary = ancestor .summary ();
272+ if (!flinkJobId .equals (summary .get (SinkUtil .FLINK_JOB_ID ))) {
273+ continue ;
274+ }
275+ String snapshotOperatorId = summary .get (SinkUtil .OPERATOR_ID );
276+ if (snapshotOperatorId != null && !snapshotOperatorId .equals (operatorId )) {
277+ continue ;
278+ }
279+ // Stop once we pass a fully-committed earlier checkpoint; intermediate batch
280+ // snapshots for the current checkpoint lie between it and the current tip.
281+ String maxCommittedStr = summary .get (SinkUtil .MAX_COMMITTED_CHECKPOINT_ID );
282+ if (maxCommittedStr != null && Long .parseLong (maxCommittedStr ) < checkpointId ) {
283+ break ;
284+ }
285+ String snapshotCheckpointId = summary .get (FLINK_CHECKPOINT_ID_PROP );
286+ if (snapshotCheckpointId != null
287+ && Long .parseLong (snapshotCheckpointId ) == checkpointId ) {
288+ String batchIndexStr = summary .get (FLINK_BATCH_INDEX );
289+ return batchIndexStr != null ? Integer .parseInt (batchIndexStr ) : 0 ;
290+ }
291+ }
292+ return -1 ;
197293 }
198294
199295 private Optional <TableMetric > getTableMetric (TableId tableId ) {
0 commit comments