Skip to content

Commit 664ce45

Browse files
committed
[core][flink] Add option to skip expired partitions in compaction job
1 parent 34df0ce commit 664ce45

8 files changed

Lines changed: 199 additions & 8 deletions

File tree

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@
128128
<td>MemorySize</td>
129129
<td>Memory page size for caching.</td>
130130
</tr>
131+
<tr>
132+
<td><h5>chain-table.chain-partition-keys</h5></td>
133+
<td style="word-wrap: break-word;">(none)</td>
134+
<td>String</td>
135+
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
136+
</tr>
131137
<tr>
132138
<td><h5>chain-table.enabled</h5></td>
133139
<td style="word-wrap: break-word;">false</td>
@@ -374,6 +380,12 @@
374380
<td>Integer</td>
375381
<td>Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
376382
</tr>
383+
<tr>
384+
<td><h5>compaction.skip-expired-partitions</h5></td>
385+
<td style="word-wrap: break-word;">false</td>
386+
<td>Boolean</td>
387+
<td>If true, the compaction job will skip partitions that are already expired according to 'partition.expiration-time' with 'values-time' strategy. Only effective when 'partition.expiration-time' is set and 'partition.expiration-strategy' is 'values-time'.</td>
388+
</tr>
377389
<tr>
378390
<td><h5>compaction.total-size-threshold</h5></td>
379391
<td style="word-wrap: break-word;">(none)</td>
@@ -1187,12 +1199,6 @@
11871199
<td>String</td>
11881200
<td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
11891201
</tr>
1190-
<tr>
1191-
<td><h5>chain-table.chain-partition-keys</h5></td>
1192-
<td style="word-wrap: break-word;">(none)</td>
1193-
<td>String</td>
1194-
<td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td>
1195-
</tr>
11961202
<tr>
11971203
<td><h5>scan.file-creation-time-millis</h5></td>
11981204
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,6 +1123,23 @@ public InlineElement getDescription() {
11231123
"Whether only overwrite dynamic partition when overwriting a partitioned table with "
11241124
+ "dynamic partition columns. Works only when the table has partition keys.");
11251125

1126+
/** The strategy for partition expiration. */
1127+
public enum PartitionExpireStrategy {
1128+
VALUES_TIME("values-time"),
1129+
UPDATE_TIME("update-time");
1130+
1131+
private final String value;
1132+
1133+
PartitionExpireStrategy(String value) {
1134+
this.value = value;
1135+
}
1136+
1137+
@Override
1138+
public String toString() {
1139+
return value;
1140+
}
1141+
}
1142+
11261143
public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY =
11271144
key("partition.expiration-strategy")
11281145
.stringType()
@@ -1168,6 +1185,16 @@ public InlineElement getDescription() {
11681185
+ "By default, all partitions to be expired will be expired together, which may cause a risk of out-of-memory. "
11691186
+ "Use this parameter to divide partition expiration process and mitigate memory pressure.");
11701187

1188+
public static final ConfigOption<Boolean> COMPACTION_SKIP_EXPIRED_PARTITIONS =
1189+
key("compaction.skip-expired-partitions")
1190+
.booleanType()
1191+
.defaultValue(false)
1192+
.withDescription(
1193+
"Whether to skip compacting partitions that are already expired "
1194+
+ "according to 'partition.expiration-time'. "
1195+
+ "Only effective when 'partition.expiration-time' is set "
1196+
+ "and 'partition.expiration-strategy' is 'values-time'.");
1197+
11711198
public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
11721199
key("partition.timestamp-formatter")
11731200
.stringType()
@@ -3310,6 +3337,10 @@ public String partitionExpireStrategy() {
33103337
return options.get(PARTITION_EXPIRATION_STRATEGY);
33113338
}
33123339

3340+
public boolean compactionSkipExpiredPartitions() {
3341+
return options.get(COMPACTION_SKIP_EXPIRED_PARTITIONS);
3342+
}
3343+
33133344
@Nullable
33143345
public String dataFileExternalPaths() {
33153346
return options.get(DATA_FILE_EXTERNAL_PATHS);

paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import javax.annotation.Nullable;
3131

32+
import java.io.Serializable;
3233
import java.time.LocalDateTime;
3334
import java.util.ArrayList;
3435
import java.util.LinkedHashMap;
@@ -37,7 +38,9 @@
3738
import java.util.Optional;
3839

3940
/** Strategy for partition expiration. */
40-
public abstract class PartitionExpireStrategy {
41+
public abstract class PartitionExpireStrategy implements Serializable {
42+
43+
private static final long serialVersionUID = 1L;
4144

4245
protected final List<String> partitionKeys;
4346
protected final String partitionDefaultName;

paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import javax.annotation.Nullable;
2222

23+
import java.io.Serializable;
2324
import java.time.LocalDate;
2425
import java.time.LocalDateTime;
2526
import java.time.LocalTime;
@@ -43,7 +44,9 @@
4344
import static java.time.temporal.ChronoField.YEAR;
4445

4546
/** Time extractor to extract time from partition values. */
46-
public class PartitionTimeExtractor {
47+
public class PartitionTimeExtractor implements Serializable {
48+
49+
private static final long serialVersionUID = 1L;
4750

4851
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
4952
new DateTimeFormatterBuilder()

paimon-core/src/main/java/org/apache/paimon/partition/PartitionUpdateTimeExpireStrategy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
*/
3535
public class PartitionUpdateTimeExpireStrategy extends PartitionExpireStrategy {
3636

37+
private static final long serialVersionUID = 1L;
38+
3739
public PartitionUpdateTimeExpireStrategy(CoreOptions options, RowType partitionType) {
3840
super(partitionType, options.partitionDefaultName());
3941
}

paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
*/
4646
public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
4747

48+
private static final long serialVersionUID = 1L;
49+
4850
private static final Logger LOG =
4951
LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);
5052

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.manifest.PartitionEntry;
2626
import org.apache.paimon.options.Options;
2727
import org.apache.paimon.partition.PartitionPredicate;
28+
import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
2829
import org.apache.paimon.table.FileStoreTable;
2930
import org.apache.paimon.table.source.ReadBuilder;
3031
import org.apache.paimon.table.system.CompactBucketsTable;
@@ -141,6 +142,26 @@ public DataStreamSource<RowData> build() {
141142
});
142143
dataStream = new DataStreamSource<>(filterStream);
143144
}
145+
CoreOptions coreOptions = table.coreOptions();
146+
if (coreOptions.compactionSkipExpiredPartitions()
147+
&& coreOptions.partitionExpireTime() != null
148+
&& CoreOptions.PartitionExpireStrategy.VALUES_TIME
149+
.toString()
150+
.equals(coreOptions.partitionExpireStrategy())) {
151+
RowType partitionType = table.schema().logicalPartitionType();
152+
Duration expireTime = coreOptions.partitionExpireTime();
153+
PartitionValuesTimeExpireStrategy expireStrategy =
154+
new PartitionValuesTimeExpireStrategy(coreOptions, partitionType);
155+
SingleOutputStreamOperator<RowData> filterStream =
156+
dataStream.filter(
157+
rowData -> {
158+
LocalDateTime expireDateTime =
159+
LocalDateTime.now().minus(expireTime);
160+
BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
161+
return !expireStrategy.isExpired(expireDateTime, partition);
162+
});
163+
dataStream = new DataStreamSource<>(filterStream);
164+
}
144165
Integer parallelism =
145166
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
146167
if (parallelism != null) {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import org.junit.jupiter.params.provider.ValueSource;
6363

6464
import java.time.Duration;
65+
import java.time.LocalDate;
66+
import java.time.format.DateTimeFormatter;
6567
import java.util.ArrayList;
6668
import java.util.Arrays;
6769
import java.util.Collections;
@@ -926,6 +928,127 @@ public void testDataEvolutionTableCompact() throws Exception {
926928
assertThat(value).isEqualTo(30000);
927929
}
928930

931+
@Test
932+
@Timeout(60)
933+
public void testSkipExpiredPartitions() throws Exception {
934+
// Use a date far in the past (expired) and today's date (not expired)
935+
String expiredDt =
936+
LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
937+
String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
938+
939+
Map<String, String> tableOptions = new HashMap<>();
940+
tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
941+
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
942+
tableOptions.put(
943+
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
944+
CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
945+
tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
946+
947+
FileStoreTable table =
948+
prepareTable(
949+
Collections.singletonList("dt"),
950+
Arrays.asList("dt", "k"),
951+
Collections.emptyList(),
952+
tableOptions);
953+
954+
// Write two batches to each partition so each has multiple files
955+
writeData(
956+
rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
957+
rowData(1, 100, 15, BinaryString.fromString(activeDt)));
958+
959+
writeData(
960+
rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
961+
rowData(2, 100, 15, BinaryString.fromString(activeDt)));
962+
963+
checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
964+
965+
CompactAction action =
966+
createAction(
967+
CompactAction.class,
968+
"compact",
969+
"--warehouse",
970+
warehouse,
971+
"--database",
972+
database,
973+
"--table",
974+
tableName,
975+
"--table_conf",
976+
CoreOptions.COMPACTION_SKIP_EXPIRED_PARTITIONS.key() + "=true");
977+
StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
978+
action.withStreamExecutionEnvironment(env).build();
979+
env.execute();
980+
981+
checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
982+
983+
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
984+
for (DataSplit split : splits) {
985+
String dt = split.partition().getString(0).toString();
986+
if (dt.equals(activeDt)) {
987+
// active partition should be compacted into 1 file
988+
assertThat(split.dataFiles().size()).isEqualTo(1);
989+
} else {
990+
// expired partition should be skipped, still has 2 files
991+
assertThat(split.dataFiles().size()).isEqualTo(2);
992+
}
993+
}
994+
}
995+
996+
@Test
997+
@Timeout(60)
998+
public void testNotSkipExpiredPartitionsByDefault() throws Exception {
999+
String expiredDt =
1000+
LocalDate.now().minusDays(30).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
1001+
String activeDt = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
1002+
1003+
Map<String, String> tableOptions = new HashMap<>();
1004+
tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
1005+
tableOptions.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "7 d");
1006+
tableOptions.put(
1007+
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
1008+
CoreOptions.PartitionExpireStrategy.VALUES_TIME.toString());
1009+
tableOptions.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
1010+
// COMPACTION_SKIP_EXPIRED_PARTITIONS is not set, default is false
1011+
1012+
FileStoreTable table =
1013+
prepareTable(
1014+
Collections.singletonList("dt"),
1015+
Arrays.asList("dt", "k"),
1016+
Collections.emptyList(),
1017+
tableOptions);
1018+
1019+
writeData(
1020+
rowData(1, 100, 15, BinaryString.fromString(expiredDt)),
1021+
rowData(1, 100, 15, BinaryString.fromString(activeDt)));
1022+
1023+
writeData(
1024+
rowData(2, 100, 15, BinaryString.fromString(expiredDt)),
1025+
rowData(2, 100, 15, BinaryString.fromString(activeDt)));
1026+
1027+
checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
1028+
1029+
CompactAction action =
1030+
createAction(
1031+
CompactAction.class,
1032+
"compact",
1033+
"--warehouse",
1034+
warehouse,
1035+
"--database",
1036+
database,
1037+
"--table",
1038+
tableName);
1039+
StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
1040+
action.withStreamExecutionEnvironment(env).build();
1041+
env.execute();
1042+
1043+
checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
1044+
1045+
// both expired and active partitions should be compacted into 1 file
1046+
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
1047+
for (DataSplit split : splits) {
1048+
assertThat(split.dataFiles().size()).isEqualTo(1);
1049+
}
1050+
}
1051+
9291052
private void setFirstRowId(List<CommitMessage> commitables, long firstRowId) {
9301053
commitables.forEach(
9311054
c -> {

0 commit comments

Comments
 (0)