Commit 9b6d43d

[flink] support performing incremental clustering by flink (#6395)
1 parent 70382b1 commit 9b6d43d

8 files changed: 896 additions & 12 deletions


docs/content/append-table/incremental-clustering.md

Lines changed: 45 additions & 4 deletions
@@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental
 ## Run Incremental Clustering
 {{< hint info >}}

-Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future.
+Currently, Incremental Clustering can only be run in batch mode.

 {{< /hint >}}

-To run a Incremental Clustering job, follow these instructions.
+To run an Incremental Clustering job, follow these instructions.
+
+You don’t need to specify any clustering-related parameters when running Incremental Clustering;
+these options are already defined as table options. If you need to change clustering settings, update the corresponding table options.

 {{< tabs "incremental-clustering" >}}

@@ -117,8 +120,46 @@
 -- run incremental clustering with full mode, this will recluster all data
 CALL sys.compact(table => 'T', compact_strategy => 'full')
 ```
-You don’t need to specify any clustering-related parameters when running Incremental Clustering,
-these are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
+Run the following command to submit an incremental clustering job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    [--compact_strategy <minor / full>] \
+    [--table_conf <table_conf>] \
+    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
+```
+
+Example: run incremental clustering
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --table_conf sink.parallelism=2 \
+    --compact_strategy minor \
+    --catalog_conf s3.endpoint=https://****.com \
+    --catalog_conf s3.access-key=***** \
+    --catalog_conf s3.secret-key=*****
+```
+
+* `--compact_strategy` determines how to pick the files to be clustered; the default is `minor`.
+  * `full`: all files will be selected for clustering.
+  * `minor`: pick the set of files that need to be clustered based on the specified conditions.
+
+Note: write parallelism is set by `sink.parallelism`; if it is too large, a large number of small files may be generated.
+
+You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to run in batch mode.
 {{< /tab >}}

 {{< /tabs >}}
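
The hint above is the key operational detail: the action takes no clustering flags because the clustering spec lives in table options. A minimal Java sketch of that lookup, reusing `IncrementalClusterManager` and its `clusterKeys()`/`clusterCurve()` accessors from the `CompactAction` diff below (the wrapper class and printing are illustrative only):

```java
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.table.FileStoreTable;

class ClusterSettingsSketch {
    // Sketch: the clustering spec is resolved from the table, not from CLI flags.
    static void describe(FileStoreTable table) {
        IncrementalClusterManager manager = new IncrementalClusterManager(table);
        System.out.println("cluster keys:  " + manager.clusterKeys());  // from table options
        System.out.println("cluster curve: " + manager.clusterCurve()); // from table options
    }
}
```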

paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java

Lines changed: 2 additions & 1 deletion
@@ -226,7 +226,8 @@ public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
         return splits;
     }

-    public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, int outputLevel) {
+    public static List<DataFileMeta> upgrade(
+            List<DataFileMeta> filesAfterCluster, int outputLevel) {
         return filesAfterCluster.stream()
                 .map(file -> file.upgrade(outputLevel))
                 .collect(Collectors.toList());
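
With `upgrade` now static, it no longer needs an `IncrementalClusterManager` instance; the likely motivation (an inference from this commit, not stated in it) is reuse from the new Flink committable-rewrite path, which tracks a per-partition output level. A minimal sketch of the new call shape:

```java
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.io.DataFileMeta;

import java.util.List;

class UpgradeCallSketch {
    // Before this commit: manager.upgrade(files, level) required an instance.
    // After: a plain static call that lifts clustered files to the output level.
    static List<DataFileMeta> toOutputLevel(List<DataFileMeta> files, int outputLevel) {
        return IncrementalClusterManager.upgrade(files, outputLevel);
    }
}
```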

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 147 additions & 6 deletions
@@ -19,9 +19,13 @@
 package org.apache.paimon.flink.action;

 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
+import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
 import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
 import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
 import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;

@@ -32,7 +36,10 @@
 import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.RowAppendTableSink;
 import org.apache.paimon.flink.sink.RowDataChannelComputer;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;

@@ -43,6 +50,7 @@
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;

@@ -68,6 +76,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;

@@ -101,10 +110,6 @@ public CompactAction(
         checkArgument(
                 !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
                 "Compact action does not support data evolution table yet. ");
-        checkArgument(
-                !(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE
-                        && ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()),
-                "The table has enabled incremental clustering, and do not support compact in flink yet.");
         HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
         dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
         table = table.copy(dynamicOptions);

@@ -148,8 +153,12 @@ private boolean buildImpl() throws Exception {
         if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
             return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
         } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
-            buildForAppendTableCompact(env, fileStoreTable, isStreaming);
-            return true;
+            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+                return buildForIncrementalClustering(env, fileStoreTable, isStreaming);
+            } else {
+                buildForAppendTableCompact(env, fileStoreTable, isStreaming);
+                return true;
+            }
         } else {
             buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
             return true;

@@ -203,6 +212,138 @@ private void buildForAppendTableCompact(
         builder.build();
     }

+    private boolean buildForIncrementalClustering(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
+        checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode");
+        checkArgument(
+                partitions == null,
+                "Incremental clustering currently does not support specifying partitions");
+        checkArgument(
+                whereSql == null, "Incremental clustering currently does not support predicates");
+
+        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
+
+        // non-full strategy as default for incremental clustering
+        if (fullCompaction == null) {
+            fullCompaction = false;
+        }
+        Options options = new Options(table.options());
+        int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
+        if (localSampleMagnification < 20) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
+                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+                            localSampleMagnification));
+        }
+        String commitUser = CoreOptions.createCommitUser(options);
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        table.coreOptions().partitionDefaultName(),
+                        table.store().partitionType(),
+                        table.partitionKeys().toArray(new String[0]),
+                        table.coreOptions().legacyPartitionName());
+
+        // 1. pick cluster files for each partition
+        Map<BinaryRow, CompactUnit> compactUnits =
+                incrementalClusterManager.prepareForCluster(fullCompaction);
+        if (compactUnits.isEmpty()) {
+            LOGGER.info(
+                    "No partition needs to be incrementally clustered. "
+                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
+            if (this.forceStartFlinkJob) {
+                env.fromSequence(0, 0)
+                        .name("Nothing to Cluster Source")
+                        .sinkTo(new DiscardingSink<>());
+                return true;
+            } else {
+                return false;
+            }
+        }
+        Map<BinaryRow, DataSplit[]> partitionSplits =
+                compactUnits.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                incrementalClusterManager
+                                                        .toSplits(
+                                                                entry.getKey(),
+                                                                entry.getValue().files())
+                                                        .toArray(new DataSplit[0])));
+
+        // 2. read, sort and write in partition
+        List<DataStream<Committable>> dataStreams = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) {
+            DataSplit[] splits = entry.getValue();
+            LinkedHashMap<String, String> partitionSpec =
+                    partitionComputer.generatePartValues(entry.getKey());
+            // 2.1 generate source for current partition
+            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+                    IncrementalClusterSplitSource.buildSource(
+                            env,
+                            table,
+                            partitionSpec,
+                            splits,
+                            options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+
+            // 2.2 cluster in partition
+            Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+            if (sinkParallelism == null) {
+                sinkParallelism = sourcePair.getLeft().getParallelism();
+            }
+            TableSortInfo sortInfo =
+                    new TableSortInfo.Builder()
+                            .setSortColumns(incrementalClusterManager.clusterKeys())
+                            .setSortStrategy(incrementalClusterManager.clusterCurve())
+                            .setSinkParallelism(sinkParallelism)
+                            .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+                            .setGlobalSampleSize(sinkParallelism * 1000)
+                            .setRangeNumber(sinkParallelism * 10)
+                            .build();
+            DataStream<RowData> sorted =
+                    TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
+
+            // 2.3 write and then reorganize the committable
+            // set parallelism to null, and it'll forward parallelism when doWrite()
+            RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+            boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+            DataStream<Committable> clusterCommittable =
+                    sink.doWrite(
+                                    FlinkSinkBuilder.mapToInternalRow(
+                                            sorted,
+                                            table.rowType(),
+                                            blobAsDescriptor,
+                                            table.catalogEnvironment().catalogContext()),
+                                    commitUser,
+                                    null)
+                            .transform(
+                                    "Rewrite cluster committable",
+                                    new CommittableTypeInfo(),
+                                    new RewriteIncrementalClusterCommittableOperator(
+                                            table,
+                                            compactUnits.entrySet().stream()
+                                                    .collect(
+                                                            Collectors.toMap(
                                                                    Map.Entry::getKey,
+                                                                    unit ->
+                                                                            unit.getValue()
+                                                                                    .outputLevel()))));
+            dataStreams.add(clusterCommittable);
+            dataStreams.add(sourcePair.getRight());
+        }
+
+        // 3. commit
+        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
+        DataStream<Committable> dataStream = dataStreams.get(0);
+        for (int i = 1; i < dataStreams.size(); i++) {
+            dataStream = dataStream.union(dataStreams.get(i));
+        }
+        sink.doCommit(dataStream, commitUser);
+        return true;
+    }
+
     protected PartitionPredicate getPartitionPredicate() throws Exception {
         checkArgument(
                 partitions == null || whereSql == null,
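
Step 2.2 above sizes the range sort entirely from the sink parallelism and the sample magnification. A worked example with illustrative values (`sink.parallelism=2` matches the docs example; the magnification of 100 is hypothetical, the code only enforces a value of at least 20):

```java
class SortSampleSizesSketch {
    // Mirrors the TableSortInfo arithmetic in buildForIncrementalClustering above.
    public static void main(String[] args) {
        int sinkParallelism = 2;            // e.g. --table_conf sink.parallelism=2
        int localSampleMagnification = 100; // hypothetical; must be >= 20
        System.out.println("local sample size  = " + sinkParallelism * localSampleMagnification); // 200
        System.out.println("global sample size = " + sinkParallelism * 1000); // 2000
        System.out.println("range number       = " + sinkParallelism * 10);   // 20
    }
}
```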
