Skip to content

Commit e913557

Browse files
Fix: MemTable LIMIT ignored with reordered projections (#21177)
## Which issue does this PR close?

- Closes #21176

## Rationale for this change

When `SELECT` projects columns in a different order than the table schema (e.g., `SELECT col_c, col_b` vs. schema order `col_a, col_b, col_c`), the `LIMIT` clause is silently ignored — all rows are returned instead of the requested limit.

The physical optimizer runs `LimitPushdown` (which correctly sets `fetch=N` on `MemorySourceConfig`) **before** `ProjectionPushdown`. When the projection is pushed down, `try_swapping_with_projection()` creates a new `MemorySourceConfig` via `try_new()`, which resets `fetch` to `None`, silently dropping the limit.

## What changes are included in this PR?

An initial minimal fix preserved only the `fetch` value on the newly created config:

```rust
// Before:
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
// After:
.map(|s| Arc::new(s.with_limit(self.fetch)) as Arc<dyn DataSource>)
```

The committed change generalizes this: instead of rebuilding the source with `MemorySourceConfig::try_new()` (which resets `fetch` and other metadata), `self` is cloned — preserving `fetch`, sort information, `show_sizes`, etc. — and only the projection-related fields are updated (`projection`, `projected_schema`, and `sort_information` re-projected to match the new projection).

## Are these changes tested?

Yes.

- **Unit test** (`try_swapping_with_projection_preserves_fetch`): directly verifies `fetch` is preserved after projection pushdown. Confirmed it fails without the fix (`None` vs. `Some(5)`).
- **SQL logic test** (`limit.slt`, table `t21176`): end-to-end regression test for `SELECT col_c, col_b FROM t LIMIT 5` with reverse column order.

## Are there any user-facing changes?

No API changes. This is a correctness fix — `LIMIT` now works correctly regardless of `SELECT` column order against in-memory tables.
1 parent 37c1b75 commit e913557

File tree

2 files changed

+95
-7
lines changed

2 files changed

+95
-7
lines changed

datafusion/datasource/src/memory.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,22 @@ impl DataSource for MemorySourceConfig {
243243
&exprs,
244244
self.projection().as_ref().unwrap_or(&all_projections),
245245
);
246-
247-
MemorySourceConfig::try_new(
248-
self.partitions(),
249-
self.original_schema(),
250-
Some(new_projections),
251-
)
252-
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
246+
let projected_schema =
247+
project_schema(&self.schema, Some(&new_projections));
248+
249+
projected_schema.map(|projected_schema| {
250+
// Clone self to preserve all metadata (fetch, sort_information,
251+
// show_sizes, etc.) then update only the projection-related fields.
252+
let mut new_source = self.clone();
253+
new_source.projection = Some(new_projections);
254+
new_source.projected_schema = projected_schema;
255+
// Project sort information to match the new projection
256+
new_source.sort_information = project_orderings(
257+
&new_source.sort_information,
258+
&new_source.projected_schema,
259+
);
260+
Arc::new(new_source) as Arc<dyn DataSource>
261+
})
253262
})
254263
.transpose()
255264
}
@@ -897,6 +906,42 @@ mod tests {
897906
Ok(())
898907
}
899908

909+
/// Test that `try_swapping_with_projection` preserves the `fetch` limit.
910+
/// Regression test for <https://github.com/apache/datafusion/issues/21176>
911+
#[test]
912+
fn try_swapping_with_projection_preserves_fetch() {
913+
use datafusion_physical_expr::projection::ProjectionExprs;
914+
915+
let schema = Arc::new(Schema::new(vec![
916+
Field::new("a", DataType::Int32, false),
917+
Field::new("b", DataType::Utf8, false),
918+
Field::new("c", DataType::Int64, false),
919+
]));
920+
let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch(10)]];
921+
let source = MemorySourceConfig::try_new(&partitions, schema.clone(), None)
922+
.unwrap()
923+
.with_limit(Some(5));
924+
925+
assert_eq!(source.fetch, Some(5));
926+
927+
// Create a projection that reorders columns: [c, a] (indices 2, 0)
928+
let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
929+
let swapped = source
930+
.try_swapping_with_projection(&projection)
931+
.unwrap()
932+
.unwrap();
933+
let new_source = swapped
934+
.as_any()
935+
.downcast_ref::<MemorySourceConfig>()
936+
.unwrap();
937+
938+
assert_eq!(
939+
new_source.fetch,
940+
Some(5),
941+
"fetch limit must be preserved after projection pushdown"
942+
);
943+
}
944+
900945
#[tokio::test]
901946
async fn values_empty_case() -> Result<()> {
902947
let schema = aggr_test_schema();

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -927,3 +927,46 @@ DROP TABLE t;
927927
# Tear down src_table table:
928928
statement ok
929929
DROP TABLE src_table;
930+
931+
# LIMIT must work when SELECT projects columns in different order than table schema
932+
933+
statement ok
934+
CREATE TABLE t21176 (col_a TEXT, col_b DOUBLE, col_c TEXT) AS VALUES
935+
('a-0', 0, 'c-0'), ('a-1', 1, 'c-1'), ('a-2', 2, 'c-2'), ('a-3', 3, 'c-3'),
936+
('a-4', 4, 'c-4'), ('a-5', 5, 'c-5'), ('a-6', 6, 'c-6'), ('a-7', 7, 'c-7'),
937+
('a-8', 8, 'c-8'), ('a-9', 9, 'c-9'), ('a-10', 10, 'c-10'), ('a-11', 11, 'c-11'),
938+
('a-12', 12, 'c-12'), ('a-13', 13, 'c-13'), ('a-14', 14, 'c-14'), ('a-15', 15, 'c-15'),
939+
('a-16', 16, 'c-16'), ('a-17', 17, 'c-17'), ('a-18', 18, 'c-18'), ('a-19', 19, 'c-19');
940+
941+
# Schema-order SELECT with LIMIT should return 5 rows
942+
query RT rowsort
943+
SELECT col_b, col_c FROM t21176 LIMIT 5;
944+
----
945+
0 c-0
946+
1 c-1
947+
2 c-2
948+
3 c-3
949+
4 c-4
950+
951+
# Reverse-order SELECT with LIMIT should also return 5 rows (not 20)
952+
query TR rowsort
953+
SELECT col_c, col_b FROM t21176 LIMIT 5;
954+
----
955+
c-0 0
956+
c-1 1
957+
c-2 2
958+
c-3 3
959+
c-4 4
960+
961+
# Single column reverse SELECT with LIMIT
962+
query T rowsort
963+
SELECT col_c FROM t21176 LIMIT 5;
964+
----
965+
c-0
966+
c-1
967+
c-2
968+
c-3
969+
c-4
970+
971+
statement ok
972+
DROP TABLE t21176;

0 commit comments

Comments
 (0)