Skip to content

Commit ef998c2

Browse files
authored
[hotfix][flink] fix the partition error for IncrementalClusterSplitSource (#6416)
1 parent 689b3ef commit ef998c2

4 files changed

Lines changed: 72 additions & 19 deletions

File tree

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,10 @@ private boolean buildForIncrementalClustering(
247247
Map<BinaryRow, CompactUnit> compactUnits =
248248
incrementalClusterManager.prepareForCluster(fullCompaction);
249249
if (compactUnits.isEmpty()) {
250-
LOGGER.info(
250+
LOGGER.warn(
251251
"No partition needs to be incrementally clustered. "
252-
+ "Please set '--compact_strategy full' if you need to forcibly trigger the cluster.");
252+
+ "Please set '--compact_strategy full' if you need forcibly trigger the cluster."
253+
+ "Please set '--force_start_flink_job true' if you need forcibly start a flink job.");
253254
return false;
254255
}
255256
Map<BinaryRow, DataSplit[]> partitionSplits =
@@ -300,11 +301,13 @@ private boolean buildForIncrementalClustering(
300301
// 2.3 write and then reorganize the committable
301302
// set parallelism to null, and it'll forward parallelism when doWrite()
302303
RowAppendTableSink sink = new RowAppendTableSink(table, null, null, null);
303-
DataStream<Committable> clusterCommittable =
304+
DataStream<Committable> written =
304305
sink.doWrite(
305-
FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
306-
commitUser,
307-
null)
306+
FlinkSinkBuilder.mapToInternalRow(sorted, table.rowType()),
307+
commitUser,
308+
null);
309+
DataStream<Committable> clusterCommittable =
310+
written.forward()
308311
.transform(
309312
"Rewrite cluster committable",
310313
new CommittableTypeInfo(),
@@ -316,7 +319,8 @@ private boolean buildForIncrementalClustering(
316319
Map.Entry::getKey,
317320
unit ->
318321
unit.getValue()
319-
.outputLevel()))));
322+
.outputLevel()))))
323+
.setParallelism(written.getParallelism());
320324
dataStreams.add(clusterCommittable);
321325
dataStreams.add(sourcePair.getRight());
322326
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
import org.apache.flink.api.connector.source.SourceReaderContext;
3939
import org.apache.flink.core.io.InputStatus;
4040
import org.apache.flink.streaming.api.datastream.DataStream;
41-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4241
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
42+
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
43+
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
4344
import org.apache.flink.table.data.RowData;
4445
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
4546

@@ -86,21 +87,25 @@ public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(
8687
Map<String, String> partitionSpec,
8788
DataSplit[] splits,
8889
@Nullable Integer parallelism) {
89-
DataStreamSource<Split> source =
90+
DataStream<Split> source =
9091
env.fromSource(
91-
new IncrementalClusterSplitSource(splits),
92-
WatermarkStrategy.noWatermarks(),
93-
String.format(
94-
"Incremental-cluster split generator: %s - %s",
95-
table.fullName(), partitionSpec),
96-
new JavaTypeInfo<>(Split.class));
92+
new IncrementalClusterSplitSource(splits),
93+
WatermarkStrategy.noWatermarks(),
94+
String.format(
95+
"Incremental-cluster split generator: %s - %s",
96+
table.fullName(), partitionSpec),
97+
new JavaTypeInfo<>(Split.class))
98+
.forceNonParallel();
9799

100+
PartitionTransformation<Split> partitioned =
101+
new PartitionTransformation<>(
102+
source.getTransformation(), new RebalancePartitioner<>());
98103
if (parallelism != null) {
99-
source.setParallelism(parallelism);
104+
partitioned.setParallelism(parallelism);
100105
}
101106

102107
return Pair.of(
103-
new DataStream<>(source.getExecutionEnvironment(), source.getTransformation())
108+
new DataStream<>(source.getExecutionEnvironment(), partitioned)
104109
.transform(
105110
String.format(
106111
"Incremental-cluster reader: %s - %s",

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.paimon.table.sink.CommitMessageImpl;
3030

3131
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3234

3335
import java.util.ArrayList;
3436
import java.util.Collections;
@@ -41,6 +43,9 @@
4143
/** Rewrite committable for new files written after clustered. */
4244
public class RewriteIncrementalClusterCommittableOperator
4345
extends BoundedOneInputOperator<Committable, Committable> {
46+
47+
protected static final Logger LOG =
48+
LoggerFactory.getLogger(RewriteIncrementalClusterCommittableOperator.class);
4449
private static final long serialVersionUID = 1L;
4550

4651
private final FileStoreTable table;

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,47 @@ public void testClusterOnEmptyData() throws Exception {
325325
assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
326326
}
327327

328+
@Test
329+
public void testMultiParallelism() throws Exception {
330+
FileStoreTable table = createTable(null, 2);
331+
332+
BinaryString randomStr = BinaryString.fromString(randomString(150));
333+
List<CommitMessage> messages = new ArrayList<>();
334+
335+
// first write
336+
for (int i = 0; i < 3; i++) {
337+
for (int j = 0; j < 3; j++) {
338+
messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
339+
}
340+
}
341+
commit(messages);
342+
ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
343+
List<String> result1 =
344+
getResult(
345+
readBuilder.newRead(),
346+
readBuilder.newScan().plan().splits(),
347+
readBuilder.readType());
348+
List<String> expected1 =
349+
Lists.newArrayList(
350+
"+I[0, 0]",
351+
"+I[0, 1]",
352+
"+I[0, 2]",
353+
"+I[1, 0]",
354+
"+I[1, 1]",
355+
"+I[1, 2]",
356+
"+I[2, 0]",
357+
"+I[2, 1]",
358+
"+I[2, 2]");
359+
assertThat(result1).containsExactlyElementsOf(expected1);
360+
361+
runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2"));
362+
checkSnapshot(table);
363+
List<Split> splits = readBuilder.newScan().plan().splits();
364+
assertThat(splits.size()).isEqualTo(1);
365+
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isGreaterThanOrEqualTo(1);
366+
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
367+
}
368+
328369
protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
329370
throws Exception {
330371
catalog.createDatabase(database, true);
@@ -405,8 +446,6 @@ private void runAction(List<String> extra) throws Exception {
405446
baseArgs.addAll(extra);
406447

407448
CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
408-
// action.withStreamExecutionEnvironment(env).build();
409-
// env.execute();
410449
action.withStreamExecutionEnvironment(env);
411450
action.run();
412451
}

0 commit comments

Comments (0)