Skip to content

Commit 051dc75

Browse files
authored
[core] Support partition and bucket predicate pushdown for BucketsTable (#7592)
1 parent bb00b63 commit 051dc75

5 files changed

Lines changed: 292 additions & 169 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java

Lines changed: 62 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,8 @@
2727
import org.apache.paimon.disk.IOManager;
2828
import org.apache.paimon.fs.FileIO;
2929
import org.apache.paimon.manifest.BucketEntry;
30+
import org.apache.paimon.predicate.LeafPredicate;
31+
import org.apache.paimon.predicate.LeafPredicateExtractor;
3032
import org.apache.paimon.predicate.Predicate;
3133
import org.apache.paimon.reader.RecordReader;
3234
import org.apache.paimon.table.FileStoreTable;
@@ -38,6 +40,7 @@
3840
import org.apache.paimon.table.source.SingletonSplit;
3941
import org.apache.paimon.table.source.Split;
4042
import org.apache.paimon.table.source.TableRead;
43+
import org.apache.paimon.table.source.snapshot.SnapshotReader;
4144
import org.apache.paimon.types.BigIntType;
4245
import org.apache.paimon.types.DataField;
4346
import org.apache.paimon.types.DataTypes;
@@ -48,6 +51,8 @@
4851

4952
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
5053

54+
import javax.annotation.Nullable;
55+
5156
import java.time.Instant;
5257
import java.time.LocalDateTime;
5358
import java.time.ZoneId;
@@ -57,9 +62,11 @@
5762
import java.util.Iterator;
5863
import java.util.List;
5964
import java.util.Map;
65+
import java.util.Objects;
6066
import java.util.OptionalLong;
6167

6268
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
69+
import static org.apache.paimon.utils.PartitionPredicateHelper.applyPartitionFilter;
6370

6471
/** A {@link Table} for showing buckets info. */
6572
public class BucketsTable implements ReadonlyTable {
@@ -121,32 +128,60 @@ public Table copy(Map<String, String> dynamicOptions) {
121128

122129
private static class BucketsScan extends ReadOnceTableScan {
123130

131+
@Nullable private LeafPredicate partitionPredicate;
132+
@Nullable private LeafPredicate bucketPredicate;
133+
124134
@Override
125135
public InnerTableScan withFilter(Predicate predicate) {
136+
if (predicate == null) {
137+
return this;
138+
}
139+
140+
Map<String, LeafPredicate> leafPredicates =
141+
predicate.visit(LeafPredicateExtractor.INSTANCE);
142+
this.partitionPredicate = leafPredicates.get("partition");
143+
this.bucketPredicate = leafPredicates.get("bucket");
126144
return this;
127145
}
128146

129147
@Override
130148
public Plan innerPlan() {
131-
return () -> Collections.singletonList(new BucketsSplit());
149+
return () ->
150+
Collections.singletonList(
151+
new BucketsSplit(partitionPredicate, bucketPredicate));
132152
}
133153
}
134154

135155
private static class BucketsSplit extends SingletonSplit {
136156

137157
private static final long serialVersionUID = 1L;
138158

159+
@Nullable private final LeafPredicate partitionPredicate;
160+
@Nullable private final LeafPredicate bucketPredicate;
161+
162+
private BucketsSplit(
163+
@Nullable LeafPredicate partitionPredicate,
164+
@Nullable LeafPredicate bucketPredicate) {
165+
this.partitionPredicate = partitionPredicate;
166+
this.bucketPredicate = bucketPredicate;
167+
}
168+
139169
@Override
140170
public boolean equals(Object o) {
141171
if (this == o) {
142172
return true;
143173
}
144-
return o != null && getClass() == o.getClass();
174+
if (o == null || getClass() != o.getClass()) {
175+
return false;
176+
}
177+
BucketsSplit that = (BucketsSplit) o;
178+
return Objects.equals(partitionPredicate, that.partitionPredicate)
179+
&& Objects.equals(bucketPredicate, that.bucketPredicate);
145180
}
146181

147182
@Override
148183
public int hashCode() {
149-
return 1;
184+
return Objects.hash(partitionPredicate, bucketPredicate);
150185
}
151186

152187
@Override
@@ -167,7 +202,7 @@ public BucketsRead(FileStoreTable table) {
167202

168203
@Override
169204
public InnerTableRead withFilter(Predicate predicate) {
170-
// TODO
205+
// filter pushdown is handled at the Scan layer through BucketsSplit
171206
return this;
172207
}
173208

@@ -188,7 +223,29 @@ public RecordReader<InternalRow> createReader(Split split) {
188223
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
189224
}
190225

191-
List<BucketEntry> buckets = fileStoreTable.newSnapshotReader().bucketEntries();
226+
BucketsSplit bucketsSplit = (BucketsSplit) split;
227+
SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
228+
229+
// Apply partition filter to SnapshotReader
230+
List<String> partitionKeys = fileStoreTable.partitionKeys();
231+
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
232+
boolean hasResults =
233+
applyPartitionFilter(
234+
snapshotReader,
235+
bucketsSplit.partitionPredicate,
236+
partitionKeys,
237+
partitionType);
238+
if (!hasResults) {
239+
return new IteratorRecordReader<>(Collections.emptyIterator());
240+
}
241+
242+
// Apply bucket filter to SnapshotReader
243+
if (bucketsSplit.bucketPredicate != null) {
244+
LeafPredicate bp = bucketsSplit.bucketPredicate;
245+
snapshotReader.withBucketFilter(bucket -> bp.test(GenericRow.of(null, bucket)));
246+
}
247+
248+
List<BucketEntry> buckets = snapshotReader.bucketEntries();
192249

193250
@SuppressWarnings("unchecked")
194251
CastExecutor<InternalRow, BinaryString> partitionCastExecutor =

paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java

Lines changed: 8 additions & 82 deletions
Original file line number | Diff line number | Diff line change
@@ -27,13 +27,9 @@
2727
import org.apache.paimon.fs.FileIO;
2828
import org.apache.paimon.io.DataFileMeta;
2929
import org.apache.paimon.io.DataFilePathFactory;
30-
import org.apache.paimon.predicate.Equal;
31-
import org.apache.paimon.predicate.In;
32-
import org.apache.paimon.predicate.LeafBinaryFunction;
3330
import org.apache.paimon.predicate.LeafPredicate;
3431
import org.apache.paimon.predicate.LeafPredicateExtractor;
3532
import org.apache.paimon.predicate.Predicate;
36-
import org.apache.paimon.predicate.PredicateBuilder;
3733
import org.apache.paimon.reader.RecordReader;
3834
import org.apache.paimon.schema.SchemaManager;
3935
import org.apache.paimon.schema.TableSchema;
@@ -52,10 +48,10 @@
5248
import org.apache.paimon.types.IntType;
5349
import org.apache.paimon.types.RowType;
5450
import org.apache.paimon.utils.IteratorRecordReader;
51+
import org.apache.paimon.utils.PartitionPredicateHelper;
5552
import org.apache.paimon.utils.ProjectedRow;
5653
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
5754
import org.apache.paimon.utils.SerializationUtils;
58-
import org.apache.paimon.utils.TypeUtils;
5955

6056
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
6157

@@ -66,7 +62,6 @@
6662
import java.util.Collections;
6763
import java.util.HashMap;
6864
import java.util.Iterator;
69-
import java.util.LinkedHashMap;
7065
import java.util.List;
7166
import java.util.Map;
7267
import java.util.function.Function;
@@ -171,89 +166,20 @@ public InnerTableScan withFilter(Predicate pushdown) {
171166
@Override
172167
public Plan innerPlan() {
173168
SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
174-
if (partitionPredicate != null) {
175-
List<String> partitionKeys = fileStoreTable.partitionKeys();
176-
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
177-
if (partitionPredicate.function() instanceof Equal) {
178-
LinkedHashMap<String, String> partSpec =
179-
parsePartitionSpec(
180-
partitionPredicate.literals().get(0).toString(), partitionKeys);
181-
if (partSpec == null) {
182-
return Collections::emptyList;
183-
}
184-
snapshotReader.withPartitionFilter(partSpec);
185-
} else if (partitionPredicate.function() instanceof In) {
186-
List<Predicate> orPredicates = new ArrayList<>();
187-
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
188-
for (Object literal : partitionPredicate.literals()) {
189-
LinkedHashMap<String, String> partSpec =
190-
parsePartitionSpec(literal.toString(), partitionKeys);
191-
if (partSpec == null) {
192-
continue;
193-
}
194-
List<Predicate> andPredicates = new ArrayList<>();
195-
for (int i = 0; i < partitionKeys.size(); i++) {
196-
Object value =
197-
TypeUtils.castFromString(
198-
partSpec.get(partitionKeys.get(i)),
199-
partitionType.getTypeAt(i));
200-
andPredicates.add(partBuilder.equal(i, value));
201-
}
202-
orPredicates.add(PredicateBuilder.and(andPredicates));
203-
}
204-
if (!orPredicates.isEmpty()) {
205-
snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
206-
}
207-
} else if (partitionPredicate.function() instanceof LeafBinaryFunction) {
208-
LinkedHashMap<String, String> partSpec =
209-
parsePartitionSpec(
210-
partitionPredicate.literals().get(0).toString(), partitionKeys);
211-
if (partSpec != null) {
212-
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
213-
List<Predicate> predicates = new ArrayList<>();
214-
for (int i = 0; i < partitionKeys.size(); i++) {
215-
Object value =
216-
TypeUtils.castFromString(
217-
partSpec.get(partitionKeys.get(i)),
218-
partitionType.getTypeAt(i));
219-
predicates.add(
220-
new LeafPredicate(
221-
partitionPredicate.function(),
222-
partitionType.getTypeAt(i),
223-
i,
224-
partitionKeys.get(i),
225-
Collections.singletonList(value)));
226-
}
227-
snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates));
228-
}
229-
}
169+
List<String> partitionKeys = fileStoreTable.partitionKeys();
170+
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
171+
boolean hasResults =
172+
PartitionPredicateHelper.applyPartitionFilter(
173+
snapshotReader, partitionPredicate, partitionKeys, partitionType);
174+
if (!hasResults) {
175+
return Collections::emptyList;
230176
}
231177

232178
return () ->
233179
snapshotReader.partitions().stream()
234180
.map(p -> new FilesTable.FilesSplit(p, bucketPredicate, levelPredicate))
235181
.collect(Collectors.toList());
236182
}
237-
238-
@Nullable
239-
private LinkedHashMap<String, String> parsePartitionSpec(
240-
String partitionStr, List<String> partitionKeys) {
241-
if (partitionStr.startsWith("{")) {
242-
partitionStr = partitionStr.substring(1);
243-
}
244-
if (partitionStr.endsWith("}")) {
245-
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
246-
}
247-
String[] partFields = partitionStr.split(", ");
248-
if (partitionKeys.size() != partFields.length) {
249-
return null;
250-
}
251-
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
252-
for (int i = 0; i < partitionKeys.size(); i++) {
253-
partSpec.put(partitionKeys.get(i), partFields[i]);
254-
}
255-
return partSpec;
256-
}
257183
}
258184

259185
private static class FileKeyRangesRead implements InnerTableRead {

0 commit comments

Comments
 (0)