Skip to content

Commit 8e3d472

Browse files
[core] Do not pushdown limit when non partition filter is present (#7665)
1 parent 167a362 commit 8e3d472

8 files changed

Lines changed: 169 additions & 15 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@ public FileStoreScan withCompleteFilter(Predicate predicate) {
9999
public Iterator<ManifestEntry> readManifestEntries(
100100
List<ManifestFileMeta> manifestFiles, boolean useSequential) {
101101
Iterator<ManifestEntry> result = super.readManifestEntries(manifestFiles, useSequential);
102-
if (limit == null || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled) {
102+
if (limit == null
103+
|| limit <= 0
104+
|| deletionVectorsEnabled
105+
|| dataEvolutionEnabled
106+
|| inputFilter != null) {
103107
return result;
104108
}
105109

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,11 @@ public boolean limitPushdownEnabled() {
225225
return false;
226226
}
227227

228-
return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled;
228+
return mergeEngine != PARTIAL_UPDATE
229+
&& mergeEngine != AGGREGATE
230+
&& !deletionVectorsEnabled
231+
&& valueFilter == null
232+
&& keyFilter == null;
229233
}
230234

231235
@Override

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public List<PartitionEntry> listPartitionEntries() {
126126
}
127127

128128
private Optional<StartingScanner.Result> applyPushDownLimit() {
129-
if (pushDownLimit == null) {
129+
if (pushDownLimit == null || snapshotReader.hasNonPartitionFilter()) {
130130
return Optional.empty();
131131
}
132132

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ public interface SnapshotReader {
121121

122122
SnapshotReader withLimit(int limit);
123123

124+
/** Whether the pushed filter still contains non-partition predicates. */
125+
boolean hasNonPartitionFilter();
126+
124127
/** Get splits plan from snapshot. */
125128
Plan read();
126129

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
102102
@Nullable private final DVMetaCache dvMetaCache;
103103

104104
private ScanMode scanMode = ScanMode.ALL;
105+
private boolean hasNonPartitionFilter;
105106
private RecordComparator lazyPartitionComparator;
106107
private CacheMetrics dvMetaCacheMetrics;
107108

@@ -239,6 +240,7 @@ public SnapshotReader withFilter(Predicate predicate) {
239240
scan.withPartitionFilter(pair.getLeft().get());
240241
}
241242
if (!pair.getRight().isEmpty()) {
243+
this.hasNonPartitionFilter = true;
242244
nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight()));
243245
}
244246
scan.withCompleteFilter(predicate);
@@ -338,6 +340,11 @@ public SnapshotReader withLimit(int limit) {
338340
return this;
339341
}
340342

343+
@Override
344+
public boolean hasNonPartitionFilter() {
345+
return hasNonPartitionFilter;
346+
}
347+
341348
@Override
342349
public SnapshotReader dropStats() {
343350
scan.dropStats();

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,11 @@ public SnapshotReader withLimit(int limit) {
477477
return this;
478478
}
479479

480+
@Override
481+
public boolean hasNonPartitionFilter() {
482+
return wrapped.hasNonPartitionFilter();
483+
}
484+
480485
@Override
481486
public Plan read() {
482487
return wrapped.read();

paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.fs.local.LocalFileIO;
2929
import org.apache.paimon.manifest.ManifestEntry;
3030
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
31+
import org.apache.paimon.predicate.Predicate;
3132
import org.apache.paimon.predicate.PredicateBuilder;
3233
import org.apache.paimon.schema.Schema;
3334
import org.apache.paimon.schema.SchemaManager;
@@ -331,20 +332,28 @@ public void testLimitPushdownWithValueFilter() throws Exception {
331332

332333
@Test
333334
public void testLimitPushdownWithKeyFilter() throws Exception {
334-
// Write data with different shop IDs
335335
List<KeyValue> data = generateData(200);
336336
Snapshot snapshot = writeData(data);
337337

338-
// With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit
339-
// pushdown)
338+
Predicate keyPredicate =
339+
new PredicateBuilder(RowType.of(new IntType(false)))
340+
.equal(0, data.get(0).key().getInt(0));
341+
342+
// baseline: keyFilter without limit
343+
KeyValueFileStoreScan scanFilterOnly = store.newScan();
344+
scanFilterOnly.withSnapshot(snapshot.id());
345+
scanFilterOnly.withKeyFilter(keyPredicate);
346+
int filteredFiles = scanFilterOnly.plan().files().size();
347+
assertThat(filteredFiles).isGreaterThan(0);
348+
349+
// keyFilter + limit: early-stop by rowCount() is unsafe, should be disabled
340350
KeyValueFileStoreScan scan = store.newScan();
341351
scan.withSnapshot(snapshot.id());
342-
scan.withKeyFilter(
343-
new PredicateBuilder(RowType.of(new IntType(false)))
344-
.equal(0, data.get(0).key().getInt(0)));
352+
scan.withKeyFilter(keyPredicate);
345353
scan.withLimit(5);
346-
List<ManifestEntry> files = scan.plan().files();
347-
assertThat(files.size()).isGreaterThan(0);
354+
355+
assertThat(scan.limitPushdownEnabled()).isFalse();
356+
assertThat(scan.plan().files().size()).isEqualTo(filteredFiles);
348357
}
349358

350359
@Test
@@ -569,6 +578,48 @@ private Map<BinaryRow, BinaryRow> getActualKvMap(FileStoreScan scan, Long expect
569578
return store.toKvMap(actualKvs);
570579
}
571580

581+
@Test
582+
void testLimitPushdownWithFilter() throws Exception {
583+
int numFiles = 10;
584+
int rowsPerFile = 100;
585+
586+
Snapshot snapshot = null;
587+
for (int bucket = 0; bucket < numFiles; bucket++) {
588+
List<KeyValue> data = new ArrayList<>();
589+
for (int i = 0; i < rowsPerFile; i++) {
590+
data.add(gen.nextInsert("", 0, (long) i, null, null));
591+
}
592+
snapshot = writeData(data, bucket);
593+
}
594+
595+
KeyValueFileStoreScan scanAll = store.newScan();
596+
scanAll.withSnapshot(snapshot.id());
597+
List<ManifestEntry> allFiles = scanAll.plan().files();
598+
assertThat(allFiles.size()).isEqualTo(numFiles);
599+
600+
KeyValueFileStoreScan scanFilterOnly = store.newScan();
601+
scanFilterOnly.withSnapshot(snapshot.id());
602+
scanFilterOnly.withValueFilter(
603+
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L));
604+
List<ManifestEntry> filteredFiles = scanFilterOnly.plan().files();
605+
assertThat(filteredFiles.size()).isEqualTo(numFiles); // no file eliminated by stats
606+
607+
KeyValueFileStoreScan scanWithLimit = store.newScan();
608+
scanWithLimit.withSnapshot(snapshot.id());
609+
scanWithLimit.withValueFilter(
610+
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L));
611+
scanWithLimit.withLimit(5);
612+
613+
assertThat(scanWithLimit.limitPushdownEnabled()).isFalse();
614+
615+
List<ManifestEntry> limitedFiles = scanWithLimit.plan().files();
616+
617+
assertThat(limitedFiles.size())
618+
.as(
619+
"When filter is present, limit pushdown should be disabled, returning all files")
620+
.isEqualTo(numFiles);
621+
}
622+
572623
private List<KeyValue> generateData(int numRecords) {
573624
List<KeyValue> data = new ArrayList<>();
574625
for (int i = 0; i < numRecords; i++) {

paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,90 @@ public void testLimitPushdownWithFilter() throws Exception {
146146
table.newScan().withFilter(filter).withLimit(10).plan();
147147
int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size();
148148

149-
// Should read exactly 10 files (from index 25 to 34) to get 10 rows
150-
assertThat(splitsWithFilterAndLimit).isLessThanOrEqualTo(10);
151-
assertThat(splitsWithFilterAndLimit).isGreaterThan(0);
152-
assertThat(splitsWithFilterAndLimit).isLessThan(totalSplits);
149+
// With non-partition filter, limit pushdown should be disabled to avoid
150+
// returning insufficient rows. All 25 matching files should be returned.
151+
assertThat(splitsWithFilterAndLimit).isEqualTo(25);
152+
153+
write.close();
154+
commit.close();
155+
}
156+
157+
@Test
158+
void testLimitPushdownWithNonPartitionFilter() throws Exception {
159+
createAppendOnlyTable();
160+
161+
StreamTableWrite write = table.newWrite(commitUser);
162+
StreamTableCommit commit = table.newCommit(commitUser);
163+
164+
int filesCount = 10;
165+
int rowsPerFile = 100;
166+
int filterValue = 50;
167+
168+
for (int fileIdx = 0; fileIdx < filesCount; fileIdx++) {
169+
for (int i = 0; i < rowsPerFile; i++) {
170+
write.write(rowData(fileIdx, i, (long) (fileIdx * rowsPerFile + i)));
171+
}
172+
commit.commit(fileIdx, write.prepareCommit(true, fileIdx));
173+
}
174+
175+
TableScan.Plan planAll = table.newScan().plan();
176+
assertThat(planAll.splits().size()).isEqualTo(filesCount);
177+
178+
Predicate filter =
179+
new PredicateBuilder(table.schema().logicalRowType()).equal(1, filterValue);
180+
TableScan.Plan planFilterOnly = table.newScan().withFilter(filter).plan();
181+
assertThat(planFilterOnly.splits().size()).isEqualTo(filesCount);
182+
183+
List<String> allRows = getResult(table.newRead(), planFilterOnly.splits());
184+
long totalMatchingRows =
185+
allRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count();
186+
assertThat(totalMatchingRows).isEqualTo(filesCount);
187+
188+
int limit = 5;
189+
TableScan.Plan planWithFilterAndLimit =
190+
table.newScan().withFilter(filter).withLimit(limit).plan();
191+
192+
List<String> limitedAllRows = getResult(table.newRead(), planWithFilterAndLimit.splits());
193+
long limitedMatchingRows =
194+
limitedAllRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count();
195+
196+
assertThat(limitedMatchingRows)
197+
.as(
198+
"Filter+limit bug: scan returned %d splits, but only %d rows match "
199+
+ "filter (expected >= %d). Total matching = %d",
200+
planWithFilterAndLimit.splits().size(),
201+
limitedMatchingRows,
202+
limit,
203+
totalMatchingRows)
204+
.isGreaterThanOrEqualTo(limit);
205+
206+
write.close();
207+
commit.close();
208+
}
209+
210+
@Test
211+
void testLimitPushdownWithPartitionFilter() throws Exception {
212+
createAppendOnlyTable();
213+
214+
StreamTableWrite write = table.newWrite(commitUser);
215+
StreamTableCommit commit = table.newCommit(commitUser);
216+
217+
for (int i = 0; i < 10; i++) {
218+
write.write(rowData(i, i, (long) i * 100));
219+
commit.commit(i, write.prepareCommit(true, i));
220+
}
221+
222+
Predicate partitionFilter =
223+
new PredicateBuilder(table.schema().logicalRowType()).lessOrEqual(0, 4);
224+
225+
TableScan.Plan planNoLimit = table.newScan().withFilter(partitionFilter).plan();
226+
assertThat(planNoLimit.splits().size()).isEqualTo(5);
227+
228+
TableScan.Plan plan = table.newScan().withFilter(partitionFilter).withLimit(2).plan();
229+
230+
assertThat(plan.splits().size())
231+
.as("Partition filter + limit: limit pushdown should not be disabled")
232+
.isEqualTo(2);
153233

154234
write.close();
155235
commit.close();

0 commit comments

Comments
 (0)