Skip to content

Commit 294f80d

Browse files
Flink: Use native slot sharing group inheritance for maintenance tasks (#16329)
1 parent adfd4a4 commit 294f80d

12 files changed

Lines changed: 258 additions & 243 deletions

File tree

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java

Lines changed: 84 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -209,72 +209,73 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
209209

210210
// Collect all data files
211211
SingleOutputStreamOperator<MetadataTablePlanner.SplitInfo> splits =
212-
trigger
213-
.process(
214-
new MetadataTablePlanner(
215-
taskName(),
216-
index(),
217-
tableLoader(),
218-
FILE_PATH_SCAN_CONTEXT,
219-
MetadataTableType.ALL_FILES,
220-
planningWorkerPoolSize))
221-
.name(operatorName(PLANNER_TASK_NAME))
222-
.uid(PLANNER_TASK_NAME + uidSuffix())
223-
.slotSharingGroup(slotSharingGroup())
224-
.forceNonParallel();
212+
setSlotSharingGroup(
213+
trigger
214+
.process(
215+
new MetadataTablePlanner(
216+
taskName(),
217+
index(),
218+
tableLoader(),
219+
FILE_PATH_SCAN_CONTEXT,
220+
MetadataTableType.ALL_FILES,
221+
planningWorkerPoolSize))
222+
.name(operatorName(PLANNER_TASK_NAME))
223+
.uid(PLANNER_TASK_NAME + uidSuffix())
224+
.forceNonParallel());
225225

226226
// Read the records and get all data files
227227
SingleOutputStreamOperator<String> tableDataFiles =
228-
splits
229-
.rebalance()
230-
.process(
231-
new FileNameReader(
232-
taskName(),
233-
index(),
234-
tableLoader(),
235-
FILE_PATH_SCHEMA,
236-
FILE_PATH_SCAN_CONTEXT,
237-
MetadataTableType.ALL_FILES))
238-
.name(operatorName(READER_TASK_NAME))
239-
.uid(READER_TASK_NAME + uidSuffix())
240-
.slotSharingGroup(slotSharingGroup())
241-
.setParallelism(parallelism());
228+
setSlotSharingGroup(
229+
splits
230+
.rebalance()
231+
.process(
232+
new FileNameReader(
233+
taskName(),
234+
index(),
235+
tableLoader(),
236+
FILE_PATH_SCHEMA,
237+
FILE_PATH_SCAN_CONTEXT,
238+
MetadataTableType.ALL_FILES))
239+
.name(operatorName(READER_TASK_NAME))
240+
.uid(READER_TASK_NAME + uidSuffix())
241+
.setParallelism(parallelism()));
242242

243243
// Collect all metadata files
244244
SingleOutputStreamOperator<String> tableMetadataFiles =
245-
trigger
246-
.process(new ListMetadataFiles(taskName(), index(), tableLoader()))
247-
.name(operatorName(METADATA_FILES_TASK_NAME))
248-
.uid(METADATA_FILES_TASK_NAME + uidSuffix())
249-
.slotSharingGroup(slotSharingGroup())
250-
.forceNonParallel();
245+
setSlotSharingGroup(
246+
trigger
247+
.process(new ListMetadataFiles(taskName(), index(), tableLoader()))
248+
.name(operatorName(METADATA_FILES_TASK_NAME))
249+
.uid(METADATA_FILES_TASK_NAME + uidSuffix())
250+
.forceNonParallel());
251251

252252
// List all the file system files
253253
SingleOutputStreamOperator<String> allFsFiles =
254-
trigger
255-
.process(
256-
new ListFileSystemFiles(
257-
taskName(),
258-
index(),
259-
tableLoader(),
260-
location,
261-
minAge.toMillis(),
262-
usePrefixListing))
263-
.name(operatorName(FILESYSTEM_FILES_TASK_NAME))
264-
.uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
265-
.slotSharingGroup(slotSharingGroup())
266-
.forceNonParallel();
254+
setSlotSharingGroup(
255+
trigger
256+
.process(
257+
new ListFileSystemFiles(
258+
taskName(),
259+
index(),
260+
tableLoader(),
261+
location,
262+
minAge.toMillis(),
263+
usePrefixListing))
264+
.name(operatorName(FILESYSTEM_FILES_TASK_NAME))
265+
.uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
266+
.forceNonParallel());
267267

268268
SingleOutputStreamOperator<String> filesToDelete =
269-
tableMetadataFiles
270-
.union(tableDataFiles)
271-
.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
272-
.connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities)))
273-
.process(new OrphanFilesDetector(prefixMismatchMode, equalSchemes, equalAuthorities))
274-
.slotSharingGroup(slotSharingGroup())
275-
.name(operatorName(FILTER_FILES_TASK_NAME))
276-
.uid(FILTER_FILES_TASK_NAME + uidSuffix())
277-
.setParallelism(parallelism());
269+
setSlotSharingGroup(
270+
tableMetadataFiles
271+
.union(tableDataFiles)
272+
.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
273+
.connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities)))
274+
.process(
275+
new OrphanFilesDetector(prefixMismatchMode, equalSchemes, equalAuthorities))
276+
.name(operatorName(FILTER_FILES_TASK_NAME))
277+
.uid(FILTER_FILES_TASK_NAME + uidSuffix())
278+
.setParallelism(parallelism()));
278279

279280
DataStream<Exception> errorStream =
280281
tableMetadataFiles
@@ -287,38 +288,38 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
287288

288289
// Stop deleting the files if there is an error
289290
SingleOutputStreamOperator<String> filesOrSkip =
290-
filesToDelete
291-
.connect(errorStream)
292-
.transform(
293-
operatorName(SKIP_ON_ERROR_TASK_NAME),
294-
TypeInformation.of(String.class),
295-
new SkipOnError())
296-
.uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
297-
.slotSharingGroup(slotSharingGroup())
298-
.forceNonParallel();
291+
setSlotSharingGroup(
292+
filesToDelete
293+
.connect(errorStream)
294+
.transform(
295+
operatorName(SKIP_ON_ERROR_TASK_NAME),
296+
TypeInformation.of(String.class),
297+
new SkipOnError())
298+
.uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
299+
.forceNonParallel());
299300

300301
// delete the files
301-
filesOrSkip
302-
.rebalance()
303-
.transform(
304-
operatorName(DELETE_FILES_TASK_NAME),
305-
TypeInformation.of(Void.class),
306-
new DeleteFilesProcessor(
307-
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
308-
.uid(DELETE_FILES_TASK_NAME + uidSuffix())
309-
.slotSharingGroup(slotSharingGroup())
310-
.setParallelism(parallelism());
302+
setSlotSharingGroup(
303+
filesOrSkip
304+
.rebalance()
305+
.transform(
306+
operatorName(DELETE_FILES_TASK_NAME),
307+
TypeInformation.of(Void.class),
308+
new DeleteFilesProcessor(
309+
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
310+
.uid(DELETE_FILES_TASK_NAME + uidSuffix())
311+
.setParallelism(parallelism()));
311312

312313
// Ignore the file deletion result and return the DataStream<TaskResult> directly
313-
return trigger
314-
.connect(errorStream)
315-
.transform(
316-
operatorName(AGGREGATOR_TASK_NAME),
317-
TypeInformation.of(TaskResult.class),
318-
new TaskResultAggregator(tableName(), taskName(), index()))
319-
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
320-
.slotSharingGroup(slotSharingGroup())
321-
.forceNonParallel();
314+
return setSlotSharingGroup(
315+
trigger
316+
.connect(errorStream)
317+
.transform(
318+
operatorName(AGGREGATOR_TASK_NAME),
319+
TypeInformation.of(TaskResult.class),
320+
new TaskResultAggregator(tableName(), taskName(), index()))
321+
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
322+
.forceNonParallel());
322323
}
323324
}
324325

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -126,30 +126,30 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
126126
Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
127127

128128
SingleOutputStreamOperator<TaskResult> result =
129-
trigger
130-
.process(
131-
new ExpireSnapshotsProcessor(
132-
tableLoader(),
133-
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
134-
numSnapshots,
135-
planningWorkerPoolSize,
136-
cleanExpiredMetadata))
137-
.name(operatorName(EXECUTOR_OPERATOR_NAME))
138-
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
139-
.slotSharingGroup(slotSharingGroup())
140-
.forceNonParallel();
141-
142-
result
143-
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
144-
.rebalance()
145-
.transform(
146-
operatorName(DELETE_FILES_OPERATOR_NAME),
147-
TypeInformation.of(Void.class),
148-
new DeleteFilesProcessor(
149-
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
150-
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
151-
.slotSharingGroup(slotSharingGroup())
152-
.setParallelism(parallelism());
129+
setSlotSharingGroup(
130+
trigger
131+
.process(
132+
new ExpireSnapshotsProcessor(
133+
tableLoader(),
134+
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
135+
numSnapshots,
136+
planningWorkerPoolSize,
137+
cleanExpiredMetadata))
138+
.name(operatorName(EXECUTOR_OPERATOR_NAME))
139+
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
140+
.forceNonParallel());
141+
142+
setSlotSharingGroup(
143+
result
144+
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
145+
.rebalance()
146+
.transform(
147+
operatorName(DELETE_FILES_OPERATOR_NAME),
148+
TypeInformation.of(Void.class),
149+
new DeleteFilesProcessor(
150+
tableLoader().loadTable(), taskName(), index(), deleteBatchSize))
151+
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
152+
.setParallelism(parallelism()));
153153

154154
// Ignore the file deletion result and return the DataStream<TaskResult> directly
155155
return result;

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.configuration.ConfigOption;
2424
import org.apache.flink.configuration.ConfigOptions;
2525
import org.apache.flink.configuration.ReadableConfig;
26-
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
2726
import org.apache.iceberg.Table;
2827
import org.apache.iceberg.flink.FlinkConfParser;
2928

@@ -60,7 +59,7 @@ public class FlinkMaintenanceConfig {
6059
public static final ConfigOption<String> SLOT_SHARING_GROUP_OPTION =
6160
ConfigOptions.key(SLOT_SHARING_GROUP)
6261
.stringType()
63-
.defaultValue(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
62+
.noDefaultValue()
6463
.withDescription(
6564
"The slot sharing group for maintenance tasks. "
6665
+ "Determines which operators can share slots in the Flink execution environment.");
@@ -114,8 +113,7 @@ public String slotSharingGroup() {
114113
.stringConf()
115114
.option(SLOT_SHARING_GROUP)
116115
.flinkConfig(SLOT_SHARING_GROUP_OPTION)
117-
.defaultValue(SLOT_SHARING_GROUP_OPTION.defaultValue())
118-
.parse();
116+
.parseOptional();
119117
}
120118

121119
public RewriteDataFilesConfig createRewriteDataFilesConfig() {

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,8 @@ DataStream<TaskResult> append(
226226

227227
return append(sourceStream);
228228
}
229+
230+
<O> SingleOutputStreamOperator<O> setSlotSharingGroup(SingleOutputStreamOperator<O> operator) {
231+
return slotSharingGroup == null ? operator : operator.slotSharingGroup(slotSharingGroup);
232+
}
229233
}

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -290,58 +290,58 @@ public Builder config(RewriteDataFilesConfig rewriteDataFilesConfig) {
290290
@Override
291291
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
292292
SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
293-
trigger
294-
.process(
295-
new DataFileRewritePlanner(
296-
tableName(),
297-
taskName(),
298-
index(),
299-
tableLoader(),
300-
partialProgressEnabled ? partialProgressMaxCommits : 1,
301-
maxRewriteBytes,
302-
rewriteOptions,
303-
filterSupplier,
304-
branch))
305-
.name(operatorName(PLANNER_TASK_NAME))
306-
.uid(PLANNER_TASK_NAME + uidSuffix())
307-
.slotSharingGroup(slotSharingGroup())
308-
.forceNonParallel();
293+
setSlotSharingGroup(
294+
trigger
295+
.process(
296+
new DataFileRewritePlanner(
297+
tableName(),
298+
taskName(),
299+
index(),
300+
tableLoader(),
301+
partialProgressEnabled ? partialProgressMaxCommits : 1,
302+
maxRewriteBytes,
303+
rewriteOptions,
304+
filterSupplier,
305+
branch))
306+
.name(operatorName(PLANNER_TASK_NAME))
307+
.uid(PLANNER_TASK_NAME + uidSuffix())
308+
.forceNonParallel());
309309

310310
SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> rewritten =
311-
planned
312-
.rebalance()
313-
.process(new DataFileRewriteRunner(tableName(), taskName(), index()))
314-
.name(operatorName(REWRITE_TASK_NAME))
315-
.uid(REWRITE_TASK_NAME + uidSuffix())
316-
.slotSharingGroup(slotSharingGroup())
317-
.setParallelism(parallelism());
311+
setSlotSharingGroup(
312+
planned
313+
.rebalance()
314+
.process(new DataFileRewriteRunner(tableName(), taskName(), index()))
315+
.name(operatorName(REWRITE_TASK_NAME))
316+
.uid(REWRITE_TASK_NAME + uidSuffix())
317+
.setParallelism(parallelism()));
318318

319319
SingleOutputStreamOperator<Trigger> updated =
320-
rewritten
321-
.transform(
322-
operatorName(COMMIT_TASK_NAME),
323-
TypeInformation.of(Trigger.class),
324-
new DataFileRewriteCommitter(
325-
tableName(), taskName(), index(), tableLoader(), branch))
326-
.uid(COMMIT_TASK_NAME + uidSuffix())
327-
.slotSharingGroup(slotSharingGroup())
328-
.forceNonParallel();
320+
setSlotSharingGroup(
321+
rewritten
322+
.transform(
323+
operatorName(COMMIT_TASK_NAME),
324+
TypeInformation.of(Trigger.class),
325+
new DataFileRewriteCommitter(
326+
tableName(), taskName(), index(), tableLoader(), branch))
327+
.uid(COMMIT_TASK_NAME + uidSuffix())
328+
.forceNonParallel());
329329

330-
return trigger
331-
.union(updated)
332-
.connect(
333-
planned
334-
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
335-
.union(
336-
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
337-
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
338-
.transform(
339-
operatorName(AGGREGATOR_TASK_NAME),
340-
TypeInformation.of(TaskResult.class),
341-
new TaskResultAggregator(tableName(), taskName(), index()))
342-
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
343-
.slotSharingGroup(slotSharingGroup())
344-
.forceNonParallel();
330+
return setSlotSharingGroup(
331+
trigger
332+
.union(updated)
333+
.connect(
334+
planned
335+
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
336+
.union(
337+
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
338+
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
339+
.transform(
340+
operatorName(AGGREGATOR_TASK_NAME),
341+
TypeInformation.of(TaskResult.class),
342+
new TaskResultAggregator(tableName(), taskName(), index()))
343+
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
344+
.forceNonParallel());
345345
}
346346
}
347347
}

0 commit comments

Comments
 (0)