diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index e5c419f515df..5db93ac6685f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -234,7 +234,8 @@ public RecordReader createReader(Split split) { snapshotReader, bucketsSplit.partitionPredicate, partitionKeys, - partitionType); + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return new IteratorRecordReader<>(Collections.emptyIterator()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java index 095dcda74a82..4eed653a30f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java @@ -170,7 +170,11 @@ public Plan innerPlan() { RowType partitionType = fileStoreTable.schema().logicalPartitionType(); boolean hasResults = PartitionPredicateHelper.applyPartitionFilter( - snapshotReader, partitionPredicate, partitionKeys, partitionType); + snapshotReader, + partitionPredicate, + partitionKeys, + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return Collections::emptyList; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0c7eadb7ae62..314db9f77601 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -196,7 +196,11 @@ public Plan innerPlan() { RowType partitionType = fileStoreTable.schema().logicalPartitionType(); boolean hasResults = PartitionPredicateHelper.applyPartitionFilter( - snapshotReader, partitionPredicate, partitionKeys, partitionType); + snapshotReader, + partitionPredicate, + partitionKeys, + partitionType, + fileStoreTable.coreOptions().partitionDefaultName()); if (!hasResults) { return Collections::emptyList; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 296542d70a0c..49961b3bbf37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -35,7 +35,12 @@ import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -56,15 +61,19 @@ import org.apache.paimon.utils.InternalRowUtils; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.PartitionPredicateHelper; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -140,14 +149,67 @@ public Table copy(Map dynamicOptions) { private static class PartitionsScan extends ReadOnceTableScan { + @Nullable private LeafPredicate partitionPredicate; + @Override public InnerTableScan withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + + Map leafPredicates = + predicate.visit(LeafPredicateExtractor.INSTANCE); + this.partitionPredicate = leafPredicates.get("partition"); + + // Handle Or(Equal, Equal...) pattern from PredicateBuilder.in() with <=20 literals + if (this.partitionPredicate == null) { + for (Predicate andChild : PredicateBuilder.splitAnd(predicate)) { + LeafPredicate inPred = convertOrEqualsToIn(andChild, "partition"); + if (inPred != null) { + this.partitionPredicate = inPred; + break; + } + } + } return this; } + @Nullable + private static LeafPredicate convertOrEqualsToIn(Predicate predicate, String targetField) { + List orChildren = PredicateBuilder.splitOr(predicate); + if (orChildren.size() <= 1) { + return null; + } + List literals = new ArrayList<>(); + String fieldName = null; + int fieldIndex = -1; + DataType fieldType = null; + for (Predicate child : orChildren) { + if (!(child instanceof LeafPredicate)) { + return null; + } + LeafPredicate leaf = (LeafPredicate) child; + if (!(leaf.function() instanceof Equal)) { + return null; + } + if (fieldName == null) { + fieldName = leaf.fieldName(); + fieldIndex = leaf.index(); + fieldType = leaf.type(); + } else if (!fieldName.equals(leaf.fieldName())) { + return null; + } + literals.addAll(leaf.literals()); + } + if (!targetField.equals(fieldName)) { + return null; + } + return new LeafPredicate(In.INSTANCE, fieldType, fieldIndex, fieldName, literals); + } + @Override public Plan innerPlan() { - return () -> Collections.singletonList(new PartitionsSplit()); + return () -> Collections.singletonList(new PartitionsSplit(partitionPredicate)); } } @@ -155,17 +217,27 @@ private static class PartitionsSplit extends SingletonSplit { private static final long serialVersionUID = 1L; + @Nullable private final LeafPredicate partitionPredicate; + + private PartitionsSplit(@Nullable LeafPredicate partitionPredicate) { + this.partitionPredicate = partitionPredicate; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - return o != null && getClass() == o.getClass(); + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionsSplit that = (PartitionsSplit) o; + return Objects.equals(partitionPredicate, that.partitionPredicate); } @Override public int hashCode() { - return 1; + return Objects.hash(partitionPredicate); } @Override @@ -186,7 +258,7 @@ public PartitionsRead(FileStoreTable table) { @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + // filter pushdown is handled at the Scan layer through PartitionsSplit return this; } @@ -207,7 +279,8 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - List partitions = listPartitions(); + PartitionsSplit partitionsSplit = (PartitionsSplit) split; + List partitions = listPartitions(partitionsSplit.partitionPredicate); List fieldTypes = fileStoreTable.schema().logicalPartitionType().getFieldTypes(); @@ -320,17 +393,17 @@ private Timestamp toTimestamp(Long epochMillis) { Instant.ofEpochMilli(epochMillis), ZoneId.systemDefault())); } - private List listPartitions() { + private List listPartitions(@Nullable LeafPredicate partitionPredicate) { CatalogLoader catalogLoader = fileStoreTable.catalogEnvironment().catalogLoader(); if (TimeTravelUtil.hasTimeTravelOptions(new Options(fileStoreTable.options())) || catalogLoader == null) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); } try (Catalog catalog = catalogLoader.load()) { Identifier baseIdentifier = fileStoreTable.catalogEnvironment().identifier(); if (baseIdentifier == null) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); } String branch = fileStoreTable.coreOptions().branch(); Identifier identifier; @@ -343,15 +416,58 @@ private List listPartitions() { } else { identifier = baseIdentifier; } - return catalog.listPartitions(identifier); + List partitions = catalog.listPartitions(identifier); + return filterByPredicate(partitions, partitionPredicate); } catch (Exception e) { - return listPartitionEntries(); + return listPartitionEntries(partitionPredicate); + } + } + + private List filterByPredicate( + List partitions, @Nullable LeafPredicate partitionPredicate) { + if (partitionPredicate == null) { + return partitions; } + List partitionKeys = fileStoreTable.partitionKeys(); + String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); + return partitions.stream() + .filter( + p -> { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionKeys.size(); i++) { + if (i > 0) { + sb.append("/"); + } + String value = p.spec().get(partitionKeys.get(i)); + sb.append(partitionKeys.get(i)) + .append("=") + .append(value == null ? defaultPartitionName : value); + } + return partitionPredicate.test( + GenericRow.of(BinaryString.fromString(sb.toString()))); + }) + .collect(Collectors.toList()); } - private List listPartitionEntries() { - List partitionEntries = - fileStoreTable.newScan().withLevelFilter(level -> true).listPartitionEntries(); + private List listPartitionEntries(@Nullable LeafPredicate partitionPredicate) { + InnerTableScan scan = fileStoreTable.newScan().withLevelFilter(level -> true); + + if (partitionPredicate != null) { + List partitionKeys = fileStoreTable.partitionKeys(); + RowType partitionType = fileStoreTable.schema().logicalPartitionType(); + String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); + Predicate partPred = + PartitionPredicateHelper.buildPartitionPredicate( + partitionPredicate, + partitionKeys, + partitionType, + defaultPartitionName); + if (partPred != null) { + scan.withPartitionFilter(partPred); + } + } + + List partitionEntries = scan.listPartitionEntries(); RowType partitionType = fileStoreTable.schema().logicalPartitionType(); String defaultPartitionName = fileStoreTable.coreOptions().partitionDefaultName(); String[] partitionColumns = fileStoreTable.partitionKeys().toArray(new String[0]); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java index 634d9e5d5bdc..694ac9734115 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPredicateHelper.java @@ -36,27 +36,42 @@ /** * Helper for applying partition predicate pushdown in system tables (BucketsTable, FilesTable, - * FileKeyRangesTable). + * FileKeyRangesTable, PartitionsTable). */ public class PartitionPredicateHelper { - public static boolean applyPartitionFilter( - SnapshotReader snapshotReader, - @Nullable LeafPredicate partitionPredicate, + /** + * Build a partition-typed predicate from a string-based leaf predicate on the "partition" + * column. Only Equal and In predicates are supported for pushdown. + * + * @return the predicate on partition fields, or {@code null} if the predicate cannot be pushed + * down (unsupported predicate type or invalid partition spec) + */ + @Nullable + public static Predicate buildPartitionPredicate( + LeafPredicate partitionPredicate, List partitionKeys, - RowType partitionType) { - if (partitionPredicate == null) { - return true; - } - + RowType partitionType, + String defaultPartitionName) { if (partitionPredicate.function() instanceof Equal) { LinkedHashMap partSpec = parsePartitionSpec( partitionPredicate.literals().get(0).toString(), partitionKeys); if (partSpec == null) { - return false; + return null; + } + PredicateBuilder partBuilder = new PredicateBuilder(partitionType); + List predicates = new ArrayList<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + predicates.add(partBuilder.isNull(i)); + } else { + Object value = TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); + predicates.add(partBuilder.equal(i, value)); + } } - snapshotReader.withPartitionFilter(partSpec); + return PredicateBuilder.and(predicates); } else if (partitionPredicate.function() instanceof In) { List orPredicates = new ArrayList<>(); PredicateBuilder partBuilder = new PredicateBuilder(partitionType); @@ -68,27 +83,36 @@ public static boolean applyPartitionFilter( } List andPredicates = new ArrayList<>(); for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); - andPredicates.add(partBuilder.equal(i, value)); + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + andPredicates.add(partBuilder.isNull(i)); + } else { + Object value = + TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); + andPredicates.add(partBuilder.equal(i, value)); + } } orPredicates.add(PredicateBuilder.and(andPredicates)); } - if (!orPredicates.isEmpty()) { - snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates)); - } - } else if (partitionPredicate.function() instanceof LeafBinaryFunction) { + return orPredicates.isEmpty() ? null : PredicateBuilder.or(orPredicates); + } + // Range predicates (>, >=, <, <=) can be pushed down for simple value formats + // used by BucketsTable, FilesTable, FileKeyRangesTable. + if (partitionPredicate.function() instanceof LeafBinaryFunction) { LinkedHashMap partSpec = parsePartitionSpec( partitionPredicate.literals().get(0).toString(), partitionKeys); - if (partSpec != null) { - PredicateBuilder partBuilder = new PredicateBuilder(partitionType); - List predicates = new ArrayList<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - Object value = - TypeUtils.castFromString( - partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); + if (partSpec == null) { + return null; + } + PredicateBuilder partBuilder = new PredicateBuilder(partitionType); + List predicates = new ArrayList<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + String strValue = partSpec.get(partitionKeys.get(i)); + if (defaultPartitionName.equals(strValue)) { + predicates.add(partBuilder.isNull(i)); + } else { + Object value = TypeUtils.castFromString(strValue, partitionType.getTypeAt(i)); predicates.add( new LeafPredicate( partitionPredicate.function(), @@ -97,29 +121,69 @@ public static boolean applyPartitionFilter( partitionKeys.get(i), Collections.singletonList(value))); } - snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates)); } + return PredicateBuilder.and(predicates); } + return null; + } + public static boolean applyPartitionFilter( + SnapshotReader snapshotReader, + @Nullable LeafPredicate partitionPredicate, + List partitionKeys, + RowType partitionType, + String defaultPartitionName) { + if (partitionPredicate == null) { + return true; + } + + Predicate predicate = + buildPartitionPredicate( + partitionPredicate, partitionKeys, partitionType, defaultPartitionName); + if (predicate == null) { + return false; + } + snapshotReader.withPartitionFilter(predicate); return true; } @Nullable public static LinkedHashMap parsePartitionSpec( String partitionStr, List partitionKeys) { + // Handle {value1, value2} format (BucketsTable, FilesTable, FileKeyRangesTable) if (partitionStr.startsWith("{")) { partitionStr = partitionStr.substring(1); + if (partitionStr.endsWith("}")) { + partitionStr = partitionStr.substring(0, partitionStr.length() - 1); + } + String[] partFields = partitionStr.split(", "); + if (partitionKeys.size() != partFields.length) { + return null; + } + LinkedHashMap partSpec = new LinkedHashMap<>(); + for (int i = 0; i < partitionKeys.size(); i++) { + partSpec.put(partitionKeys.get(i), partFields[i]); + } + return partSpec; } - if (partitionStr.endsWith("}")) { - partitionStr = partitionStr.substring(0, partitionStr.length() - 1); - } - String[] partFields = partitionStr.split(", "); + + // Handle key=value/key=value format (PartitionsTable) + String[] partFields = partitionStr.split("/"); if (partitionKeys.size() != partFields.length) { return null; } LinkedHashMap partSpec = new LinkedHashMap<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - partSpec.put(partitionKeys.get(i), partFields[i]); + for (String field : partFields) { + int eqIndex = field.indexOf('='); + if (eqIndex < 0) { + return null; + } + partSpec.put(field.substring(0, eqIndex), field.substring(eqIndex + 1)); + } + for (String key : partitionKeys) { + if (!partSpec.containsKey(key)) { + return null; + } } return partSpec; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index 8091288c60b2..0971cf50f133 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -23,22 +23,30 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.Projection; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -167,4 +175,117 @@ void testPartitionWithLegacyPartitionName() throws Exception { List result = read(testPartitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } + + @Test + public void testPartitionPredicateFilterEqual() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Equal filter: partition = 'pt=1' + Predicate filter = builder.equal(0, BinaryString.fromString("pt=1")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(BinaryString.fromString("pt=1"), 2L)); + + // Equal filter: partition = 'pt=2' + filter = builder.equal(0, BinaryString.fromString("pt=2")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder(GenericRow.of(BinaryString.fromString("pt=2"), 1L)); + } + + @Test + public void testPartitionPredicateFilterIn() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + Predicate filter = + builder.in( + 0, + Arrays.asList( + BinaryString.fromString("pt=1"), BinaryString.fromString("pt=3"))); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("pt=1"), 2L), + GenericRow.of(BinaryString.fromString("pt=3"), 1L)); + } + + @Test + public void testPartitionPredicateFilterNoMatch() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + Predicate filter = builder.equal(0, BinaryString.fromString("pt=999")); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})).isEmpty(); + } + + @Test + public void testPartitionPredicateFilterNonPartitionColumn() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Filter on record_count column — should be safely ignored, return all partitions + Predicate filter = builder.greaterThan(1, 0L); + assertThat(readWithFilter(partitionsTable, filter, new int[] {0, 1})).hasSize(3); + } + + @Test + public void testPartitionPredicateFilterMultiColumnKeys() throws Exception { + String testTableName = "MultiPartTable"; + Schema schema = + Schema.newBuilder() + .column("pk", DataTypes.INT()) + .column("dt", DataTypes.INT()) + .column("region", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("dt", "region") + .primaryKey("pk", "dt", "region") + .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input") + .option("bucket", "1") + .build(); + Identifier multiTableId = identifier(testTableName); + catalog.createTable(multiTableId, schema, true); + FileStoreTable multiTable = (FileStoreTable) catalog.getTable(multiTableId); + + Identifier multiPartitionsTableId = + identifier(testTableName + SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + PartitionsTable multiPartitionsTable = + (PartitionsTable) catalog.getTable(multiPartitionsTableId); + + write(multiTable, GenericRow.of(1, 20260410, 1, 100)); + write(multiTable, GenericRow.of(2, 20260410, 2, 200)); + write(multiTable, GenericRow.of(3, 20260411, 1, 300)); + + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + // Equal filter on multi-column partition + Predicate filter = builder.equal(0, BinaryString.fromString("dt=20260410/region=1")); + assertThat(readWithFilter(multiPartitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("dt=20260410/region=1"), 1L)); + + // IN filter on multi-column partition + filter = + builder.in( + 0, + Arrays.asList( + BinaryString.fromString("dt=20260410/region=1"), + BinaryString.fromString("dt=20260411/region=1"))); + assertThat(readWithFilter(multiPartitionsTable, filter, new int[] {0, 1})) + .containsExactlyInAnyOrder( + GenericRow.of(BinaryString.fromString("dt=20260410/region=1"), 1L), + GenericRow.of(BinaryString.fromString("dt=20260411/region=1"), 1L)); + } + + private List readWithFilter(Table table, Predicate filter, int[] projection) + throws Exception { + ReadBuilder readBuilder = table.newReadBuilder().withFilter(filter); + if (projection != null) { + readBuilder.withProjection(projection); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } }