@@ -118,6 +118,10 @@ pub(super) struct ParquetOpener {
118118 pub max_predicate_cache_size : Option < usize > ,
119119 /// Whether to read row groups in reverse order
120120 pub reverse_row_groups : bool ,
121+ /// When `true`, row ordering must be preserved — `prune_by_limit` must not
122+ /// discard partially-matched row groups because they may contain rows that
123+ /// sort before fully-matched groups.
124+ pub preserve_order : bool ,
121125}
122126
123127/// Represents a prepared access plan with optional row selection
@@ -262,6 +266,7 @@ impl FileOpener for ParquetOpener {
262266 let enable_bloom_filter = self . enable_bloom_filter ;
263267 let enable_row_group_stats_pruning = self . enable_row_group_stats_pruning ;
264268 let limit = self . limit ;
269+ let preserve_order = self . preserve_order ;
265270
266271 let predicate_creation_errors = MetricBuilder :: new ( & self . metrics )
267272 . global_counter ( "num_predicate_creation_errors" ) ;
@@ -523,8 +528,11 @@ impl FileOpener for ParquetOpener {
523528 . add_matched ( n_remaining_row_groups) ;
524529 }
525530
526- // Prune by limit
527- if let Some ( limit) = limit {
531+ // Prune by limit: only safe when order does not matter.
532+ // With preserve_order=true, partially-matched row groups may
533+ // contain rows that sort before fully-matched groups, so
534+ // discarding them would return incorrect results.
535+ if let ( Some ( limit) , false ) = ( limit, preserve_order) {
528536 row_groups. prune_by_limit ( limit, rg_metadata, & file_metrics) ;
529537 }
530538
@@ -1076,6 +1084,7 @@ mod test {
10761084 coerce_int96 : Option < arrow:: datatypes:: TimeUnit > ,
10771085 max_predicate_cache_size : Option < usize > ,
10781086 reverse_row_groups : bool ,
1087+ preserve_order : bool ,
10791088 }
10801089
10811090 impl ParquetOpenerBuilder {
@@ -1101,6 +1110,7 @@ mod test {
11011110 coerce_int96 : None ,
11021111 max_predicate_cache_size : None ,
11031112 reverse_row_groups : false ,
1113+ preserve_order : false ,
11041114 }
11051115 }
11061116
@@ -1158,6 +1168,18 @@ mod test {
11581168 self
11591169 }
11601170
1171+ /// Set preserve_order flag. When true, prune_by_limit is disabled.
1172+ fn with_preserve_order ( mut self , enable : bool ) -> Self {
1173+ self . preserve_order = enable;
1174+ self
1175+ }
1176+
1177+ /// Set the limit.
1178+ fn with_limit ( mut self , limit : Option < usize > ) -> Self {
1179+ self . limit = limit;
1180+ self
1181+ }
1182+
11611183 /// Build the ParquetOpener instance.
11621184 ///
11631185 /// # Panics
@@ -1208,6 +1230,7 @@ mod test {
12081230 encryption_factory : None ,
12091231 max_predicate_cache_size : self . max_predicate_cache_size ,
12101232 reverse_row_groups : self . reverse_row_groups ,
1233+ preserve_order : self . preserve_order ,
12111234 }
12121235 }
12131236 }
@@ -2196,5 +2219,127 @@ mod test {
21962219 "Output field should be nullable"
21972220 ) ;
21982221 }
2222+
2223+ /// Regression test: `prune_by_limit` must be disabled when
2224+ /// `preserve_order = true` to avoid skipping partially-matched
2225+ /// row groups that contain rows sorting before fully-matched groups.
2226+ ///
2227+ /// Layout (3 rows/RG, sorted `[a ASC]`):
2228+ /// RG0: a=[1,2,3] — partially matched for `a > 2`
2229+ /// RG1: a=[4,5,6] — fully matched for `a > 2`
2230+ ///
2231+ /// `WHERE a > 2 LIMIT 1` should return `a=3` from RG0.
2232+ /// Without preserve_order, prune_by_limit discards RG0 and
2233+ /// returns `a=4` from RG1.
2234+ #[ tokio:: test]
2235+ async fn test_preserve_order_prevents_limit_pruning ( ) {
2236+ use arrow:: array:: Int32Array ;
2237+ use datafusion_common:: ScalarValue ;
2238+ use datafusion_expr:: Operator ;
2239+ use datafusion_physical_expr:: expressions:: {
2240+ BinaryExpr , Column , Literal ,
2241+ } ;
2242+ use object_store:: memory:: InMemory ;
2243+ use parquet:: arrow:: ArrowWriter ;
2244+ use parquet:: file:: properties:: WriterProperties ;
2245+
2246+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
2247+ "a" ,
2248+ DataType :: Int32 ,
2249+ false ,
2250+ ) ] ) ) ;
2251+
2252+ // Write parquet with 2 RGs of 3 rows each into memory buffer
2253+ let mut buf = Vec :: new ( ) ;
2254+ let props = WriterProperties :: builder ( )
2255+ . set_max_row_group_size ( 3 )
2256+ . build ( ) ;
2257+ {
2258+ let mut writer =
2259+ ArrowWriter :: try_new ( & mut buf, schema. clone ( ) , Some ( props) ) . unwrap ( ) ;
2260+ // RG0: partially matched for a > 2
2261+ writer
2262+ . write ( & RecordBatch :: try_new (
2263+ schema. clone ( ) ,
2264+ vec ! [ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ] ,
2265+ )
2266+ . unwrap ( ) )
2267+ . unwrap ( ) ;
2268+ // RG1: fully matched for a > 2
2269+ writer
2270+ . write ( & RecordBatch :: try_new (
2271+ schema. clone ( ) ,
2272+ vec ! [ Arc :: new( Int32Array :: from( vec![ 4 , 5 , 6 ] ) ) ] ,
2273+ )
2274+ . unwrap ( ) )
2275+ . unwrap ( ) ;
2276+ writer. close ( ) . unwrap ( ) ;
2277+ }
2278+
2279+ let file_size = buf. len ( ) as u64 ;
2280+ let store = Arc :: new ( InMemory :: new ( ) ) ;
2281+ let path = object_store:: path:: Path :: from ( "test.parquet" ) ;
2282+ store
2283+ . put ( & path, buf. into ( ) )
2284+ . await
2285+ . unwrap ( ) ;
2286+
2287+ // Predicate: a > 2
2288+ let predicate: Arc < dyn PhysicalExpr > = Arc :: new (
2289+ BinaryExpr :: new (
2290+ Arc :: new (
2291+ Column :: new ( "a" , 0 ) ,
2292+ ) ,
2293+ Operator :: Gt ,
2294+ Arc :: new (
2295+ Literal :: new (
2296+ ScalarValue :: Int32 ( Some ( 2 ) ) ,
2297+ ) ,
2298+ ) ,
2299+ ) ,
2300+ ) ;
2301+
2302+ // Run query with preserve_order=false: prune_by_limit may skip RG0
2303+ let opener_unordered = ParquetOpenerBuilder :: new ( )
2304+ . with_store ( store. clone ( ) )
2305+ . with_schema ( schema. clone ( ) )
2306+ . with_limit ( Some ( 1 ) )
2307+ . with_pushdown_filters ( true )
2308+ . with_predicate ( predicate. clone ( ) )
2309+ . with_row_group_stats_pruning ( true )
2310+ . with_preserve_order ( false )
2311+ . build ( ) ;
2312+
2313+ let file = PartitionedFile :: new ( path. clone ( ) , file_size) ;
2314+ let mut stream = opener_unordered. open ( file) . unwrap ( ) . await . unwrap ( ) ;
2315+ let batch = stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
2316+ let col = batch. column ( 0 ) . as_any ( ) . downcast_ref :: < Int32Array > ( ) . unwrap ( ) ;
2317+ assert_eq ! (
2318+ col. value( 0 ) ,
2319+ 4 ,
2320+ "Without preserve_order, prune_by_limit skips partially-matched RG0"
2321+ ) ;
2322+
2323+ // Run query with preserve_order=true: RG0 must be kept
2324+ let opener_ordered = ParquetOpenerBuilder :: new ( )
2325+ . with_store ( store)
2326+ . with_schema ( schema)
2327+ . with_limit ( Some ( 1 ) )
2328+ . with_pushdown_filters ( true )
2329+ . with_predicate ( predicate)
2330+ . with_row_group_stats_pruning ( true )
2331+ . with_preserve_order ( true )
2332+ . build ( ) ;
2333+
2334+ let file = PartitionedFile :: new ( path, file_size) ;
2335+ let mut stream = opener_ordered. open ( file) . unwrap ( ) . await . unwrap ( ) ;
2336+ let batch = stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
2337+ let col = batch. column ( 0 ) . as_any ( ) . downcast_ref :: < Int32Array > ( ) . unwrap ( ) ;
2338+ assert_eq ! (
2339+ col. value( 0 ) ,
2340+ 3 ,
2341+ "With preserve_order, partially-matched RG0 is scanned first"
2342+ ) ;
2343+ }
21992344 }
22002345}
0 commit comments