49 changes: 45 additions & 4 deletions docs/content/append-table/incremental-clustering.md
@@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental
## Run Incremental Clustering
{{< hint info >}}

Currently, Incremental Clustering can only be run in batch mode.

{{< /hint >}}

To run an Incremental Clustering job, follow these instructions.

You don’t need to specify any clustering-related parameters when running Incremental Clustering;
these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
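
For example, clustering settings are adjusted on the table before submitting the job. A minimal Spark SQL sketch — the option key `clustering.columns` is used here for illustration only; refer to the clustering options described earlier in this page for the exact keys:

```sql
-- Sketch: change clustering settings via table options, not via the compact call.
-- The option key below is an assumption for illustration; verify it against the
-- clustering options documented for your Paimon version.
ALTER TABLE T SET TBLPROPERTIES (
    'clustering.columns' = 'col1,col2'
);
```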

{{< tabs "incremental-clustering" >}}

@@ -117,8 +120,46 @@ CALL sys.compact(table => 'T')
-- run incremental clustering in full mode; this will recluster all data
CALL sys.compact(table => 'T', compact_strategy => 'full')
```
{{< /tab >}}

{{< tab "Flink Action" >}}

Run the following command to submit an incremental clustering job for the table.

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
compact \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--compact_strategy <minor / full>] \
[--table_conf <table_conf>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Example: run incremental clustering

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
compact \
--warehouse s3:///path/to/warehouse \
--database test_db \
--table test_table \
--table_conf sink.parallelism=2 \
--compact_strategy minor \
--catalog_conf s3.endpoint=https://****.com \
--catalog_conf s3.access-key=***** \
--catalog_conf s3.secret-key=*****
```
* `--compact_strategy` Determines how to pick the files to be clustered; the default is `minor`.
  * `full` : all files are selected for clustering (see the sketch below).
  * `minor` : picks the set of files that need to be clustered based on the specified conditions.
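
For instance, a full recluster of the entire table can be submitted with the same command, changing only the strategy flag (a sketch reusing the placeholder paths from above):

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --compact_strategy full
```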

Note: write parallelism is set by `sink.parallelism`; if it is set too high, a large number of small files may be generated.

You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to run the job in batch mode.
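
For example (a sketch with the same placeholders as above), the runtime mode can be passed directly when submitting the clustering job:

```bash
# Force batch execution for the incremental clustering job
<FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name>
```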
{{< /tab >}}

{{< /tabs >}}
@@ -226,7 +226,8 @@ public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
return splits;
}

public static List<DataFileMeta> upgrade(
List<DataFileMeta> filesAfterCluster, int outputLevel) {
return filesAfterCluster.stream()
.map(file -> file.upgrade(outputLevel))
.collect(Collectors.toList());
@@ -19,9 +19,13 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -32,7 +36,10 @@
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
@@ -43,6 +50,7 @@
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
@@ -68,6 +76,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -101,10 +110,6 @@ public CompactAction(
checkArgument(
!((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
"Compact action does not support data evolution table yet. ");
checkArgument(
!(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE
&& ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()),
"The table has enabled incremental clustering, and do not support compact in flink yet.");
HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
table = table.copy(dynamicOptions);
@@ -148,8 +153,12 @@ private boolean buildImpl() throws Exception {
if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
return buildForIncrementalClustering(env, fileStoreTable, isStreaming);
} else {
buildForAppendTableCompact(env, fileStoreTable, isStreaming);
return true;
}
} else {
buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
return true;
@@ -203,6 +212,138 @@ private void buildForAppendTableCompact(
builder.build();
}

private boolean buildForIncrementalClustering(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) {
checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode");
checkArgument(
partitions == null,
"Incremental clustering currently does not support specifying partitions");
checkArgument(
whereSql == null, "Incremental clustering currently does not support predicates");

IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);

// non-full strategy as default for incremental clustering
if (fullCompaction == null) {
fullCompaction = false;
}
Options options = new Options(table.options());
int localSampleMagnification = table.coreOptions().getLocalSampleMagnification();
if (localSampleMagnification < 20) {
throw new IllegalArgumentException(
String.format(
"the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.",
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
localSampleMagnification));
}
String commitUser = CoreOptions.createCommitUser(options);
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
table.coreOptions().partitionDefaultName(),
table.store().partitionType(),
table.partitionKeys().toArray(new String[0]),
table.coreOptions().legacyPartitionName());

// 1. pick cluster files for each partition
Map<BinaryRow, CompactUnit> compactUnits =
incrementalClusterManager.prepareForCluster(fullCompaction);
if (compactUnits.isEmpty()) {
LOGGER.info(
"No partition needs to be incrementally clustered. "
+ "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
if (this.forceStartFlinkJob) {
env.fromSequence(0, 0)
.name("Nothing to Cluster Source")
.sinkTo(new DiscardingSink<>());
return true;
} else {
return false;
}
}
Map<BinaryRow, DataSplit[]> partitionSplits =
compactUnits.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
incrementalClusterManager
.toSplits(
entry.getKey(),
entry.getValue().files())
.toArray(new DataSplit[0])));

// 2. read, sort and write in partition
List<DataStream<Committable>> dataStreams = new ArrayList<>();

for (Map.Entry<BinaryRow, DataSplit[]> entry : partitionSplits.entrySet()) {
DataSplit[] splits = entry.getValue();
LinkedHashMap<String, String> partitionSpec =
partitionComputer.generatePartValues(entry.getKey());
// 2.1 generate source for current partition
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
IncrementalClusterSplitSource.buildSource(
env,
table,
partitionSpec,
splits,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));

// 2.2 cluster in partition
Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
if (sinkParallelism == null) {
sinkParallelism = sourcePair.getLeft().getParallelism();
}
TableSortInfo sortInfo =
new TableSortInfo.Builder()
.setSortColumns(incrementalClusterManager.clusterKeys())
.setSortStrategy(incrementalClusterManager.clusterCurve())
.setSinkParallelism(sinkParallelism)
.setLocalSampleSize(sinkParallelism * localSampleMagnification)
.setGlobalSampleSize(sinkParallelism * 1000)
.setRangeNumber(sinkParallelism * 10)
.build();
DataStream<RowData> sorted =
TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();

// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream<Committable> clusterCommittable =
sink.doWrite(
FlinkSinkBuilder.mapToInternalRow(
sorted,
table.rowType(),
blobAsDescriptor,
table.catalogEnvironment().catalogContext()),
commitUser,
null)
.transform(
"Rewrite cluster committable",
new CommittableTypeInfo(),
new RewriteIncrementalClusterCommittableOperator(
table,
compactUnits.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
unit ->
unit.getValue()
.outputLevel()))));
dataStreams.add(clusterCommittable);
dataStreams.add(sourcePair.getRight());
}

// 3. commit
RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
DataStream<Committable> dataStream = dataStreams.get(0);
for (int i = 1; i < dataStreams.size(); i++) {
dataStream = dataStream.union(dataStreams.get(i));
}
sink.doCommit(dataStream, commitUser);
return true;
}

protected PartitionPredicate getPartitionPredicate() throws Exception {
checkArgument(
partitions == null || whereSql == null,