diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md index 72f24ec17e86..1cb08cbbdb78 100644 --- a/docs/content/append-table/incremental-clustering.md +++ b/docs/content/append-table/incremental-clustering.md @@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental ## Run Incremental Clustering {{< hint info >}} -Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future. +Currently, Incremental Clustering is only supported in batch mode. {{< /hint >}} -To run a Incremental Clustering job, follow these instructions. +To run an Incremental Clustering job, follow these instructions. + +You don’t need to specify any clustering-related parameters when running Incremental Clustering, +these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options. {{< tabs "incremental-clustering" >}} @@ -117,8 +120,46 @@ CALL sys.compact(table => 'T') -- run incremental clustering with full mode, this will recluster all data CALL sys.compact(table => 'T', compact_strategy => 'full') ``` -You don’t need to specify any clustering-related parameters when running Incremental Clustering, -these are already defined as table options. If you need to change clustering settings, please update the corresponding table options. +{{< /tab >}} + +{{< tab "Flink Action" >}} + +Run the following command to submit an incremental clustering job for the table. 
+ +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse \ + --database \ + --table \ + [--compact_strategy ] \ + [--table_conf ] \ + [--catalog_conf [--catalog_conf ...]] +``` + +Example: run incremental clustering + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + compact \ + --warehouse s3:///path/to/warehouse \ + --database test_db \ + --table test_table \ + --table_conf sink.parallelism=2 \ + --compact_strategy minor \ + --catalog_conf s3.endpoint=https://****.com \ + --catalog_conf s3.access-key=***** \ + --catalog_conf s3.secret-key=***** +``` +* `--compact_strategy` Determines how to pick files to be clustered, the default is `minor`. + * `full` : All files will be selected for clustering. + * `minor` : Pick the set of files that need to be clustered based on specified conditions. + +Note: write parallelism is set by `sink.parallelism`, if too big, may generate a large number of small files. + +You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to use batch mode. 
{{< /tab >}} {{< /tabs >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java index 887150577c23..407f6b17ef43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java @@ -226,7 +226,8 @@ public List toSplits(BinaryRow partition, List files) { return splits; } - public List upgrade(List filesAfterCluster, int outputLevel) { + public static List upgrade( + List filesAfterCluster, int outputLevel) { return filesAfterCluster.stream() .map(file -> file.upgrade(outputLevel)) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 602d3a59a5b7..86259582fa06 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -19,9 +19,13 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource; +import org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator; import org.apache.paimon.flink.compact.AppendTableCompactBuilder; import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource; import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator; @@ -32,7 +36,10 @@ 
import org.apache.paimon.flink.sink.FixedBucketSink; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; +import org.apache.paimon.flink.sink.RowAppendTableSink; import org.apache.paimon.flink.sink.RowDataChannelComputer; +import org.apache.paimon.flink.sorter.TableSortInfo; +import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -43,6 +50,7 @@ import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; @@ -68,6 +76,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; @@ -101,10 +110,6 @@ public CompactAction( checkArgument( !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(), "Compact action does not support data evolution table yet. 
"); - checkArgument( - !(((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE - && ((FileStoreTable) table).coreOptions().clusteringIncrementalEnabled()), - "The table has enabled incremental clustering, and do not support compact in flink yet."); HashMap dynamicOptions = new HashMap<>(tableConf); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); table = table.copy(dynamicOptions); @@ -148,8 +153,12 @@ private boolean buildImpl() throws Exception { if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) { return buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming); } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { - buildForAppendTableCompact(env, fileStoreTable, isStreaming); - return true; + if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) { + return buildForIncrementalClustering(env, fileStoreTable, isStreaming); + } else { + buildForAppendTableCompact(env, fileStoreTable, isStreaming); + return true; + } } else { buildForBucketedTableCompact(env, fileStoreTable, isStreaming); return true; @@ -203,6 +212,138 @@ private void buildForAppendTableCompact( builder.build(); } + private boolean buildForIncrementalClustering( + StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) { + checkArgument(!isStreaming, "Incremental clustering currently only supports batch mode"); + checkArgument( + partitions == null, + "Incremental clustering currently does not support specifying partitions"); + checkArgument( + whereSql == null, "Incremental clustering currently does not support predicates"); + + IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table); + + // non-full strategy as default for incremental clustering + if (fullCompaction == null) { + fullCompaction = false; + } + Options options = new Options(table.options()); + int localSampleMagnification = table.coreOptions().getLocalSampleMagnification(); + if 
(localSampleMagnification < 20) { + throw new IllegalArgumentException( + String.format( + "the config '%s=%d' should not be set too small, greater than or equal to 20 is needed.", + CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), + localSampleMagnification)); + } + String commitUser = CoreOptions.createCommitUser(options); + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + table.coreOptions().partitionDefaultName(), + table.store().partitionType(), + table.partitionKeys().toArray(new String[0]), + table.coreOptions().legacyPartitionName()); + + // 1. pick cluster files for each partition + Map compactUnits = + incrementalClusterManager.prepareForCluster(fullCompaction); + if (compactUnits.isEmpty()) { + LOGGER.info( + "No partition needs to be incrementally clustered. " + + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster."); + if (this.forceStartFlinkJob) { + env.fromSequence(0, 0) + .name("Nothing to Cluster Source") + .sinkTo(new DiscardingSink<>()); + return true; + } else { + return false; + } + } + Map partitionSplits = + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + incrementalClusterManager + .toSplits( + entry.getKey(), + entry.getValue().files()) + .toArray(new DataSplit[0]))); + + // 2. 
read,sort and write in partition + List> dataStreams = new ArrayList<>(); + + for (Map.Entry entry : partitionSplits.entrySet()) { + DataSplit[] splits = entry.getValue(); + LinkedHashMap partitionSpec = + partitionComputer.generatePartValues(entry.getKey()); + // 2.1 generate source for current partition + Pair, DataStream> sourcePair = + IncrementalClusterSplitSource.buildSource( + env, + table, + partitionSpec, + splits, + options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); + + // 2.2 cluster in partition + Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); + if (sinkParallelism == null) { + sinkParallelism = sourcePair.getLeft().getParallelism(); + } + TableSortInfo sortInfo = + new TableSortInfo.Builder() + .setSortColumns(incrementalClusterManager.clusterKeys()) + .setSortStrategy(incrementalClusterManager.clusterCurve()) + .setSinkParallelism(sinkParallelism) + .setLocalSampleSize(sinkParallelism * localSampleMagnification) + .setGlobalSampleSize(sinkParallelism * 1000) + .setRangeNumber(sinkParallelism * 10) + .build(); + DataStream sorted = + TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort(); + + // 2.3 write and then reorganize the committable + // set parallelism to null, and it'll forward parallelism when doWrite() + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); + DataStream clusterCommittable = + sink.doWrite( + FlinkSinkBuilder.mapToInternalRow( + sorted, + table.rowType(), + blobAsDescriptor, + table.catalogEnvironment().catalogContext()), + commitUser, + null) + .transform( + "Rewrite cluster committable", + new CommittableTypeInfo(), + new RewriteIncrementalClusterCommittableOperator( + table, + compactUnits.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + unit -> + unit.getValue() + .outputLevel())))); + dataStreams.add(clusterCommittable); + 
dataStreams.add(sourcePair.getRight()); + } + + // 3. commit + RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null); + DataStream dataStream = dataStreams.get(0); + for (int i = 1; i < dataStreams.size(); i++) { + dataStream = dataStream.union(dataStreams.get(i)); + } + sink.doCommit(dataStream, commitUser); + return true; + } + protected PartitionPredicate getPartitionPredicate() throws Exception { checkArgument( partitions == null || whereSql == null, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java new file mode 100644 index 000000000000..eae1d3b8e994 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; +import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.SimpleSourceSplit; +import org.apache.paimon.flink.source.operator.ReadOperator; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.Pair; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Source for Incremental Clustering. 
*/ +public class IncrementalClusterSplitSource extends AbstractNonCoordinatedSource { + private static final long serialVersionUID = 1L; + + private final Split[] splits; + + public IncrementalClusterSplitSource(Split[] splits) { + this.splits = splits; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + return new IncrementalClusterSplitSource.Reader(); + } + + private class Reader extends AbstractNonCoordinatedSourceReader { + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + output.collect(dataSplit); + } + return InputStatus.END_OF_INPUT; + } + } + + public static Pair, DataStream> buildSource( + StreamExecutionEnvironment env, + FileStoreTable table, + Map partitionSpec, + DataSplit[] splits, + @Nullable Integer parallelism) { + DataStreamSource source = + env.fromSource( + new IncrementalClusterSplitSource(splits), + WatermarkStrategy.noWatermarks(), + String.format( + "Incremental-cluster split generator: %s - %s", + table.fullName(), partitionSpec), + new JavaTypeInfo<>(Split.class)); + + if (parallelism != null) { + source.setParallelism(parallelism); + } + + return Pair.of( + new DataStream<>(source.getExecutionEnvironment(), source.getTransformation()) + .transform( + String.format( + "Incremental-cluster reader: %s - %s", + table.fullName(), partitionSpec), + InternalTypeInfo.of( + LogicalTypeConversion.toLogicalType(table.rowType())), + new ReadOperator(table::newRead, null, null)), + source.forward() + .transform( + "Remove files to be clustered", + new CommittableTypeInfo(), + new RemoveClusterBeforeFilesOperator()) + .forceNonParallel()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java new file mode 100644 index 000000000000..907a69ba120a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collections; + +/** Operator used with {@link IncrementalClusterSplitSource}, to remove files to be clustered. 
*/ +public class RemoveClusterBeforeFilesOperator extends BoundedOneInputOperator { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(StreamRecord element) throws Exception { + DataSplit dataSplit = (DataSplit) element.getValue(); + CommitMessageImpl message = + new CommitMessageImpl( + dataSplit.partition(), + dataSplit.bucket(), + dataSplit.totalBuckets(), + DataIncrement.emptyIncrement(), + new CompactIncrement( + dataSplit.dataFiles(), + Collections.emptyList(), + Collections.emptyList())); + output.collect( + new StreamRecord<>( + new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message))); + } + + @Override + public void endInput() throws Exception {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java new file mode 100644 index 000000000000..6b0ef47f3f18 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.cluster; + +import org.apache.paimon.append.cluster.IncrementalClusterManager; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Rewrite committable for new files written after clustering. */ +public class RewriteIncrementalClusterCommittableOperator + extends BoundedOneInputOperator { + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + private final Map outputLevels; + + private transient Map> partitionFiles; + + public RewriteIncrementalClusterCommittableOperator( + FileStoreTable table, Map outputLevels) { + this.table = table; + this.outputLevels = outputLevels; + } + + @Override + public void open() throws Exception { + partitionFiles = new HashMap<>(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + Committable committable = element.getValue(); + if (committable.kind() != Committable.Kind.FILE) { + output.collect(element); + return; + } + CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable(); + checkArgument(message.bucket() == 0); + BinaryRow partition = message.partition(); + partitionFiles + .computeIfAbsent(partition, file -> new ArrayList<>()) + .addAll(message.newFilesIncrement().newFiles()); + 
} + + @Override + public void endInput() throws Exception { + emitAll(Long.MAX_VALUE); + } + + protected void emitAll(long checkpointId) { + for (Map.Entry> partitionEntry : partitionFiles.entrySet()) { + BinaryRow partition = partitionEntry.getKey(); + // upgrade the clustered file to outputLevel + List clusterAfter = + IncrementalClusterManager.upgrade( + partitionEntry.getValue(), outputLevels.get(partition)); + LOG.info( + "Partition {}: upgrade file level to {}", + partition, + outputLevels.get(partition)); + CompactIncrement compactIncrement = + new CompactIncrement( + Collections.emptyList(), clusterAfter, Collections.emptyList()); + CommitMessageImpl clusterMessage = + new CommitMessageImpl( + partition, + // bucket 0 is bucket for unaware-bucket table + // for compatibility with the old design + 0, + table.coreOptions().bucket(), + DataIncrement.emptyIncrement(), + compactIncrement); + output.collect( + new StreamRecord<>( + new Committable(checkpointId, Committable.Kind.FILE, clusterMessage))); + } + + partitionFiles.clear(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java new file mode 100644 index 000000000000..29f02adfd543 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** IT cases for incremental clustering action. 
*/ +public class IncrementalClusterActionITCase extends ActionITCaseBase { + + @Test + public void testClusterUnpartitionedTable() throws Exception { + FileStoreTable table = createTable(null, 1); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); + + // first write + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, randomStr, 0))); + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected1 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[0, 2]", + "+I[1, 0]", + "+I[1, 1]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 0]", + "+I[2, 1]", + "+I[2, 2]"); + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + messages.addAll( + write( + GenericRow.of(0, 3, null, 0), + GenericRow.of(1, 3, null, 0), + GenericRow.of(2, 3, null, 0))); + messages.addAll( + write( + GenericRow.of(3, 0, null, 0), + GenericRow.of(3, 1, null, 0), + GenericRow.of(3, 2, null, 0), + GenericRow.of(3, 3, null, 0))); + commit(messages); + + List result3 = + getResult( + readBuilder.newRead(), + 
readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(expected2); + expected3.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[2, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(expected2); + expected4.addAll( + Lists.newArrayList( + "+I[0, 3]", + "+I[1, 3]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected5 = + new ArrayList<>( + Lists.newArrayList( + "+I[0, 0]", + "+I[0, 1]", + "+I[1, 0]", + "+I[1, 1]", + "+I[0, 2]", + "+I[0, 3]", + "+I[1, 2]", + "+I[1, 3]", + "+I[2, 0]", + "+I[2, 1]", + "+I[3, 0]", + "+I[3, 1]", + "+I[2, 2]", + "+I[2, 3]", + "+I[3, 2]", + "+I[3, 3]")); + assertThat(splits.size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + + @Test + public void testClusterPartitionedTable() throws Exception { + FileStoreTable table = createTable("pt", 1); + + BinaryString 
randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); + + // first write + List expected1 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, (pt == 0) ? randomStr : null, pt))); + expected1.add(String.format("+I[%s, %s, %s]", i, j, pt)); + } + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, %s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + for (int pt = 0; pt < 2; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + 
GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + } + commit(messages); + + List result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(); + for (int pt = 0; pt < 2; pt++) { + expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(); + // for partition-0: only file in level-0 will be picked for clustering, outputLevel is 4 + expected4.add("+I[0, 0, 0]"); + expected4.add("+I[0, 1, 0]"); + expected4.add("+I[1, 0, 0]"); + expected4.add("+I[1, 1, 0]"); + expected4.add("+I[0, 2, 0]"); + expected4.add("+I[1, 2, 0]"); + expected4.add("+I[2, 0, 0]"); + expected4.add("+I[2, 1, 0]"); + expected4.add("+I[2, 2, 0]"); + expected4.add("+I[0, 3, 0]"); + expected4.add("+I[1, 3, 0]"); + expected4.add("+I[3, 0, 0]"); + expected4.add("+I[3, 1, 0]"); + expected4.add("+I[2, 3, 0]"); + expected4.add("+I[3, 2, 0]"); + expected4.add("+I[3, 3, 0]"); + // for partition-1:all files will be picked for clustering, outputLevel is 5 + expected4.add("+I[0, 0, 1]"); + expected4.add("+I[0, 1, 1]"); + expected4.add("+I[1, 0, 1]"); + expected4.add("+I[1, 1, 1]"); + expected4.add("+I[0, 2, 1]"); + expected4.add("+I[0, 3, 1]"); + expected4.add("+I[1, 2, 1]"); + expected4.add("+I[1, 3, 
1]"); + expected4.add("+I[2, 0, 1]"); + expected4.add("+I[2, 1, 1]"); + expected4.add("+I[3, 0, 1]"); + expected4.add("+I[3, 1, 1]"); + expected4.add("+I[2, 2, 1]"); + expected4.add("+I[2, 3, 1]"); + expected4.add("+I[3, 2, 1]"); + expected4.add("+I[3, 3, 1]"); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(1)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(((DataSplit) splits.get(1)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result4).containsExactlyElementsOf(expected4); + } + + @Test + public void testClusterOnEmptyData() throws Exception { + createTable("pt", 1); + assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException(); + } + + protected FileStoreTable createTable(String partitionKeys, int sinkParallelism) + throws Exception { + catalog.createDatabase(database, true); + catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism), true); + return (FileStoreTable) catalog.getTable(identifier()); + } + + private FileStoreTable getTable() throws Exception { + return (FileStoreTable) catalog.getTable(identifier()); + } + + private Identifier identifier() { + return Identifier.create(database, tableName); + } + + private List write(GenericRow... 
data) throws Exception { + BatchWriteBuilder builder = getTable().newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (GenericRow row : data) { + batchTableWrite.write(row); + } + return batchTableWrite.prepareCommit(); + } + } + + private void commit(List messages) throws Exception { + BatchTableCommit commit = getTable().newBatchWriteBuilder().newCommit(); + commit.commit(messages); + commit.close(); + } + + private static Schema schema(String partitionKeys, int sinkParallelism) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("a", DataTypes.INT()); + schemaBuilder.column("b", DataTypes.INT()); + schemaBuilder.column("c", DataTypes.STRING()); + schemaBuilder.column("pt", DataTypes.INT()); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("num-levels", "6"); + schemaBuilder.option("num-sorted-run.compaction-trigger", "2"); + schemaBuilder.option("scan.plan-sort-partition", "true"); + schemaBuilder.option("clustering.columns", "a,b"); + schemaBuilder.option("clustering.strategy", "zorder"); + schemaBuilder.option("clustering.incremental", "true"); + schemaBuilder.option("scan.parallelism", "1"); + schemaBuilder.option("sink.parallelism", String.valueOf(sinkParallelism)); + if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) { + schemaBuilder.partitionKeys(partitionKeys); + } + return schemaBuilder.build(); + } + + private static String randomString(int length) { + String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + StringBuilder sb = new StringBuilder(length); + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < length; i++) { + sb.append(chars.charAt(random.nextInt(chars.length()))); + } + return sb.toString(); + } + + private void checkSnapshot(FileStoreTable table) { + assertThat(table.latestSnapshot().get().commitKind()) + .isEqualTo(Snapshot.CommitKind.COMPACT); + } + + private void runAction(List extra) throws 
Exception { + StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build(); + ArrayList baseArgs = + Lists.newArrayList("compact", "--database", database, "--table", tableName); + ThreadLocalRandom random = ThreadLocalRandom.current(); + if (random.nextBoolean()) { + baseArgs.addAll(Lists.newArrayList("--warehouse", warehouse)); + } else { + baseArgs.addAll(Lists.newArrayList("--catalog_conf", "warehouse=" + warehouse)); + } + baseArgs.addAll(extra); + + CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0])); + // action.withStreamExecutionEnvironment(env).build(); + // env.execute(); + action.withStreamExecutionEnvironment(env); + action.run(); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 2796b6deb629..3e7608d1a9d7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -620,7 +620,7 @@ private void clusterIncrementalUnAwareBucketTable( List clusterBefore = compactUnits.get(partition).files(); // upgrade the clustered file to outputLevel List clusterAfter = - incrementalClusterManager.upgrade( + IncrementalClusterManager.upgrade( entry.getValue(), compactUnits.get(partition).outputLevel()); LOG.info( "Partition {}: upgrade file level to {}",