Skip to content

Commit 67e72af

Browse files
committed
Fix: exact reverse works correctly with pushdown_filters + RowFilter
When pushdown_filters is enabled, RowFilter may reduce the actual number of rows per row group (RG) below what rg_row_counts predicts. ReversedRowGroupStream still handles this correctly: RG boundary detection is delayed, so multiple RGs may be buffered together, but all remaining batches are flushed and reversed when the stream ends. Memory cost becomes O(all data) instead of O(largest RG), which is acceptable for LIMIT queries. Added an SLT test verifying that exact reverse with pushdown_filters=true produces correct DESC results both with and without LIMIT.
1 parent 549772b commit 67e72af

2 files changed

Lines changed: 59 additions & 0 deletions

File tree

datafusion/datasource-parquet/src/source.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,14 @@ impl FileSource for ParquetSource {
856856
// Exact: reverse both row groups and rows within each batch,
857857
// giving globally sorted output. This allows the Sort operator
858858
// to be removed entirely and fetch to be pushed down to the scan.
859+
//
860+
// Note: when pushdown_filters is enabled, RowFilter may reduce
861+
// actual rows below what rg_row_counts predicts. This causes
862+
// ReversedRowGroupStream's RG boundary detection to delay
863+
// (multiple RGs may buffer together), but correctness is preserved
864+
// because all buffered batches are flushed and reversed when the
865+
// stream ends. Memory cost becomes O(all data) instead of
866+
// O(largest RG), which is acceptable for LIMIT queries.
859867
let mut source = self.clone().with_reverse_row_groups(true);
860868
source.reverse_rows = true;
861869
Arc::new(source)

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,6 +1641,57 @@ physical_plan
16411641
01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false]
16421642
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true
16431643

1644+
# Test 12.9: Exact reverse works correctly with pushdown_filters enabled.
1645+
# RowFilter may reduce actual rows below rg_row_counts, but
1646+
# ReversedRowGroupStream handles this by flushing all remaining
1647+
# buffered batches when the stream ends.
1648+
1649+
statement ok
1650+
SET datafusion.execution.parquet.enable_exact_reverse_scan = true;
1651+
1652+
statement ok
1653+
SET datafusion.execution.parquet.pushdown_filters = true;
1654+
1655+
statement ok
1656+
CREATE EXTERNAL TABLE exact_rev_pushdown_parquet(id INT, value INT, name VARCHAR)
1657+
STORED AS PARQUET
1658+
LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet'
1659+
WITH ORDER (id ASC);
1660+
1661+
# Sort removed (Exact), scan_direction=Reversed, with predicate pushed down
1662+
query TT
1663+
EXPLAIN SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3;
1664+
----
1665+
logical_plan
1666+
01)Sort: exact_rev_pushdown_parquet.id DESC NULLS FIRST, fetch=3
1667+
02)--Filter: exact_rev_pushdown_parquet.value > Int32(500)
1668+
03)----TableScan: exact_rev_pushdown_parquet projection=[id, value, name], partial_filters=[exact_rev_pushdown_parquet.value > Int32(500)]
1669+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, predicate=value@1 > 500, scan_direction=Reversed, pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 500, required_guarantees=[]
1670+
1671+
# Results correct in DESC order with filter
1672+
query IIT
1673+
SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3;
1674+
----
1675+
10 1000 j
1676+
9 900 i
1677+
8 800 h
1678+
1679+
# Without LIMIT — all filtered rows in DESC order
1680+
query IIT
1681+
SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC;
1682+
----
1683+
10 1000 j
1684+
9 900 i
1685+
8 800 h
1686+
7 700 g
1687+
6 600 f
1688+
1689+
statement ok
1690+
DROP TABLE exact_rev_pushdown_parquet;
1691+
1692+
statement ok
1693+
SET datafusion.execution.parquet.pushdown_filters = false;
1694+
16441695
# Cleanup exact reverse test tables
16451696
statement ok
16461697
DROP TABLE exact_rev_data;

0 commit comments

Comments
 (0)