Skip to content

Commit a7b584d

Browse files
authored
[flink] Produce real random id in SourceSplitGenerator (#6441)
1 parent fef6888 commit a7b584d

4 files changed

Lines changed: 52 additions & 46 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,22 @@ public int hashCode() {
328328
rawConvertible);
329329
}
330330

331+
@Override
332+
public String toString() {
333+
return "{"
334+
+ "snapshotId="
335+
+ snapshotId
336+
+ ", partition=hash-"
337+
+ partition.hashCode()
338+
+ ", bucket="
339+
+ bucket
340+
+ ", rawConvertible="
341+
+ rawConvertible
342+
+ '}'
343+
+ "@"
344+
+ Integer.toHexString(hashCode());
345+
}
346+
331347
private void writeObject(ObjectOutputStream out) throws IOException {
332348
serialize(new DataOutputViewStreamWrapper(out));
333349
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,17 @@ public boolean equals(Object o) {
7979
public int hashCode() {
8080
return Objects.hash(id, split, recordsToSkip);
8181
}
82+
83+
@Override
84+
public String toString() {
85+
return "{"
86+
+ "id='"
87+
+ id
88+
+ '\''
89+
+ ", split="
90+
+ split
91+
+ ", recordsToSkip="
92+
+ recordsToSkip
93+
+ '}';
94+
}
8295
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
package org.apache.paimon.flink.source;
2020

21-
import org.apache.paimon.table.source.Split;
2221
import org.apache.paimon.table.source.TableScan;
2322

2423
import java.util.List;
24+
import java.util.UUID;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.stream.Collectors;
2627

2728
/**
@@ -30,44 +31,16 @@
3031
*/
3132
public class FileStoreSourceSplitGenerator {
3233

33-
/**
34-
* The current Id as a mutable string representation. This covers more values than the integer
35-
* value range, so we should never overflow.
36-
*/
37-
private final char[] currentId = "0000000000".toCharArray();
34+
private final String uuid = UUID.randomUUID().toString();
35+
private final AtomicInteger idCounter = new AtomicInteger(1);
3836

3937
public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
4038
return plan.splits().stream()
4139
.map(s -> new FileStoreSourceSplit(getNextId(), s))
4240
.collect(Collectors.toList());
4341
}
4442

45-
public List<FileStoreSourceSplit> createSplits(List<Split> splits) {
46-
return splits.stream()
47-
.map(s -> new FileStoreSourceSplit(getNextId(), s))
48-
.collect(Collectors.toList());
49-
}
50-
5143
protected final String getNextId() {
52-
// because we just increment numbers, we increment the char representation directly,
53-
// rather than incrementing an integer and converting it to a string representation
54-
// every time again (requires quite some expensive conversion logic).
55-
incrementCharArrayByOne(currentId, currentId.length - 1);
56-
return new String(currentId);
57-
}
58-
59-
private static void incrementCharArrayByOne(char[] array, int pos) {
60-
if (pos < 0) {
61-
throw new RuntimeException("Produce too many splits.");
62-
}
63-
64-
char c = array[pos];
65-
c++;
66-
67-
if (c > '9') {
68-
c = '0';
69-
incrementCharArrayByOne(array, pos - 1);
70-
}
71-
array[pos] = c;
44+
return uuid + "-" + idCounter.getAndIncrement();
7245
}
7346
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,27 @@ public void test() {
7070
o -> ((DataSplit) ((FileStoreSourceSplit) o).split()).bucket()));
7171

7272
// splitId should be the input order!
73-
assertSplit(splits.get(0), "0000000001", 1, 0, Arrays.asList("f0", "f1"));
74-
assertSplit(splits.get(1), "0000000002", 1, 1, Collections.singletonList("f2"));
75-
assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5"));
76-
assertSplit(splits.get(3), "0000000004", 2, 1, Collections.singletonList("f6"));
77-
assertSplit(splits.get(4), "0000000005", 3, 0, Collections.singletonList("f7"));
78-
assertSplit(splits.get(5), "0000000006", 3, 1, Collections.singletonList("f8"));
79-
assertSplit(splits.get(6), "0000000007", 4, 0, Collections.singletonList("f9"));
80-
assertSplit(splits.get(7), "0000000008", 4, 1, Collections.singletonList("f10"));
81-
assertSplit(splits.get(8), "0000000009", 5, 0, Collections.singletonList("f11"));
82-
assertSplit(splits.get(9), "0000000010", 5, 1, Collections.singletonList("f12"));
83-
assertSplit(splits.get(10), "0000000011", 6, 0, Collections.singletonList("f13"));
84-
assertSplit(splits.get(11), "0000000012", 6, 1, Collections.singletonList("f14"));
73+
assertSplit(splits.get(0), "-1", 1, 0, Arrays.asList("f0", "f1"));
74+
assertSplit(splits.get(1), "-2", 1, 1, Collections.singletonList("f2"));
75+
assertSplit(splits.get(2), "-3", 2, 0, Arrays.asList("f3", "f4", "f5"));
76+
assertSplit(splits.get(3), "-4", 2, 1, Collections.singletonList("f6"));
77+
assertSplit(splits.get(4), "-5", 3, 0, Collections.singletonList("f7"));
78+
assertSplit(splits.get(5), "-6", 3, 1, Collections.singletonList("f8"));
79+
assertSplit(splits.get(6), "-7", 4, 0, Collections.singletonList("f9"));
80+
assertSplit(splits.get(7), "-8", 4, 1, Collections.singletonList("f10"));
81+
assertSplit(splits.get(8), "-9", 5, 0, Collections.singletonList("f11"));
82+
assertSplit(splits.get(9), "-10", 5, 1, Collections.singletonList("f12"));
83+
assertSplit(splits.get(10), "-11", 6, 0, Collections.singletonList("f13"));
84+
assertSplit(splits.get(11), "-12", 6, 1, Collections.singletonList("f14"));
8585
}
8686

8787
private void assertSplit(
88-
FileStoreSourceSplit split, String splitId, int part, int bucket, List<String> files) {
89-
assertThat(split.splitId()).isEqualTo(splitId);
88+
FileStoreSourceSplit split,
89+
String splitIdSuffix,
90+
int part,
91+
int bucket,
92+
List<String> files) {
93+
assertThat(split.splitId()).endsWith(splitIdSuffix);
9094
assertThat(((DataSplit) split.split()).partition().getInt(0)).isEqualTo(part);
9195
assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket);
9296
assertThat(

0 commit comments

Comments
 (0)