
Commit c8f6eea

[hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6408)
1 parent: 383a5b7

4 files changed · 76 additions & 23 deletions

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 15 additions & 11 deletions
```diff
@@ -248,9 +248,10 @@ private boolean buildForIncrementalClustering(
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
         if (compactUnits.isEmpty()) {
-            LOGGER.info(
+            LOGGER.warn(
                     "No partition needs to be incrementally clustered. "
-                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
+                            + "Please set '--compact_strategy full' if you need to forcibly trigger the cluster. "
+                            + "Please set '--force_start_flink_job true' if you need to forcibly start a Flink job.");
             if (this.forceStartFlinkJob) {
                 env.fromSequence(0, 0)
                         .name("Nothing to Cluster Source")
```
```diff
@@ -309,15 +310,17 @@ private boolean buildForIncrementalClustering(
         // set parallelism to null, and it'll forward parallelism when doWrite()
         RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
         boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
-        DataStream<Committable> clusterCommittable =
+        DataStream<Committable> written =
                 sink.doWrite(
-                        FlinkSinkBuilder.mapToInternalRow(
-                                sorted,
-                                table.rowType(),
-                                blobAsDescriptor,
-                                table.catalogEnvironment().catalogContext()),
-                        commitUser,
-                        null)
+                        FlinkSinkBuilder.mapToInternalRow(
+                                sorted,
+                                table.rowType(),
+                                blobAsDescriptor,
+                                table.catalogEnvironment().catalogContext()),
+                        commitUser,
+                        null);
+        DataStream<Committable> clusterCommittable =
+                written.forward()
                         .transform(
                                 "Rewrite cluster committable",
                                 new CommittableTypeInfo(),
@@ -329,7 +332,8 @@ private boolean buildForIncrementalClustering(
                                                 Map.Entry::getKey,
                                                 unit ->
                                                         unit.getValue()
-                                                                .outputLevel()))));
+                                                                .outputLevel()))))
+                        .setParallelism(written.getParallelism());
         dataStreams.add(clusterCommittable);
         dataStreams.add(sourcePair.getRight());
     }
```
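The substance of this change: the written stream is materialized first, and the chained rewrite `transform(...)` is explicitly pinned to `written.getParallelism()`. `forward()` connects subtasks one-to-one, so Flink requires the two sides of a forward edge to run at the same parallelism; if the transform fell back to the environment default, planning would fail. A self-contained sketch of that constraint, with plain `map` operators standing in for Paimon's writer and rewrite operator (all names illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default picked up by operators that set nothing

        DataStream<Long> written =
                env.fromSequence(0, 99).map(x -> x * 2).returns(Types.LONG).setParallelism(2);

        // forward() is a one-to-one connection: upstream and downstream
        // parallelism must match, so copy it from the upstream explicitly.
        // Without the setParallelism call the map would default to 4.
        written.forward()
                .map(x -> x + 1)
                .returns(Types.LONG)
                .setParallelism(written.getParallelism())
                .print();

        env.execute("forward-parallelism sketch");
    }
}
```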

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java

Lines changed: 15 additions & 10 deletions
```diff
@@ -38,8 +38,9 @@
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
@@ -86,21 +87,25 @@ public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
             Map<String, String> partitionSpec,
             DataSplit[] splits,
             @Nullable Integer parallelism) {
-        DataStreamSource<Split> source =
+        DataStream<Split> source =
                 env.fromSource(
-                        new IncrementalClusterSplitSource(splits),
-                        WatermarkStrategy.noWatermarks(),
-                        String.format(
-                                "Incremental-cluster split generator: %s - %s",
-                                table.fullName(), partitionSpec),
-                        new JavaTypeInfo<>(Split.class));
+                                new IncrementalClusterSplitSource(splits),
+                                WatermarkStrategy.noWatermarks(),
+                                String.format(
+                                        "Incremental-cluster split generator: %s - %s",
+                                        table.fullName(), partitionSpec),
+                                new JavaTypeInfo<>(Split.class))
+                        .forceNonParallel();
 
+        PartitionTransformation<Split> partitioned =
+                new PartitionTransformation<>(
+                        source.getTransformation(), new RebalancePartitioner<>());
         if (parallelism != null) {
-            source.setParallelism(parallelism);
+            partitioned.setParallelism(parallelism);
         }
 
         return Pair.of(
-                new DataStream<>(source.getExecutionEnvironment(), source.getTransformation())
+                new DataStream<>(source.getExecutionEnvironment(), partitioned)
                         .transform(
                                 String.format(
                                         "Incremental-cluster reader: %s - %s",
```

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -29,6 +29,8 @@
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +43,9 @@
 /** Rewrite committable for new files written after clustered. */
 public class RewriteIncrementalClusterCommittableOperator
         extends BoundedOneInputOperator<Committable, Committable> {
+
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
```
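The only change here is the standard SLF4J logger, presumably so the rewrite path can report what it touches. For completeness, the idiom in isolation (class name illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RewriteLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(RewriteLoggingSketch.class);

    public static void main(String[] args) {
        // Parameterized messages defer formatting until the level is enabled.
        LOG.info("Rewrote {} committables to level {}", 3, 5);
    }
}
```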

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java

Lines changed: 41 additions & 2 deletions
```diff
@@ -325,6 +325,47 @@ public void testClusterOnEmptyData() throws Exception {
         assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
     }
 
+    @Test
+    public void testMultiParallelism() throws Exception {
+        FileStoreTable table = createTable(null, 2);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected1 =
+                Lists.newArrayList(
+                        "+I[0, 0]",
+                        "+I[0, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]");
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2"));
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isGreaterThanOrEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+    }
+
     protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
             throws Exception {
         catalog.createDatabase(database, true);
@@ -405,8 +446,6 @@ private void runAction(List<String> extra) throws Exception {
         baseArgs.addAll(extra);
 
         CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
-        // action.withStreamExecutionEnvironment(env).build();
-        // env.execute();
         action.withStreamExecutionEnvironment(env);
         action.run();
     }
```
