Skip to content

Commit a4f5b28

Browse files
authored
[spark] Fix sort compact with partition filter (#6371)
1 parent bfdd516 commit a4f5b28

2 files changed

Lines changed: 20 additions & 6 deletions

File tree

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private boolean execute(
263263
}
264264
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
265265
RowType partitionType = table.schema().logicalPartitionType();
266-
Predicate filter =
266+
Predicate partitionFilter =
267267
condition == null
268268
? null
269269
: ExpressionUtils.convertConditionToPaimonPredicate(
@@ -273,7 +273,7 @@ private boolean execute(
273273
false)
274274
.getOrElse(null);
275275
PartitionPredicate partitionPredicate =
276-
PartitionPredicate.fromPredicate(partitionType, filter);
276+
PartitionPredicate.fromPredicate(partitionType, partitionFilter);
277277

278278
if (orderType.equals(OrderType.NONE)) {
279279
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
@@ -302,7 +302,8 @@ private boolean execute(
302302
} else {
303303
switch (bucketMode) {
304304
case BUCKET_UNAWARE:
305-
sortCompactUnAwareBucketTable(table, orderType, sortColumns, relation, filter);
305+
sortCompactUnAwareBucketTable(
306+
table, orderType, sortColumns, relation, partitionFilter);
306307
break;
307308
default:
308309
throw new UnsupportedOperationException(
@@ -521,10 +522,10 @@ private void sortCompactUnAwareBucketTable(
521522
OrderType orderType,
522523
List<String> sortColumns,
523524
DataSourceV2Relation relation,
524-
@Nullable Predicate filter) {
525+
@Nullable Predicate partitionFilter) {
525526
SnapshotReader snapshotReader = table.newSnapshotReader();
526-
if (filter != null) {
527-
snapshotReader.withFilter(filter);
527+
if (partitionFilter != null) {
528+
snapshotReader.withPartitionFilter(partitionFilter);
528529
}
529530
Map<BinaryRow, DataSplit[]> packedSplits = packForSort(snapshotReader.read().dataSplits());
530531
TableSorter sorter = TableSorter.getSorter(table, orderType, sortColumns);

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,19 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
343343
}
344344
}
345345

346+
test("Paimon Procedure: sort compact with partition filter") {
347+
withTable("t") {
348+
sql("CREATE TABLE t (a INT, pt INT) PARTITIONED BY (pt)")
349+
sql("INSERT INTO t VALUES (1, 1)")
350+
sql("INSERT INTO t VALUES (2, 1)")
351+
sql(
352+
"CALL sys.compact(table => 't', order_strategy => 'order', where => 'pt = 1', order_by => 'a')")
353+
val table = loadTable("t")
354+
assert(table.latestSnapshot().get().commitKind.equals(CommitKind.OVERWRITE))
355+
checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 1)))
356+
}
357+
}
358+
346359
test("Paimon Procedure: compact for pk") {
347360
failAfter(streamingTimeout) {
348361
withTempDir {

0 commit comments

Comments (0)