Skip to content

Commit b1e7fd1

Browse files
LsomeYeahJingsongLi
authored and committed
[flink] support performing incremental clustering by flink (#6395)
1 parent f325c44 commit b1e7fd1

8 files changed

Lines changed: 896 additions & 12 deletions

File tree

docs/content/append-table/incremental-clustering.md

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental
9595
## Run Incremental Clustering
9696
{{< hint info >}}
9797

98-
Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future.
98+
Currently, Incremental Clustering is only supported in batch mode.
9999

100100
{{< /hint >}}
101101

102-
To run a Incremental Clustering job, follow these instructions.
102+
To run an Incremental Clustering job, follow these instructions.
103+
104+
You don’t need to specify any clustering-related parameters when running Incremental Clustering,
105+
these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
103106

104107
{{< tabs "incremental-clustering" >}}
105108

@@ -117,8 +120,46 @@ CALL sys.compact(table => 'T')
117120
-- run incremental clustering with full mode, this will recluster all data
118121
CALL sys.compact(table => 'T', compact_strategy => 'full')
119122
```
120-
You don’t need to specify any clustering-related parameters when running Incremental Clustering,
121-
these are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
123+
{{< /tab >}}
124+
125+
{{< tab "Flink Action" >}}
126+
127+
Run the following command to submit an incremental clustering job for the table.
128+
129+
```bash
130+
<FLINK_HOME>/bin/flink run \
131+
/path/to/paimon-flink-action-{{< version >}}.jar \
132+
compact \
133+
--warehouse <warehouse-path> \
134+
--database <database-name> \
135+
--table <table-name> \
136+
[--compact_strategy <minor / full>] \
137+
[--table_conf <table_conf>] \
138+
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
139+
```
140+
141+
Example: run incremental clustering
142+
143+
```bash
144+
<FLINK_HOME>/bin/flink run \
145+
/path/to/paimon-flink-action-{{< version >}}.jar \
146+
compact \
147+
--warehouse s3:///path/to/warehouse \
148+
--database test_db \
149+
--table test_table \
150+
--table_conf sink.parallelism=2 \
151+
--compact_strategy minor \
152+
--catalog_conf s3.endpoint=https://****.com \
153+
--catalog_conf s3.access-key=***** \
154+
--catalog_conf s3.secret-key=*****
155+
```
156+
* `--compact_strategy` Determines how to pick the files to be clustered; the default is `minor`.
157+
* `full` : All files will be selected for clustering.
158+
* `minor` : Pick the set of files that need to be clustered based on specified conditions.
159+
160+
Note: write parallelism is set by `sink.parallelism`; if it is set too high, the job may generate a large number of small files.
161+
162+
You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to use batch mode.
122163
{{< /tab >}}
123164
124165
{{< /tabs >}}

paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
226226
return splits;
227227
}
228228

229-
public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) {
229+
public static List<DataFileMeta> upgrade(
230+
List<DataFileMeta> filesAfterCluster, int outputLevel) {
230231
return filesAfterCluster.stream()
231232
.map(file -> file.upgrade(outputLevel))
232233
.collect(Collectors.toList());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 147 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
package org.apache.paimon.flink.action;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.append.cluster.IncrementalClusterManager;
23+
import org.apache.paimon.compact.CompactUnit;
2224
import org.apache.paimon.data.BinaryRow;
2325
import org.apache.paimon.data.InternalRow;
2426
import org.apache.paimon.flink.FlinkConnectorOptions;
27+
import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
28+
import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
2529
import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
2630
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
2731
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -32,7 +36,10 @@
3236
import org.apache.paimon.flink.sink.FixedBucketSink;
3337
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
3438
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
39+
import org.apache.paimon.flink.sink.RowAppendTableSink;
3540
import org.apache.paimon.flink.sink.RowDataChannelComputer;
41+
import org.apache.paimon.flink.sorter.TableSortInfo;
42+
import org.apache.paimon.flink.sorter.TableSorter;
3643
import org.apache.paimon.flink.source.CompactorSourceBuilder;
3744
import org.apache.paimon.manifest.ManifestEntry;
3845
import org.apache.paimon.options.Options;
@@ -43,6 +50,7 @@
4350
import org.apache.paimon.predicate.PredicateProjectionConverter;
4451
import org.apache.paimon.table.BucketMode;
4552
import org.apache.paimon.table.FileStoreTable;
53+
import org.apache.paimon.table.source.DataSplit;
4654
import org.apache.paimon.types.RowType;
4755
import org.apache.paimon.utils.InternalRowPartitionComputer;
4856
import org.apache.paimon.utils.Pair;
@@ -67,6 +75,7 @@
6775
import java.util.LinkedHashMap;
6876
import java.util.List;
6977
import java.util.Map;
78+
import java.util.stream.Collectors;
7079

7180
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
7281
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -100,10 +109,6 @@ public CompactAction(
100109
checkArgument(
101110
!((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
102111
"Compact action does not support data evolution table yet. ");
103-
checkArgument(
104-
!(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE
105-
&& ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()),
106-
"The table has enabled incremental clustering, and do not support compact in flink yet.");
107112
HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
108113
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
109114
table = table.copy(dynamicOptions);
@@ -147,8 +152,12 @@ private boolean buildImpl() throws Exception {
147152
if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
148153
return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
149154
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
150-
buildForAppendTableCompact(env, fileStoreTable, isStreaming);
151-
return true;
155+
if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
156+
return buildForIncrementalClustering(env, fileStoreTable, isStreaming);
157+
} else {
158+
buildForAppendTableCompact(env, fileStoreTable, isStreaming);
159+
return true;
160+
}
152161
} else {
153162
buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
154163
return true;
@@ -202,6 +211,138 @@ private void buildForAppendTableCompact(
202211
builder.build();
203212
}
204213

214+
private boolean buildForIncrementalClustering(
215+
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
216+
checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode");
217+
checkArgument(
218+
partitions == null,
219+
"Incremental clustering currently does not support specifying partitions");
220+
checkArgument(
221+
whereSql == null, "Incremental clustering currently does not support predicates");
222+
223+
IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
224+
225+
// non-full strategy as default for incremental clustering
226+
if (fullCompaction == null) {
227+
fullCompaction = false;
228+
}
229+
Options options = new Options(table.options());
230+
int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
231+
if (localSampleMagnification < 20) {
232+
throw new IllegalArgumentException(
233+
String.format(
234+
"the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
235+
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
236+
localSampleMagnification));
237+
}
238+
String commitUser = CoreOptions.createCommitUser(options);
239+
InternalRowPartitionComputer partitionComputer =
240+
new InternalRowPartitionComputer(
241+
table.coreOptions().partitionDefaultName(),
242+
table.store().partitionType(),
243+
table.partitionKeys().toArray(new String[0]),
244+
table.coreOptions().legacyPartitionName());
245+
246+
// 1. pick cluster files for each partition
247+
Map<BinaryRow, CompactUnit> compactUnits =
248+
incrementalClusterManager.prepareForCluster(fullCompaction);
249+
if (compactUnits.isEmpty()) {
250+
LOGGER.info(
251+
"No partition needs to be incrementally clustered. "
252+
+ "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
253+
if (this.forceStartFlinkJob) {
254+
env.fromSequence(0, 0)
255+
.name("Nothing to Cluster Source")
256+
.sinkTo(new DiscardingSink<>());
257+
return true;
258+
} else {
259+
return false;
260+
}
261+
}
262+
Map<BinaryRow, DataSplit[]> partitionSplits =
263+
compactUnits.entrySet().stream()
264+
.collect(
265+
Collectors.toMap(
266+
Map.Entry::getKey,
267+
entry ->
268+
incrementalClusterManager
269+
.toSplits(
270+
entry.getKey(),
271+
entry.getValue().files())
272+
.toArray(new DataSplit[0])));
273+
274+
// 2. read,sort and write in partition
275+
List<DataStream<Committable>> dataStreams = new ArrayList<>();
276+
277+
for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) {
278+
DataSplit[] splits = entry.getValue();
279+
LinkedHashMap<String, String> partitionSpec =
280+
partitionComputer.generatePartValues(entry.getKey());
281+
// 2.1 generate source for current partition
282+
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
283+
IncrementalClusterSplitSource.buildSource(
284+
env,
285+
table,
286+
partitionSpec,
287+
splits,
288+
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
289+
290+
// 2.2 cluster in partition
291+
Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
292+
if (sinkParallelism == null) {
293+
sinkParallelism = sourcePair.getLeft().getParallelism();
294+
}
295+
TableSortInfo sortInfo =
296+
new TableSortInfo.Builder()
297+
.setSortColumns(incrementalClusterManager.clusterKeys())
298+
.setSortStrategy(incrementalClusterManager.clusterCurve())
299+
.setSinkParallelism(sinkParallelism)
300+
.setLocalSampleSize(sinkParallelism * localSampleMagnification)
301+
.setGlobalSampleSize(sinkParallelism * 1000)
302+
.setRangeNumber(sinkParallelism * 10)
303+
.build();
304+
DataStream<RowData> sorted =
305+
TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
306+
307+
// 2.3 write and then reorganize the committable
308+
// set parallelism to null, and it'll forward parallelism when doWrite()
309+
RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
310+
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
311+
DataStream<Committable> clusterCommittable =
312+
sink.doWrite(
313+
FlinkSinkBuilder.mapToInternalRow(
314+
sorted,
315+
table.rowType(),
316+
blobAsDescriptor,
317+
table.catalogEnvironment().catalogContext()),
318+
commitUser,
319+
null)
320+
.transform(
321+
"Rewrite cluster committable",
322+
new CommittableTypeInfo(),
323+
new RewriteIncrementalClusterCommittableOperator(
324+
table,
325+
compactUnits.entrySet().stream()
326+
.collect(
327+
Collectors.toMap(
328+
Map.Entry::getKey,
329+
unit ->
330+
unit.getValue()
331+
.outputLevel()))));
332+
dataStreams.add(clusterCommittable);
333+
dataStreams.add(sourcePair.getRight());
334+
}
335+
336+
// 3. commit
337+
RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
338+
DataStream<Committable> dataStream = dataStreams.get(0);
339+
for (int i = 1; i < dataStreams.size(); i++) {
340+
dataStream = dataStream.union(dataStreams.get(i));
341+
}
342+
sink.doCommit(dataStream, commitUser);
343+
return true;
344+
}
345+
205346
protected PartitionPredicate getPartitionPredicate() throws Exception {
206347
checkArgument(
207348
partitions == null || whereSql == null,

0 commit comments

Comments
 (0)