From 3e619a3efef720dc7bc5bf438f76c9126d98bc38 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Oct 2025 17:23:15 +0800 Subject: [PATCH 1/6] flink 1 --- .../cluster/IncrementalClusterManager.java | 3 +- .../paimon/flink/action/CompactAction.java | 118 +++++++++++++++++- .../IncrementalClusterSplitSource.java | 118 ++++++++++++++++++ .../RemoveClusterBeforeFilesOperator.java | 58 +++++++++ ...IncrementalClusterCommittableOperator.java | 116 +++++++++++++++++ .../spark/procedure/CompactProcedure.java | 2 +- 6 files changed, 412 insertions(+), 3 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 887150577c23..407f6b17ef43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -226,7 +226,8 @@ public List toSplits(BinaryRow partition, List files) { return splits; } - public List upgrade(List filesAfterCluster, int outputLevel) { + public static List upgrade( + List filesAfterCluster, int outputLevel) { return filesAfterCluster.stream() .map(file -> file.upgrade(outputLevel)) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 602d3a59a5b7..062a0345940e 100644 
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -19,9 +19,13 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource; +import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator; import org.apache.paimon.flink.compact.AppendTableCompactBuilder; import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource; import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator; @@ -32,7 +36,10 @@ import org.apache.paimon.flink.sink.FixedBucketSink; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; +import org.apache.paimon.flink.sink.RowAppendTableSink; import org.apache.paimon.flink.sink.RowDataChannelComputer; +import org.apache.paimon.flink.sorter.TableSortInfo; +import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -43,6 +50,7 @@ import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; @@ -68,6 +76,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import 
static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; @@ -148,7 +157,11 @@ private boolean buildImpl() throws Exception { if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) { return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming); } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { - buildForAppendTableCompact(env, fileStoreTable, isStreaming); + if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) { + buildForIncrementalClustering(env, fileStoreTable, isStreaming); + } else { + buildForAppendTableCompact(env, fileStoreTable, isStreaming); + } return true; } else { buildForBucketedTableCompact(env, fileStoreTable, isStreaming); @@ -203,6 +216,109 @@ private void buildForAppendTableCompact( builder.build(); } + private void buildForIncrementalClustering( + StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) { + checkArgument( + !isStreaming, "Postpone bucket compaction currently only supports batch mode"); + checkArgument( + partitions == null, + "Postpone bucket compaction currently does not support specifying partitions"); + checkArgument( + whereSql == null, + "Postpone bucket compaction currently does not support predicates"); + + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + + // incremental cluster config + if (fullCompaction == null) { + fullCompaction = false; + } + Options options = new Options(table.options()); + Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); + int localSampleMagnification = table.coreOptions().getLocalSampleMagnification(); + if (localSampleMagnification < 20) { + throw new IllegalArgumentException( + String.format( + "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.", + 
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), + localSampleMagnification)); + } + TableSortInfo sortInfo = + new TableSortInfo.Builder() + .setSortColumns(incrementalClusterManager.clusterKeys()) + .setSortStrategy(incrementalClusterManager.clusterCurve()) + .setSinkParallelism(sinkParallelism) + .setLocalSampleSize(sinkParallelism * localSampleMagnification) + .setGlobalSampleSize(sinkParallelism * 1000) + .setRangeNumber(sinkParallelism * 10) + .build(); + String commitUser = CoreOptions.createCommitUser(options); + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + table.coreOptions().partitionDefaultName(), + table.store().partitionType(), + table.partitionKeys().toArray(new String[0]), + table.coreOptions().legacyPartitionName()); + + // 1. pick cluster files for each partition + Map compactUnits = + incrementalClusterManager.prepareForCluster(fullCompaction); + Map partitionSplits = + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterManager + .toSplits( + entry.getKey(), + entry.getValue().files()) + .toArray(new DataSplit[0]))); + + // 2. 
handle in partition + List> dataStreams = new ArrayList<>(); + + for (Map.Entry entry : partitionSplits.entrySet()) { + DataSplit[] splits = entry.getValue(); + LinkedHashMap partitionSpec = + partitionComputer.generatePartValues(entry.getKey()); + Pair, DataStream> sourcePair = + IncrementalClusterSplitSource.buildSource( + env, + table, + partitionSpec, + splits, + options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); + + // cluster in partition + DataStream sorted = + TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); + + // rewrite + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, sinkParallelism); + DataStream clusterCommittable = + sink.doWrite( + FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()), + commitUser, + sinkParallelism) + .forward() + .transform( + "Rewrite compact committable", + new CommittableTypeInfo(), + new RewriteIncrementalClusterCommittableOperator( + table, compactUnits)); + dataStreams.add(clusterCommittable); + } + + // 3. commit + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, sinkParallelism); + DataStream dataStream = dataStreams.get(0); + for (int i = 1; i < dataStreams.size(); i++) { + dataStream = dataStream.union(dataStreams.get(i)); + } + sink.doCommit(dataStream, commitUser); + } + protected PartitionPredicate getPartitionPredicate() throws Exception { checkArgument( partitions == null || whereSql == null, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java new file mode 100644 index 000000000000..388d46951245 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; +import org.apache.paimon.flink.source.operator.ReadOperator; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.Pair; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Source for incremental cluster. */ +public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource { + private static final long serialVersionUID = 1L; + + private final Split[] splits; + + public IncrementalClusterSplitSource(Split[] splits) { + this.splits = splits; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + return new IncrementalClusterSplitSource.Reader(); + } + + private class Reader extends AbstractNonCoordinatedSourceReader { + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + output.collect(dataSplit); + } + return InputStatus.END_OF_INPUT; + } + } + + public static Pair, DataStream> buildSource( + StreamExecutionEnvironment env, + FileStoreTable table, + Map partitionSpec, + DataSplit[] splits, + @Nullable Integer parallelism) { + DataStreamSource source = + env.fromSource( + new IncrementalClusterSplitSource(splits), + WatermarkStrategy.noWatermarks(), + String.format( + "Incremental-cluster split generator: %s - %s", + table.fullName(), partitionSpec), + new JavaTypeInfo<>(Split.class)); + + if (parallelism != null) { + source.setParallelism(parallelism); + } + + return Pair.of( + new DataStream<>(source.getExecutionEnvironment(), source.getTransformation()) + .transform( + String.format( + "Incremental-cluster reader: %s - %s", + table.fullName(), partitionSpec), + InternalTypeInfo.of( + LogicalTypeConversion.toLogicalType(table.rowType())), + new ReadOperator(table::newRead, null, null)), + source.forward() + .transform( + "Remove files to be clustered", + new CommittableTypeInfo(), + new 
RemoveClusterBeforeFilesOperator()) + .forceNonParallel()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java new file mode 100644 index 000000000000..907a69ba120a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collections; + +/** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. 
*/ +public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(StreamRecord element) throws Exception { + DataSplit dataSplit = (DataSplit) element.getValue(); + CommitMessageImpl message = + new CommitMessageImpl( + dataSplit.partition(), + dataSplit.bucket(), + dataSplit.totalBuckets(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + dataSplit.dataFiles(), + Collections.emptyList(), + Collections.emptyList())); + output.collect( + new StreamRecord<>( + new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message))); + } + + @Override + public void endInput() throws Exception {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java new file mode 100644 index 000000000000..d86d7228abd8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Rewrite committable from incremental clustering. It moves all newly written files into compact + * results, because incremental clustering only produces compact snapshots. 
+ */ +public class RewriteIncrementalClusterCommittableOperator + extends BoundedOneInputOperator { + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + private final Map compactUnits; + + private transient Map> partitionFiles; + + public RewriteIncrementalClusterCommittableOperator( + FileStoreTable table, Map compactUnits) { + this.table = table; + this.compactUnits = compactUnits; + } + + @Override + public void open() throws Exception { + partitionFiles = new HashMap<>(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + Committable committable = element.getValue(); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(element); + } + + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + checkArgument(message.bucket() == 0); + BinaryRow partition = message.partition(); + partitionFiles + .computeIfAbsent(partition, file -> new ArrayList<>()) + .addAll(message.newFilesIncrement().newFiles()); + } + + @Override + public void endInput() throws Exception { + emitAll(Long.MAX_VALUE); + } + + protected void emitAll(long checkpointId) { + for (Map.Entry> partitionEntry : partitionFiles.entrySet()) { + BinaryRow partition = partitionEntry.getKey(); + List clusterBefore = compactUnits.get(partition).files(); + // upgrade the clustered file to outputLevel + List clusterAfter = + IncrementalClusterManager.upgrade( + partitionEntry.getValue(), compactUnits.get(partition).outputLevel()); + LOG.info( + "Partition {}: upgrade file level to {}", + partition, + compactUnits.get(partition).outputLevel()); + CompactIncrement compactIncrement = + new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList()); + CommitMessageImpl clusterMessage = + new CommitMessageImpl( + partition, + // bucket 0 is bucket for unaware-bucket table + // for compatibility with the old design + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), 
+ compactIncrement); + output.collect( + new StreamRecord<>( + new Committable(checkpointId, Committable.Kind.FILE, clusterMessage))); + } + + partitionFiles.clear(); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 2796b6deb629..3e7608d1a9d7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -620,7 +620,7 @@ private void clusterIncrementalUnAwareBucketTable( List clusterBefore = compactUnits.get(partition).files(); // upgrade the clustered file to outputLevel List clusterAfter = - incrementalClusterManager.upgrade( + IncrementalClusterManager.upgrade( entry.getValue(), compactUnits.get(partition).outputLevel()); LOG.info( "Partition {}: upgrade file level to {}", From 3e2fc651f3f27134e0b20682713c6455a99820ea Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 13 Oct 2025 13:51:41 +0800 Subject: [PATCH 2/6] flink 2 --- .../paimon/flink/action/CompactAction.java | 56 ++- ...IncrementalClusterCommittableOperator.java | 15 +- .../IncrementalClusterActionITCase.java | 403 ++++++++++++++++++ 3 files changed, 446 insertions(+), 28 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 062a0345940e..e728eaf66382 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -110,10 +110,6 @@ public CompactAction( checkArgument( !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(), "Compact action does not support data evolution table yet. "); - checkArgument( - !(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE - && ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()), - "The table has enabled incremental clustering, and do not support compact in flink yet."); HashMap dynamicOptions = new HashMap<>(tableConf); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); table = table.copy(dynamicOptions); @@ -158,11 +154,11 @@ private boolean buildImpl() throws Exception { return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming); } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) { - buildForIncrementalClustering(env, fileStoreTable, isStreaming); + return buildForIncrementalClustering(env, fileStoreTable, isStreaming); } else { buildForAppendTableCompact(env, fileStoreTable, isStreaming); + return true; } - return true; } else { buildForBucketedTableCompact(env, fileStoreTable, isStreaming); return true; @@ -216,20 +212,18 @@ private void buildForAppendTableCompact( builder.build(); } - private void buildForIncrementalClustering( + private boolean buildForIncrementalClustering( StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) { - checkArgument( - !isStreaming, "Postpone bucket compaction currently only supports batch mode"); + checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode"); checkArgument( partitions == null, - "Postpone bucket compaction currently does not support specifying partitions"); + "Incremental clustering currently does not support specifying partitions"); checkArgument( - whereSql == null, - "Postpone bucket 
compaction currently does not support predicates"); + whereSql == null, "Incremental clustering currently does not support predicates"); IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); - // incremental cluster config + // non-full strategy as default for incremental clustering if (fullCompaction == null) { fullCompaction = false; } @@ -263,6 +257,16 @@ private void buildForIncrementalClustering( // 1. pick cluster files for each partition Map compactUnits = incrementalClusterManager.prepareForCluster(fullCompaction); + if (compactUnits.isEmpty()) { + LOGGER.info( + "No partition needs to be incrementally clustered. " + + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster."); + return false; + // throw new RuntimeException( + // "No partition needs to be incrementally clustered. " + // + "Please set '--compact_strategy full' if you need to + // forcibly trigger the cluster."); + } Map partitionSplits = compactUnits.entrySet().stream() .collect( @@ -275,13 +279,14 @@ private void buildForIncrementalClustering( entry.getValue().files()) .toArray(new DataSplit[0]))); - // 2. handle in partition + // 2. 
read,sort and write in partition + List> dataStreams = new ArrayList<>(); for (Map.Entry entry : partitionSplits.entrySet()) { DataSplit[] splits = entry.getValue(); LinkedHashMap partitionSpec = partitionComputer.generatePartValues(entry.getKey()); + // 2.1 generate source for current partition Pair, DataStream> sourcePair = IncrementalClusterSplitSource.buildSource( env, @@ -290,24 +295,34 @@ private void buildForIncrementalClustering( splits, options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); - // cluster in partition + // 2.2 cluster in partition DataStream sorted = TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); - // rewrite + // 2.3 write and then reorganize the committable RowAppendTableSink sink = new RowAppendTableSink(table, null, null, sinkParallelism); DataStream clusterCommittable = sink.doWrite( FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()), commitUser, - sinkParallelism) - .forward() + null) .transform( - "Rewrite compact committable", + "Rewrite cluster committable", new CommittableTypeInfo(), new RewriteIncrementalClusterCommittableOperator( - table, compactUnits)); + table, + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry + ::getKey, // keep the original BinaryRow partition key + unit -> + unit.getValue() + .outputLevel() // extract the output level of the CompactUnit + )))) + .forceNonParallel(); dataStreams.add(clusterCommittable); + dataStreams.add(sourcePair.getRight()); } // 3. 
commit @@ -317,6 +332,7 @@ private void buildForIncrementalClustering( dataStream = dataStream.union(dataStreams.get(i)); } sink.doCommit(dataStream, commitUser); + return true; } protected PartitionPredicate getPartitionPredicate() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java index d86d7228abd8..bd0a0cda9fa7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.cluster; import org.apache.paimon.append.cluster.IncrementalClusterManager; -import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.utils.BoundedOneInputOperator; @@ -48,14 +47,14 @@ public class RewriteIncrementalClusterCommittableOperator private static final long serialVersionUID = 1L; private final FileStoreTable table; - private final Map compactUnits; + private final Map outputLevels; private transient Map> partitionFiles; public RewriteIncrementalClusterCommittableOperator( - FileStoreTable table, Map compactUnits) { + FileStoreTable table, Map outputLevels) { this.table = table; - this.compactUnits = compactUnits; + this.outputLevels = outputLevels; } @Override @@ -86,17 +85,17 @@ public void endInput() throws Exception { protected void emitAll(long checkpointId) { for (Map.Entry> partitionEntry : partitionFiles.entrySet()) { BinaryRow partition = partitionEntry.getKey(); - List clusterBefore = compactUnits.get(partition).files(); // upgrade the clustered file to outputLevel List 
clusterAfter = IncrementalClusterManager.upgrade( - partitionEntry.getValue(), compactUnits.get(partition).outputLevel()); + partitionEntry.getValue(), outputLevels.get(partition)); LOG.info( "Partition {}: upgrade file level to {}", partition, - compactUnits.get(partition).outputLevel()); + outputLevels.get(partition)); CompactIncrement compactIncrement = - new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList()); + new CompactIncrement( + Collections.emptyList(), clusterAfter, Collections.emptyList()); CommitMessageImpl clusterMessage = new CommitMessageImpl( partition, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java new file mode 100644 index 000000000000..ebf255bdf34b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for incremental clustering action. 
*/ +public class IncrementalClusterActionITCase extends ActionITCaseBase { + + @Test + public void testClusterUnpartitionedTable() throws Exception { + FileStoreTable table = createTable(null); + + BinaryString randomStr = + BinaryString.fromString( + UUID.randomUUID().toString() + UUID.randomUUID() + UUID.randomUUID()); + List messages1 = new ArrayList<>(); + + // first write + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages1.addAll(write(GenericRow.of(i, j, randomStr))); + } + } + commit(messages1); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected1 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[0, 2]", + "+I[1, 0]", + "+I[1, 1]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages1.clear(); + messages1.addAll( + write( + GenericRow.of(0, 3, null), + GenericRow.of(1, 3, null), + GenericRow.of(2, 3, null))); + messages1.addAll( + write( + GenericRow.of(3, 0, null), + GenericRow.of(3, 1, null), + GenericRow.of(3, 2, null), + GenericRow.of(3, 3, null))); + commit(messages1); + + List result3 = + getResult( + 
readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(expected2); + expected3.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[2, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(expected2); + expected4.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected5 = new ArrayList<>(); + expected5.addAll( + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[0, 3]", + "+I[1, 2]", + "+I[1, 3]", + "+I[2, 0]", + "+I[2, 1]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 2]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + + @Test + public void testClusterPartitionedTable() throws Exception { + FileStoreTable table = 
createTable(null); + + BinaryString randomStr = + BinaryString.fromString( + UUID.randomUUID().toString() + UUID.randomUUID() + UUID.randomUUID()); + List messages1 = new ArrayList<>(); + + // first write + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages1.addAll(write(GenericRow.of(i, j, randomStr))); + } + } + commit(messages1); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected1 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[0, 2]", + "+I[1, 0]", + "+I[1, 1]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages1.clear(); + messages1.addAll( + write( + GenericRow.of(0, 3, null), + GenericRow.of(1, 3, null), + GenericRow.of(2, 3, null))); + messages1.addAll( + write( + GenericRow.of(3, 0, null), + GenericRow.of(3, 1, null), + GenericRow.of(3, 2, null), + GenericRow.of(3, 3, null))); + commit(messages1); + + List result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(expected2); + expected3.addAll( + Lists.newArrayList( + "+I[0, 3]", + 
"+I[1, 3]", + "+I[2, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(expected2); + expected4.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected5 = new ArrayList<>(); + expected5.addAll( + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[0, 3]", + "+I[1, 2]", + "+I[1, 3]", + "+I[2, 0]", + "+I[2, 1]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 2]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + + protected FileStoreTable createTable(String partitionKeys) throws Exception { + catalog.createDatabase(database, true); + catalog.createTable(identifier(), schema(partitionKeys), true); + return (FileStoreTable) catalog.getTable(identifier()); + } + + private FileStoreTable getTable() throws 
Exception { + return (FileStoreTable) catalog.getTable(identifier()); + } + + private Identifier identifier() { + return Identifier.create(database, tableName); + } + + private List write(GenericRow... data) throws Exception { + BatchWriteBuilder builder = getTable().newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (GenericRow row : data) { + batchTableWrite.write(row); + } + return batchTableWrite.prepareCommit(); + } + } + + private void commit(List messages) throws Exception { + BatchTableCommit commit = getTable().newBatchWriteBuilder().newCommit(); + commit.commit(messages); + commit.close(); + } + + private static Schema schema(String partitionKeys) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("a", DataTypes.INT()); + schemaBuilder.column("b", DataTypes.INT()); + schemaBuilder.column("c", DataTypes.STRING()); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("num-levels", "6"); + schemaBuilder.option("num-sorted-run.compaction-trigger", "2"); + schemaBuilder.option("clustering.columns", "a,b"); + schemaBuilder.option("clustering.strategy", "zorder"); + schemaBuilder.option("clustering.incremental", "true"); + schemaBuilder.option("scan.parallelism", "1"); + schemaBuilder.option("sink.parallelism", "1"); + if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) { + schemaBuilder.partitionKeys(partitionKeys); + } + return schemaBuilder.build(); + } + + private void checkSnapshot(FileStoreTable table) { + assertThat(table.latestSnapshot().get().commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); + } + + private void runAction(List extra) throws Exception { + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); + ArrayList baseArgs = + Lists.newArrayList("compact", "--database", database, "--table", tableName); + ThreadLocalRandom random = ThreadLocalRandom.current(); + if (random.nextBoolean()) { + 
baseArgs.addAll(Lists.newArrayList("--warehouse", warehouse)); + } else { + baseArgs.addAll(Lists.newArrayList("--catalog_conf", "warehouse=" + warehouse)); + } + baseArgs.addAll(extra); + + CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0])); + // action.withStreamExecutionEnvironment(env).build(); + // env.execute(); + action.withStreamExecutionEnvironment(env); + action.run(); + } +} From 9ae48e463babf65acb193d9ae408200b8369e2f1 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 13 Oct 2025 17:02:37 +0800 Subject: [PATCH 3/6] add test --- .../paimon/flink/action/CompactAction.java | 44 +-- .../IncrementalClusterSplitSource.java | 2 +- .../IncrementalClusterActionITCase.java | 284 +++++++++--------- 3 files changed, 173 insertions(+), 157 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index e728eaf66382..5df4aeef65a8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -228,7 +228,6 @@ private boolean buildForIncrementalClustering( fullCompaction = false; } Options options = new Options(table.options()); - Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); int localSampleMagnification = table.coreOptions().getLocalSampleMagnification(); if (localSampleMagnification < 20) { throw new IllegalArgumentException( @@ -237,15 +236,6 @@ private boolean buildForIncrementalClustering( CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), localSampleMagnification)); } - TableSortInfo sortInfo = - new TableSortInfo.Builder() - .setSortColumns(incrementalClusterManager.clusterKeys()) - .setSortStrategy(incrementalClusterManager.clusterCurve()) - 
.setSinkParallelism(sinkParallelism) - .setLocalSampleSize(sinkParallelism * localSampleMagnification) - .setGlobalSampleSize(sinkParallelism * 1000) - .setRangeNumber(sinkParallelism * 10) - .build(); String commitUser = CoreOptions.createCommitUser(options); InternalRowPartitionComputer partitionComputer = new InternalRowPartitionComputer( @@ -261,11 +251,14 @@ private boolean buildForIncrementalClustering( LOGGER.info( "No partition needs to be incrementally clustered. " + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster."); - return false; - // throw new RuntimeException( - // "No partition needs to be incrementally clustered. " - // + "Please set '--compact_strategy full' if you need to - // forcibly trigger the cluster."); + if (this.forceStartFlinkJob) { + env.fromSequence(0, 0) + .name("Nothing to Cluster Source") + .sinkTo(new DiscardingSink<>()); + return true; + } else { + return false; + } } Map partitionSplits = compactUnits.entrySet().stream() @@ -296,11 +289,25 @@ private boolean buildForIncrementalClustering( options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); // 2.2 cluster in partition + Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); + if (sinkParallelism == null) { + sinkParallelism = sourcePair.getLeft().getParallelism(); + } + TableSortInfo sortInfo = + new TableSortInfo.Builder() + .setSortColumns(incrementalClusterManager.clusterKeys()) + .setSortStrategy(incrementalClusterManager.clusterCurve()) + .setSinkParallelism(sinkParallelism) + .setLocalSampleSize(sinkParallelism * localSampleMagnification) + .setGlobalSampleSize(sinkParallelism * 1000) + .setRangeNumber(sinkParallelism * 10) + .build(); DataStream sorted = TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); // 2.3 write and then reorganize the committable - RowAppendTableSink sink = new RowAppendTableSink(table, null, null, sinkParallelism); + // set parallelism to null, and it'll forward 
parallelism when doWrite() + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); DataStream clusterCommittable = sink.doWrite( FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()), @@ -319,14 +326,13 @@ private boolean buildForIncrementalClustering( unit -> unit.getValue() .outputLevel() // 提取CompactUnit中的level值 - )))) - .forceNonParallel(); + )))); dataStreams.add(clusterCommittable); dataStreams.add(sourcePair.getRight()); } // 3. commit - RowAppendTableSink sink = new RowAppendTableSink(table, null, null, sinkParallelism); + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); DataStream dataStream = dataStreams.get(0); for (int i = 1; i < dataStreams.size(); i++) { dataStream = dataStream.union(dataStreams.get(i)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java index 388d46951245..eae1d3b8e994 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -47,7 +47,7 @@ import java.util.Map; -/** Source for incremental cluster. */ +/** Source for Incremental Clustering. 
*/ public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource { private static final long serialVersionUID = 1L; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index ebf255bdf34b..29f02adfd543 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -42,30 +42,28 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** IT cases for incremental clustering action. */ public class IncrementalClusterActionITCase extends ActionITCaseBase { @Test public void testClusterUnpartitionedTable() throws Exception { - FileStoreTable table = createTable(null); + FileStoreTable table = createTable(null, 1); - BinaryString randomStr = - BinaryString.fromString( - UUID.randomUUID().toString() + UUID.randomUUID() + UUID.randomUUID()); - List messages1 = new ArrayList<>(); + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); // first write for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { - messages1.addAll(write(GenericRow.of(i, j, randomStr))); + messages.addAll(write(GenericRow.of(i, j, randomStr, 0))); } } - commit(messages1); + commit(messages); ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); List result1 = getResult( @@ -107,19 +105,19 @@ public void testClusterUnpartitionedTable() throws Exception { assertThat(result2).containsExactlyElementsOf(expected2); // 
second write - messages1.clear(); - messages1.addAll( + messages.clear(); + messages.addAll( write( - GenericRow.of(0, 3, null), - GenericRow.of(1, 3, null), - GenericRow.of(2, 3, null))); - messages1.addAll( + GenericRow.of(0, 3, null, 0), + GenericRow.of(1, 3, null, 0), + GenericRow.of(2, 3, null, 0))); + messages.addAll( write( - GenericRow.of(3, 0, null), - GenericRow.of(3, 1, null), - GenericRow.of(3, 2, null), - GenericRow.of(3, 3, null))); - commit(messages1); + GenericRow.of(3, 0, null, 0), + GenericRow.of(3, 1, null, 0), + GenericRow.of(3, 2, null, 0), + GenericRow.of(3, 3, null, 0))); + commit(messages); List result3 = getResult( @@ -164,25 +162,25 @@ public void testClusterUnpartitionedTable() throws Exception { checkSnapshot(table); splits = readBuilder.newScan().plan().splits(); List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); - List expected5 = new ArrayList<>(); - expected5.addAll( - Lists.newArrayList( - "+I[0, 0]", - "+I[0, 1]", - "+I[1, 0]", - "+I[1, 1]", - "+I[0, 2]", - "+I[0, 3]", - "+I[1, 2]", - "+I[1, 3]", - "+I[2, 0]", - "+I[2, 1]", - "+I[3, 0]", - "+I[3, 1]", - "+I[2, 2]", - "+I[2, 3]", - "+I[3, 2]", - "+I[3, 3]")); + List expected5 = + new ArrayList<>( + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[0, 3]", + "+I[1, 2]", + "+I[1, 3]", + "+I[2, 0]", + "+I[2, 1]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 2]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); assertThat(splits.size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); @@ -191,90 +189,85 @@ public void testClusterUnpartitionedTable() throws Exception { @Test public void testClusterPartitionedTable() throws Exception { - FileStoreTable table = createTable(null); + FileStoreTable table = createTable("pt", 1); - BinaryString randomStr = - BinaryString.fromString( - UUID.randomUUID().toString() + 
UUID.randomUUID() + UUID.randomUUID()); - List messages1 = new ArrayList<>(); + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); // first write - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - messages1.addAll(write(GenericRow.of(i, j, randomStr))); + List expected1 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, (pt == 0) ? randomStr : null, pt))); + expected1.add(String.format("+I[%s, %s, %s]", i, j, pt)); + } } } - commit(messages1); - ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); List result1 = getResult( readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType()); - List expected1 = - Lists.newArrayList( - "+I[0, 0]", - "+I[0, 1]", - "+I[0, 2]", - "+I[1, 0]", - "+I[1, 1]", - "+I[1, 2]", - "+I[2, 0]", - "+I[2, 1]", - "+I[2, 2]"); assertThat(result1).containsExactlyElementsOf(expected1); // first cluster runAction(Collections.emptyList()); checkSnapshot(table); List splits = readBuilder.newScan().plan().splits(); - assertThat(splits.size()).isEqualTo(1); + assertThat(splits.size()).isEqualTo(2); assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); - List expected2 = - Lists.newArrayList( - "+I[0, 0]", - "+I[0, 1]", - "+I[1, 0]", - "+I[1, 1]", - "+I[0, 2]", - "+I[1, 2]", - "+I[2, 0]", - "+I[2, 1]", - "+I[2, 2]"); + List expected2 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, 
%s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } assertThat(result2).containsExactlyElementsOf(expected2); // second write - messages1.clear(); - messages1.addAll( - write( - GenericRow.of(0, 3, null), - GenericRow.of(1, 3, null), - GenericRow.of(2, 3, null))); - messages1.addAll( - write( - GenericRow.of(3, 0, null), - GenericRow.of(3, 1, null), - GenericRow.of(3, 2, null), - GenericRow.of(3, 3, null))); - commit(messages1); + messages.clear(); + for (int pt = 0; pt < 2; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + } + commit(messages); List result3 = getResult( readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType()); - List expected3 = new ArrayList<>(expected2); - expected3.addAll( - Lists.newArrayList( - "+I[0, 3]", - "+I[1, 3]", - "+I[2, 3]", - "+I[3, 0]", - "+I[3, 1]", - "+I[3, 2]", - "+I[3, 3]")); + List expected3 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } assertThat(result3).containsExactlyElementsOf(expected3); // second cluster @@ -282,55 +275,60 @@ public void 
testClusterPartitionedTable() throws Exception { checkSnapshot(table); splits = readBuilder.newScan().plan().splits(); List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); - List expected4 = new ArrayList<>(expected2); - expected4.addAll( - Lists.newArrayList( - "+I[0, 3]", - "+I[1, 3]", - "+I[3, 0]", - "+I[3, 1]", - "+I[2, 3]", - "+I[3, 2]", - "+I[3, 3]")); - assertThat(splits.size()).isEqualTo(1); + List expected4 = new ArrayList<>(); + // for partition-0: only file in level-0 will be picked for clustering, outputLevel is 4 + expected4.add("+I[0, 0, 0]"); + expected4.add("+I[0, 1, 0]"); + expected4.add("+I[1, 0, 0]"); + expected4.add("+I[1, 1, 0]"); + expected4.add("+I[0, 2, 0]"); + expected4.add("+I[1, 2, 0]"); + expected4.add("+I[2, 0, 0]"); + expected4.add("+I[2, 1, 0]"); + expected4.add("+I[2, 2, 0]"); + expected4.add("+I[0, 3, 0]"); + expected4.add("+I[1, 3, 0]"); + expected4.add("+I[3, 0, 0]"); + expected4.add("+I[3, 1, 0]"); + expected4.add("+I[2, 3, 0]"); + expected4.add("+I[3, 2, 0]"); + expected4.add("+I[3, 3, 0]"); + // for partition-1:all files will be picked for clustering, outputLevel is 5 + expected4.add("+I[0, 0, 1]"); + expected4.add("+I[0, 1, 1]"); + expected4.add("+I[1, 0, 1]"); + expected4.add("+I[1, 1, 1]"); + expected4.add("+I[0, 2, 1]"); + expected4.add("+I[0, 3, 1]"); + expected4.add("+I[1, 2, 1]"); + expected4.add("+I[1, 3, 1]"); + expected4.add("+I[2, 0, 1]"); + expected4.add("+I[2, 1, 1]"); + expected4.add("+I[3, 0, 1]"); + expected4.add("+I[3, 1, 1]"); + expected4.add("+I[2, 2, 1]"); + expected4.add("+I[2, 3, 1]"); + expected4.add("+I[3, 2, 1]"); + expected4.add("+I[3, 3, 1]"); + assertThat(splits.size()).isEqualTo(2); assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(1)).dataFiles().size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(((DataSplit) splits.get(1)).dataFiles().get(0).level()).isEqualTo(5); assertThat(result4).containsExactlyElementsOf(expected4); + } - // full cluster - runAction(Lists.newArrayList("--compact_strategy", "full")); - checkSnapshot(table); - splits = readBuilder.newScan().plan().splits(); - List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); - List expected5 = new ArrayList<>(); - expected5.addAll( - Lists.newArrayList( - "+I[0, 0]", - "+I[0, 1]", - "+I[1, 0]", - "+I[1, 1]", - "+I[0, 2]", - "+I[0, 3]", - "+I[1, 2]", - "+I[1, 3]", - "+I[2, 0]", - "+I[2, 1]", - "+I[3, 0]", - "+I[3, 1]", - "+I[2, 2]", - "+I[2, 3]", - "+I[3, 2]", - "+I[3, 3]")); - assertThat(splits.size()).isEqualTo(1); - assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); - assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); - assertThat(result5).containsExactlyElementsOf(expected5); + @Test + public void testClusterOnEmptyData() throws Exception { + createTable("pt", 1); + assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException(); } - protected FileStoreTable createTable(String partitionKeys) throws Exception { + protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) + throws Exception { catalog.createDatabase(database, true); - catalog.createTable(identifier(), schema(partitionKeys), true); + catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism), true); return (FileStoreTable) catalog.getTable(identifier()); } @@ -358,25 +356,37 @@ private void commit(List messages) throws Exception { commit.close(); } - private static Schema schema(String partitionKeys) { + private static Schema schema(String partitionKeys, int sinkParallelism) { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("a", DataTypes.INT()); schemaBuilder.column("b", DataTypes.INT()); 
schemaBuilder.column("c", DataTypes.STRING()); + schemaBuilder.column("pt", DataTypes.INT()); schemaBuilder.option("bucket", "-1"); schemaBuilder.option("num-levels", "6"); schemaBuilder.option("num-sorted-run.compaction-trigger", "2"); + schemaBuilder.option("scan.plan-sort-partition", "true"); schemaBuilder.option("clustering.columns", "a,b"); schemaBuilder.option("clustering.strategy", "zorder"); schemaBuilder.option("clustering.incremental", "true"); schemaBuilder.option("scan.parallelism", "1"); - schemaBuilder.option("sink.parallelism", "1"); + schemaBuilder.option("sink.parallelism", String.valueOf(sinkParallelism)); if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) { schemaBuilder.partitionKeys(partitionKeys); } return schemaBuilder.build(); } + private static String randomString(int length) { + String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + StringBuilder sb = new StringBuilder(length); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < length; i++) { + sb.append(chars.charAt(random.nextInt(chars.length()))); + } + return sb.toString(); + } + private void checkSnapshot(FileStoreTable table) { assertThat(table.latestSnapshot().get().commitKind()) .isEqualTo(Snapshot.CommitKind.COMPACT); From 47b54ea14c019950fad991b3a360e434a4f591ed Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 13 Oct 2025 17:12:04 +0800 Subject: [PATCH 4/6] fix --- .../java/org/apache/paimon/flink/action/CompactAction.java | 6 ++---- .../RewriteIncrementalClusterCommittableOperator.java | 5 +---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 5df4aeef65a8..56820588dcad 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -321,12 +321,10 @@ private boolean buildForIncrementalClustering( compactUnits.entrySet().stream() .collect( Collectors.toMap( - Map.Entry - ::getKey, // 保持原有的BinaryRow键 + Map.Entry::getKey, unit -> unit.getValue() - .outputLevel() // 提取CompactUnit中的level值 - )))) + .outputLevel())))); dataStreams.add(clusterCommittable); dataStreams.add(sourcePair.getRight()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java index bd0a0cda9fa7..6b0ef47f3f18 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -38,10 +38,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; -/** - * Rewrite committable from postpone bucket table compactor. It moves all new files into compact - * results, and delete unused new files, because compactor only produce compact snapshots. - */ +/** Rewrite committable for new files written after clustering. 
*/ public class RewriteIncrementalClusterCommittableOperator extends BoundedOneInputOperator { private static final long serialVersionUID = 1L; From abd5f696ddd72411a8e23101959de9dfa0406d5b Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 13 Oct 2025 17:32:10 +0800 Subject: [PATCH 5/6] add docs --- .../append-table/incremental-clustering.md | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md index 72f24ec17e86..1cb08cbbdb78 100644 --- a/docs/content/append-table/incremental-clustering.md +++ b/docs/content/append-table/incremental-clustering.md @@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental ## Run Incremental Clustering {{< hint info >}} -Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future. +Only supports running Incremental Clustering in batch mode. {{< /hint >}} -To run a Incremental Clustering job, follow these instructions. +To run an Incremental Clustering job, follow these instructions. + +You don’t need to specify any clustering-related parameters when running Incremental Clustering, +these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options. {{< tabs "incremental-clustering" >}} @@ -117,8 +120,46 @@ CALL sys.compact(table => 'T') -- run incremental clustering with full mode, this will recluster all data CALL sys.compact(table => 'T', compact_strategy => 'full') ``` -You don’t need to specify any clustering-related parameters when running Incremental Clustering, -these are already defined as table options. If you need to change clustering settings, please update the corresponding table options. +{{< /tab >}} + +{{< tab "Flink Action" >}} + +Run the following command to submit an incremental clustering job for the table. 
+ +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse \ + --database \ + --table \ + [--compact_strategy ] \ + [--table_conf ] \ + [--catalog_conf [--catalog_conf ...]] +``` + +Example: run incremental clustering + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse s3:///path/to/warehouse \ + --database test_db \ + --table test_table \ + --table_conf sink.parallelism=2 \ + --compact_strategy minor \ + --catalog_conf s3.endpoint=https://****.com \ + --catalog_conf s3.access-key=***** \ + --catalog_conf s3.secret-key=***** +``` +* `--compact_strategy` Determines how to pick files to be clustered, the default is `minor`. + * `full` : All files will be selected for clustering. + * `minor` : Pick the set of files that need to be clustered based on specified conditions. + +Note: write parallelism is set by `sink.parallelism`; if set too big, it may generate a large number of small files. + +You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to use batch mode. 
{{< /tab >}} {{< /tabs >}} From 6de0c29f1b7ec036becd277f031bac49c6e16916 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 14 Oct 2025 10:31:16 +0800 Subject: [PATCH 6/6] rebase master and solve conflicts --- .../java/org/apache/paimon/flink/action/CompactAction.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 56820588dcad..86259582fa06 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -308,9 +308,14 @@ private boolean buildForIncrementalClustering( // 2.3 write and then reorganize the committable // set parallelism to null, and it'll forward parallelism when doWrite() RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream clusterCommittable = sink.doWrite( - FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()), + FlinkSinkBuilder.mapToInternalRow( + sorted, + table.rowType(), + blobAsDescriptor, + table.catalogEnvironment().catalogContext()), commitUser, null) .transform(