From 1116648ea75911849497ef879bfb4e39ec726134 Mon Sep 17 00:00:00 2001 From: Sun Dapeng Date: Sat, 11 Apr 2026 02:57:50 +0800 Subject: [PATCH 1/4] [core] Support partition predicate pushdown for PartitionsTable PartitionsTable.withFilter() was a no-op (TODO), causing full manifest scans when querying with partition filters. This adds predicate pushdown following the same pattern as BucketsTable (#7592) and FilesTable (#7376). Key changes: - PartitionsScan extracts partition predicate via LeafPredicateExtractor - PartitionsSplit carries the predicate to PartitionsRead - Catalog path: in-memory filter preserving metadata columns - TableScan path: manifest-level pushdown via withPartitionFilter - PartitionPredicateHelper refactored to build+apply two-step pattern - parsePartitionSpec extended for key=value/key=value format Co-Authored-By: Claude Opus 4.6 --- .../paimon/table/system/PartitionsTable.java | 137 ++++++++++++++++-- .../utils/PartitionPredicateHelper.java | 120 ++++++++++----- .../table/system/PartitionsTableTest.java | 124 ++++++++++++++++ 3 files changed, 331 insertions(+), 50 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 296542d70a0c..dd3caa116293 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -35,7 +35,12 @@ import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -56,15 +61,19 @@ import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.PartitionPredicateHelper; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -140,14 +149,67 @@ public Table copy(Map dynamicOptions) { private static class PartitionsScan extends ReadOnceTableScan { + @Nullable private LeafPredicate partitionPredicate; + @Override public InnerTableScan withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + + Map leafPredicates = + predicate.visit(LeafPredicateExtractor.INSTANCE); + this.partitionPredicate = leafPredicates.get("partition"); + + // Handle Or(Equal, Equal...) pattern from PredicateBuilder.in() with <=20 literals + if (this.partitionPredicate == null) { + for (Predicate andChild : PredicateBuilder.splitAnd(predicate)) { + LeafPredicate inPred = convertOrEqualsToIn(andChild, "partition"); + if (inPred != null) { + this.partitionPredicate = inPred; + break; + } + } + } return this; } + @Nullable + private static LeafPredicate convertOrEqualsToIn(Predicate predicate, String targetField) { + List orChildren = PredicateBuilder.splitOr(predicate); + if (orChildren.size() <= 1) { + return null; + } + List literals = new ArrayList<>(); + String fieldName = null; + int fieldIndex = -1; + DataType fieldType = null; + for (Predicate child : orChildren) { + if (!(child instanceof LeafPredicate)) { + return null; + } + LeafPredicate leaf = (LeafPredicate) child; + if (!(leaf.function() instanceof Equal)) { + return null; + } + if (fieldName == null) { + fieldName = leaf.fieldName(); + fieldIndex = leaf.index(); + fieldType = leaf.type(); + } else if (!fieldName.equals(leaf.fieldName())) { + return null; + } + literals.addAll(leaf.literals()); + } + if (!targetField.equals(fieldName)) { + return null; + } + return new LeafPredicate(In.INSTANCE, fieldType, fieldIndex, fieldName, literals); + } + @Override public Plan innerPlan() { - return () -> Collections.singletonList(new PartitionsSplit()); + return () -> Collections.singletonList(new PartitionsSplit(partitionPredicate)); } } @@ -155,17 +217,27 @@ private static class PartitionsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; + @Nullable private final LeafPredicate partitionPredicate; + + private PartitionsSplit(@Nullable LeafPredicate partitionPredicate) { + this.partitionPredicate = partitionPredicate; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - return o != null && getClass() == o.getClass(); + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionsSplit that = (PartitionsSplit) o; + return Objects.equals(partitionPredicate, that.partitionPredicate); } @Override public int hashCode() { - return 1; + return Objects.hash(partitionPredicate); } @Override @@ -186,7 +258,7 @@ public PartitionsRead(FileStoreTable table) { @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + // filter pushdown is handled at the Scan layer through PartitionsSplit return this; } @@ -207,7 +279,8 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - List partitions = listPartitions(); + PartitionsSplit partitionsSplit = (PartitionsSplit) split; + List partitions = listPartitions(partitionsSplit.partitionPredicate); List fieldTypes = fileStoreTable.schema().logicalPartitionType().getFieldTypes(); @@ -320,17 +393,17 @@ private Timestamp toTimestamp(Long epochMillis) { Instant.ofEpochMilli(epochMillis), ZoneId.systemDefault())); } - private List listPartitions() { + private List listPartitions(@Nullable LeafPredicate partitionPredicate) { CatalogLoader catalogLoader = fileStoreTable.catalogEnvironment().catalogLoader(); if (TimeTravelUtil.hasTimeTravelOptions(new Options(fileStoreTable.options())) || catalogLoader == null) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); } try (Catalog catalog = catalogLoader.load()) { Identifier baseIdentifier = fileStoreTable.catalogEnvironment().identifier(); if (baseIdentifier == null) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); } String branch = fileStoreTable.coreOptions().branch(); Identifier identifier; @@ -343,15 +416,53 @@ private List listPartitions() { } else { identifier = baseIdentifier; } - return catalog.listPartitions(identifier); + List partitions = catalog.listPartitions(identifier); + return filterByPredicate(partitions, partitionPredicate); } catch (Exception e) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); } } - private List listPartitionEntries() { - List partitionEntries = - fileStoreTable.newScan().withLevelFilter(level -> true).listPartitionEntries(); + private List filterByPredicate( + List partitions, @Nullable LeafPredicate partitionPredicate) { + if (partitionPredicate == null) { + return partitions; + } + List partitionKeys = fileStoreTable.partitionKeys(); + return partitions.stream() + .filter( + p -> { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionKeys.size(); i++) { + if (i > 0) { + sb.append("/"); + } + sb.append(partitionKeys.get(i)) + .append("=") + .append(p.spec().get(partitionKeys.get(i))); + } + return partitionPredicate.test( + GenericRow.of(BinaryString.fromString(sb.toString()))); + }) + .collect(Collectors.toList()); + } + + private List listPartitionEntries(@Nullable LeafPredicate partitionPredicate) { + InnerTableScan scan = fileStoreTable.newScan().withLevelFilter(level -> true); + + if (partitionPredicate != null) { + List partitionKeys = fileStoreTable.partitionKeys(); + RowType partitionType = fileStoreTable.schema().logicalPartitionType(); + Predicate partPred = + PartitionPredicateHelper.buildPartitionPredicate( + partitionPredicate, partitionKeys, partitionType); + if (partPred == null) { + return Collections.emptyList(); + } + scan.withPartitionFilter(partPred); + } + + List partitionEntries = scan.listPartitionEntries(); RowType partitionType = fileStoreTable.schema().logicalPartitionType(); String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); String[] partitionColumns = fileStoreTable.partitionKeys().toArray(new String[0]); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java index 634d9e5d5bdc..1806fdfa9450 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java @@ -36,27 +36,36 @@ /** * Helper for applying partition predicate pushdown in system tables (BucketsTable, FilesTable, - * FileKeyRangesTable). + * FileKeyRangesTable, PartitionsTable). */ public class PartitionPredicateHelper { - public static boolean applyPartitionFilter( - SnapshotReader snapshotReader, - @Nullable LeafPredicate partitionPredicate, - List partitionKeys, - RowType partitionType) { - if (partitionPredicate == null) { - return true; - } - + /** + * Build a partition-typed predicate from a string-based leaf predicate on the "partition" + * column. + * + * @return the predicate on partition fields, or {@code null} if the partition spec is invalid + * (indicating no results should be returned) + */ + @Nullable + public static Predicate buildPartitionPredicate( + LeafPredicate partitionPredicate, List partitionKeys, RowType partitionType) { if (partitionPredicate.function() instanceof Equal) { LinkedHashMap partSpec = parsePartitionSpec( partitionPredicate.literals().get(0).toString(), partitionKeys); if (partSpec == null) { - return false; + return null; } - snapshotReader.withPartitionFilter(partSpec); + PredicateBuilder partBuilder = new PredicateBuilder(partitionType); + List predicates = new ArrayList<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + Object value = + TypeUtils.castFromString( + partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); + predicates.add(partBuilder.equal(i, value)); + } + return PredicateBuilder.and(predicates); } else if (partitionPredicate.function() instanceof In) { List orPredicates = new ArrayList<>(); PredicateBuilder partBuilder = new PredicateBuilder(partitionType); @@ -75,51 +84,88 @@ public static boolean applyPartitionFilter( } orPredicates.add(PredicateBuilder.and(andPredicates)); } - if (!orPredicates.isEmpty()) { - snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates)); - } + return orPredicates.isEmpty() ? null : PredicateBuilder.or(orPredicates); } else if (partitionPredicate.function() instanceof LeafBinaryFunction) { LinkedHashMap partSpec = parsePartitionSpec( partitionPredicate.literals().get(0).toString(), partitionKeys); - if (partSpec != null) { - PredicateBuilder partBuilder = new PredicateBuilder(partitionType); - List predicates = new ArrayList<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); - predicates.add( - new LeafPredicate( - partitionPredicate.function(), - partitionType.getTypeAt(i), - i, - partitionKeys.get(i), - Collections.singletonList(value))); - } - snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates)); + if (partSpec == null) { + return null; + } + PredicateBuilder partBuilder = new PredicateBuilder(partitionType); + List predicates = new ArrayList<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + Object value = + TypeUtils.castFromString( + partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); + predicates.add( + new LeafPredicate( + partitionPredicate.function(), + partitionType.getTypeAt(i), + i, + partitionKeys.get(i), + Collections.singletonList(value))); } + return PredicateBuilder.and(predicates); + } + return null; + } + + public static boolean applyPartitionFilter( + SnapshotReader snapshotReader, + @Nullable LeafPredicate partitionPredicate, + List partitionKeys, + RowType partitionType) { + if (partitionPredicate == null) { + return true; } + Predicate predicate = + buildPartitionPredicate(partitionPredicate, partitionKeys, partitionType); + if (predicate == null) { + return false; + } + snapshotReader.withPartitionFilter(predicate); return true; } @Nullable public static LinkedHashMap parsePartitionSpec( String partitionStr, List partitionKeys) { + // Handle {value1, value2} format (BucketsTable, FilesTable, FileKeyRangesTable) if (partitionStr.startsWith("{")) { partitionStr = partitionStr.substring(1); + if (partitionStr.endsWith("}")) { + partitionStr = partitionStr.substring(0, partitionStr.length() - 1); + } + String[] partFields = partitionStr.split(", "); + if (partitionKeys.size() != partFields.length) { + return null; + } + LinkedHashMap partSpec = new LinkedHashMap<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + partSpec.put(partitionKeys.get(i), partFields[i]); + } + return partSpec; } - if (partitionStr.endsWith("}")) { - partitionStr = partitionStr.substring(0, partitionStr.length() - 1); - } - String[] partFields = partitionStr.split(", "); + + // Handle key=value/key=value format (PartitionsTable) + String[] partFields = partitionStr.split("/"); if (partitionKeys.size() != partFields.length) { return null; } LinkedHashMap partSpec = new LinkedHashMap<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - partSpec.put(partitionKeys.get(i), partFields[i]); + for (String field : partFields) { + int eqIndex = field.indexOf('='); + if (eqIndex < 0) { + return null; + } + partSpec.put(field.substring(0, eqIndex), field.substring(eqIndex + 1)); + } + for (String key : partitionKeys) { + if (!partSpec.containsKey(key)) { + return null; + } } return partSpec; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 8091288c60b2..49b01c9bc780 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -23,22 +23,30 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.Projection; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -167,4 +175,120 @@ void testPartitionWithLegacyPartitionName() throws Exception { List result = read(testPartitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } + + @Test + public void testPartitionPredicateFilterEqual() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Equal filter: partition = 'pt=1' + Predicate filter = builder.equal(0, BinaryString.fromString("pt=1")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(BinaryString.fromString("pt=1"), 2L)); + + // Equal filter: partition = 'pt=2' + filter = builder.equal(0, BinaryString.fromString("pt=2")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(BinaryString.fromString("pt=2"), 1L)); + } + + @Test + public void testPartitionPredicateFilterIn() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + Predicate filter = + builder.in( + 0, + Arrays.asList( + BinaryString.fromString("pt=1"), BinaryString.fromString("pt=3"))); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("pt=1"), 2L), + GenericRow.of(BinaryString.fromString("pt=3"), 1L)); + } + + @Test + public void testPartitionPredicateFilterNoMatch() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + Predicate filter = builder.equal(0, BinaryString.fromString("pt=999")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})).isEmpty(); + } + + @Test + public void testPartitionPredicateFilterNonPartitionColumn() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Filter on record_count column — should be safely ignored, return all partitions + Predicate filter = builder.greaterThan(1, 0L); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})).hasSize(3); + } + + @Test + public void testPartitionPredicateFilterMultiColumnKeys() throws Exception { + String testTableName = "MultiPartTable"; + FileIO fileIO = LocalFileIO.create(); + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, testTableName)); + Schema schema = + Schema.newBuilder() + .column("pk", DataTypes.INT()) + .column("dt", DataTypes.INT()) + .column("region", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("dt", "region") + .primaryKey("pk", "dt", "region") + .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input") + .option("bucket", "1") + .build(); + TableSchema tableSchema = + SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema); + FileStoreTable multiTable = + FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); + + Identifier multiPartitionsTableId = + identifier(testTableName + SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + PartitionsTable multiPartitionsTable = + (PartitionsTable) catalog.getTable(multiPartitionsTableId); + + write(multiTable, GenericRow.of(1, 20260410, 1, 100)); + write(multiTable, GenericRow.of(2, 20260410, 2, 200)); + write(multiTable, GenericRow.of(3, 20260411, 1, 300)); + + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Equal filter on multi-column partition + Predicate filter = builder.equal(0, BinaryString.fromString("dt=20260410/region=1")); + assertThat(readWithFilter(multiPartitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("dt=20260410/region=1"), 1L)); + + // IN filter on multi-column partition + filter = + builder.in( + 0, + Arrays.asList( + BinaryString.fromString("dt=20260410/region=1"), + BinaryString.fromString("dt=20260411/region=1"))); + assertThat(readWithFilter(multiPartitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("dt=20260410/region=1"), 1L), + GenericRow.of(BinaryString.fromString("dt=20260411/region=1"), 1L)); + } + + private List readWithFilter(Table table, Predicate filter, int[] projection) + throws Exception { + ReadBuilder readBuilder = table.newReadBuilder().withFilter(filter); + if (projection != null) { + readBuilder.withProjection(projection); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } } From 6ab7f9ce9d4c97d7d9309c0e4a471311e41c8c75 Mon Sep 17 00:00:00 2001 From: Sun Dapeng Date: Sat, 11 Apr 2026 15:05:41 +0800 Subject: [PATCH 2/4] [core] Fix null partition value mismatch in PartitionsTable filterByPredicate filterByPredicate used raw p.spec().get(key) which renders null as literal "null", while toRow substitutes null with defaultPartitionName. This caused predicate pushdown to fail matching null-valued partitions. Co-Authored-By: Claude Opus 4.6 --- .../java/org/apache/paimon/table/system/PartitionsTable.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index dd3caa116293..b945ab5a3e46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -429,6 +429,7 @@ private List filterByPredicate( return partitions; } List partitionKeys = fileStoreTable.partitionKeys(); + String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); return partitions.stream() .filter( p -> { @@ -437,9 +438,10 @@ private List filterByPredicate( if (i > 0) { sb.append("/"); } + String value = p.spec().get(partitionKeys.get(i)); sb.append(partitionKeys.get(i)) .append("=") - .append(p.spec().get(partitionKeys.get(i))); + .append(value == null ? defaultPartitionName : value); } return partitionPredicate.test( GenericRow.of(BinaryString.fromString(sb.toString()))); From e5185ab10b532df733ee2dbde71d38b637f0de16 Mon Sep 17 00:00:00 2001 From: Sun Dapeng Date: Sat, 11 Apr 2026 17:31:34 +0800 Subject: [PATCH 3/4] [core] Fix RestPartitionsTableTest by creating table through catalog API testPartitionPredicateFilterMultiColumnKeys created MultiPartTable directly via filesystem (SchemaUtils.forceCommit), which works for local catalog but fails for REST catalog since it's unaware of tables created outside its API. Co-Authored-By: Claude Opus 4.6 --- .../apache/paimon/table/system/PartitionsTableTest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 49b01c9bc780..0971cf50f133 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -226,8 +226,6 @@ public void testPartitionPredicateFilterNonPartitionColumn() throws Exception { @Test public void testPartitionPredicateFilterMultiColumnKeys() throws Exception { String testTableName = "MultiPartTable"; - FileIO fileIO = LocalFileIO.create(); - Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, testTableName)); Schema schema = Schema.newBuilder() .column("pk", DataTypes.INT()) @@ -239,10 +237,9 @@ public void testPartitionPredicateFilterMultiColumnKeys() throws Exception { .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input") .option("bucket", "1") .build(); - TableSchema tableSchema = - SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema); - FileStoreTable multiTable = - FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); + Identifier multiTableId = identifier(testTableName); + catalog.createTable(multiTableId, schema, true); + FileStoreTable multiTable = (FileStoreTable) catalog.getTable(multiTableId); Identifier multiPartitionsTableId = identifier(testTableName + SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); From 62d378eb9f658d4b77be78e7351acb1f5d4136d6 Mon Sep 17 00:00:00 2001 From: Sun Dapeng Date: Sat, 11 Apr 2026 21:01:50 +0800 Subject: [PATCH 4/4] [core] Fix default partition handling and improve predicate pushdown correctness - Handle __DEFAULT_PARTITION__ in buildPartitionPredicate() by generating isNull() instead of castFromString() which throws NumberFormatException on non-string partition types (e.g. INT) - Fix scan path to skip pushdown for unsupported predicates instead of returning empty results - Pass defaultPartitionName through all system table callers Co-Authored-By: Claude Opus 4.6 --- .../paimon/table/system/BucketsTable.java | 3 +- .../table/system/FileKeyRangesTable.java | 6 +- .../paimon/table/system/FilesTable.java | 6 +- .../paimon/table/system/PartitionsTable.java | 11 +-- .../utils/PartitionPredicateHelper.java | 68 ++++++++++++------- 5 files changed, 62 insertions(+), 32 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index e5c419f515df..5db93ac6685f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -234,7 +234,8 @@ public RecordReader createReader(Split split) { snapshotReader, bucketsSplit.partitionPredicate, partitionKeys, - partitionType); + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return new IteratorRecordReader<>(Collections.emptyIterator()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java index 095dcda74a82..4eed653a30f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java @@ -170,7 +170,11 @@ public Plan innerPlan() { RowType partitionType = fileStoreTable.schema().logicalPartitionType(); boolean hasResults = PartitionPredicateHelper.applyPartitionFilter( - snapshotReader, partitionPredicate, partitionKeys, partitionType); + snapshotReader, + partitionPredicate, + partitionKeys, + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return Collections::emptyList; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0c7eadb7ae62..314db9f77601 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -196,7 +196,11 @@ public Plan innerPlan() { RowType partitionType = fileStoreTable.schema().logicalPartitionType(); boolean hasResults = PartitionPredicateHelper.applyPartitionFilter( - snapshotReader, partitionPredicate, partitionKeys, partitionType); + snapshotReader, + partitionPredicate, + partitionKeys, + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return Collections::emptyList; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index b945ab5a3e46..49961b3bbf37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -455,13 +455,16 @@ private List listPartitionEntries(@Nullable LeafPredicate partitionPr if (partitionPredicate != null) { List partitionKeys = fileStoreTable.partitionKeys(); RowType partitionType = fileStoreTable.schema().logicalPartitionType(); + String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); Predicate partPred = PartitionPredicateHelper.buildPartitionPredicate( - partitionPredicate, partitionKeys, partitionType); - if (partPred == null) { - return Collections.emptyList(); + partitionPredicate, + partitionKeys, + partitionType, + defaultPartitionName); + if (partPred != null) { + scan.withPartitionFilter(partPred); } - scan.withPartitionFilter(partPred); } List partitionEntries = scan.listPartitionEntries(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java index 1806fdfa9450..694ac9734115 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java @@ -42,14 +42,17 @@ public class PartitionPredicateHelper { /** * Build a partition-typed predicate from a string-based leaf predicate on the "partition" - * column. + * column. Only Equal and In predicates are supported for pushdown. * - * @return the predicate on partition fields, or {@code null} if the partition spec is invalid - * (indicating no results should be returned) + * @return the predicate on partition fields, or {@code null} if the predicate cannot be pushed + * down (unsupported predicate type or invalid partition spec) */ @Nullable public static Predicate buildPartitionPredicate( - LeafPredicate partitionPredicate, List partitionKeys, RowType partitionType) { + LeafPredicate partitionPredicate, + List partitionKeys, + RowType partitionType, + String defaultPartitionName) { if (partitionPredicate.function() instanceof Equal) { LinkedHashMap partSpec = parsePartitionSpec( @@ -60,10 +63,13 @@ public static Predicate buildPartitionPredicate( PredicateBuilder partBuilder = new PredicateBuilder(partitionType); List predicates = new ArrayList<>(); for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); - predicates.add(partBuilder.equal(i, value)); + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + predicates.add(partBuilder.isNull(i)); + } else { + Object value = TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); + predicates.add(partBuilder.equal(i, value)); + } } return PredicateBuilder.and(predicates); } else if (partitionPredicate.function() instanceof In) { @@ -77,15 +83,22 @@ public static Predicate buildPartitionPredicate( } List andPredicates = new ArrayList<>(); for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); - andPredicates.add(partBuilder.equal(i, value)); + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + andPredicates.add(partBuilder.isNull(i)); + } else { + Object value = + TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); + andPredicates.add(partBuilder.equal(i, value)); + } } orPredicates.add(PredicateBuilder.and(andPredicates)); } return orPredicates.isEmpty() ? null : PredicateBuilder.or(orPredicates); - } else if (partitionPredicate.function() instanceof LeafBinaryFunction) { + } + // Range predicates (>, >=, <, <=) can be pushed down for simple value formats + // used by BucketsTable, FilesTable, FileKeyRangesTable. + if (partitionPredicate.function() instanceof LeafBinaryFunction) { LinkedHashMap partSpec = parsePartitionSpec( partitionPredicate.literals().get(0).toString(), partitionKeys); @@ -95,16 +108,19 @@ public static Predicate buildPartitionPredicate( PredicateBuilder partBuilder = new PredicateBuilder(partitionType); List predicates = new ArrayList<>(); for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); - predicates.add( - new LeafPredicate( - partitionPredicate.function(), - partitionType.getTypeAt(i), - i, - partitionKeys.get(i), - Collections.singletonList(value))); + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + predicates.add(partBuilder.isNull(i)); + } else { + Object value = TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); + predicates.add( + new LeafPredicate( + partitionPredicate.function(), + partitionType.getTypeAt(i), + i, + partitionKeys.get(i), + Collections.singletonList(value))); + } } return PredicateBuilder.and(predicates); } @@ -115,13 +131,15 @@ public static boolean applyPartitionFilter( SnapshotReader snapshotReader, @Nullable LeafPredicate partitionPredicate, List partitionKeys, - RowType partitionType) { + RowType partitionType, + String defaultPartitionName) { if (partitionPredicate == null) { return true; } Predicate predicate = - buildPartitionPredicate(partitionPredicate, partitionKeys, partitionType); + buildPartitionPredicate( + partitionPredicate, partitionKeys, partitionType, defaultPartitionName); if (predicate == null) { return false; }