@@ -43,6 +43,11 @@ pub(crate) struct InlineAggregateStream {
4343
4444 batch_size : usize ,
4545
46+ /// Per-partition limit on the number of emitted groups, see [`InlineAggregateExec::limit`]
47+ limit : Option < usize > ,
48+
49+ groups_emitted : usize ,
50+
4651 exec_state : ExecutionState ,
4752
4853 input_done : bool ,
@@ -100,6 +105,8 @@ impl InlineAggregateStream {
100105 group_by : agg_group_by,
101106 exec_state,
102107 batch_size,
108+ limit : agg. limit ( ) ,
109+ groups_emitted : 0 ,
103110 current_group_indices,
104111 group_values,
105112 input_done : false ,
@@ -175,38 +182,45 @@ impl Stream for InlineAggregateStream {
175182 loop {
176183 match & self . exec_state {
177184 ExecutionState :: ReadingInput => {
185+ // All needed groups are emitted, skip the rest of the input
186+ if self . limit_reached ( ) {
187+ self . exec_state = ExecutionState :: Done ;
188+ continue ;
189+ }
190+
191+ // Drain groups accumulated beyond the emit threshold (a single input batch
192+ // can bring many) before reading further input
193+ match self . emit_early_if_ready ( ) {
194+ Ok ( Some ( batch) ) => {
195+ self . exec_state = ExecutionState :: ProducingOutput ( batch) ;
196+ continue ;
197+ }
198+ Ok ( None ) => {
199+ // Not enough groups, read further
200+ }
201+ Err ( e) => {
202+ return Poll :: Ready ( Some ( Err ( e) ) ) ;
203+ }
204+ }
205+
178206 match ready ! ( self . input. poll_next_unpin( cx) ) {
179- // New input batch to aggregate
207+ // New input batch to aggregate; emitting happens at the top of the loop
180208 Some ( Ok ( batch) ) => {
181- // Aggregate the batch
182209 if let Err ( e) = self . group_aggregate_batch ( batch) {
183210 return Poll :: Ready ( Some ( Err ( e) ) ) ;
184211 }
185-
186- // Try to emit a batch if we have enough groups
187- match self . emit_early_if_ready ( ) {
188- Ok ( Some ( batch) ) => {
189- self . exec_state = ExecutionState :: ProducingOutput ( batch) ;
190- }
191- Ok ( None ) => {
192- // Not enough groups yet, continue reading
193- }
194- Err ( e) => {
195- return Poll :: Ready ( Some ( Err ( e) ) ) ;
196- }
197- }
198212 }
199213
200214 // Error from input stream
201215 Some ( Err ( e) ) => {
202216 return Poll :: Ready ( Some ( Err ( e) ) ) ;
203217 }
204218
205- // Input stream exhausted - emit all remaining groups
219+ // Input stream exhausted - emit the remaining groups, up to the limit
206220 None => {
207221 self . input_done = true ;
208222
209- match self . emit ( EmitTo :: All ) {
223+ match self . emit_remaining ( ) {
210224 Ok ( Some ( batch) ) => {
211225 self . exec_state = ExecutionState :: ProducingOutput ( batch) ;
212226 }
@@ -244,9 +258,6 @@ impl Stream for InlineAggregateStream {
244258}
245259
246260impl InlineAggregateStream {
247- /// Emit groups based on EmitTo strategy.
248- ///
249- /// Returns None if there are no groups to emit.
250261 /// Emit groups based on EmitTo strategy.
251262 ///
252263 /// Returns None if there are no groups to emit.
@@ -283,25 +294,61 @@ impl InlineAggregateStream {
283294 Ok ( Some ( batch) )
284295 }
285296
286- /// Check if we have enough groups to emit a batch, keeping the last (potentially incomplete) group.
287- ///
288- /// For sorted aggregation, we emit batches of size batch_size when we have accumulated
289- /// more than batch_size groups. We always keep the last group as it may continue in the next input batch.
290- fn should_emit_early ( & self ) -> bool {
291- // Need at least (batch_size + 1) groups to emit batch_size and keep 1
292- self . group_values . len ( ) > self . batch_size
297+ fn limit_reached ( & self ) -> bool {
298+ self . limit . is_some_and ( |limit| self . groups_emitted >= limit)
299+ }
300+
301+ /// How many groups to emit in the next early batch: full batches until the limit (if any)
302+ /// leaves fewer groups to emit.
303+ fn emit_early_threshold ( & self ) -> usize {
304+ match self . limit {
305+ Some ( limit) => self
306+ . batch_size
307+ . min ( limit. saturating_sub ( self . groups_emitted ) ) ,
308+ None => self . batch_size ,
309+ }
293310 }
294311
295312 /// Emit a batch of groups if we have enough accumulated, keeping the last group.
296313 ///
314+ /// For sorted aggregation, we emit when we have accumulated more than threshold groups: the
315+ /// last group is always kept as it may continue in the next input batch, so only closed
316+ /// groups are emitted.
317+ ///
297318 /// Returns Some(batch) if emitted, None otherwise.
298319 fn emit_early_if_ready ( & mut self ) -> DFResult < Option < RecordBatch > > {
299- if !self . should_emit_early ( ) {
320+ let threshold = self . emit_early_threshold ( ) ;
321+ // Need at least (threshold + 1) groups to emit threshold closed groups and keep 1.
322+ // The threshold == 0 check is defensive: the poll loop checks limit_reached() before
323+ // calling this, so the threshold is at least 1 there.
324+ if threshold == 0 || self . group_values . len ( ) <= threshold {
300325 return Ok ( None ) ;
301326 }
302327
303- // Emit exactly batch_size groups, keeping the rest (including last incomplete group)
304- self . emit ( EmitTo :: First ( self . batch_size ) )
328+ let batch = self . emit ( EmitTo :: First ( threshold) ) ?;
329+ self . groups_emitted += threshold;
330+ Ok ( batch)
331+ }
332+
333+ /// Emit the groups left at the end of the input: all of them are closed at this point, but
334+ /// no more than the limit allows.
335+ fn emit_remaining ( & mut self ) -> DFResult < Option < RecordBatch > > {
336+ let len = self . group_values . len ( ) ;
337+ let emit_count = match self . limit {
338+ Some ( limit) => len. min ( limit. saturating_sub ( self . groups_emitted ) ) ,
339+ None => len,
340+ } ;
341+ if emit_count == 0 {
342+ return Ok ( None ) ;
343+ }
344+ let emit_to = if emit_count < len {
345+ EmitTo :: First ( emit_count)
346+ } else {
347+ EmitTo :: All
348+ } ;
349+ let batch = self . emit ( emit_to) ?;
350+ self . groups_emitted += emit_count;
351+ Ok ( batch)
305352 }
306353
307354 fn group_aggregate_batch ( & mut self , batch : RecordBatch ) -> DFResult < ( ) > {
0 commit comments