@@ -10,6 +10,7 @@ use std::task::Poll;
1010
1111use futures:: Stream ;
1212use pin_project_lite:: pin_project;
13+ use vortex_buffer:: Alignment ;
1314use vortex_error:: VortexExpect ;
1415use vortex_io:: CoalesceConfig ;
1516use vortex_metrics:: Counter ;
@@ -47,6 +48,7 @@ impl<S> IoRequestStream<S> {
4748 pub ( crate ) fn new (
4849 events : S ,
4950 coalesce_window : Option < CoalesceConfig > ,
51+ coalesced_buffer_alignment : Alignment ,
5052 metrics : VortexMetrics ,
5153 ) -> Self
5254 where
@@ -56,7 +58,7 @@ impl<S> IoRequestStream<S> {
5658 events,
5759 inner_done : false ,
5860 coalesce_window,
59- state : State :: new ( metrics) ,
61+ state : State :: new ( metrics, coalesced_buffer_alignment ) ,
6062 }
6163 }
6264}
@@ -117,6 +119,7 @@ struct State {
117119
118120 // Metrics for tracking I/O request patterns
119121 metrics : StateMetrics ,
122+ coalesced_buffer_alignment : Alignment ,
120123}
121124
122125struct StateMetrics {
@@ -136,12 +139,13 @@ impl StateMetrics {
136139}
137140
138141impl State {
139- fn new ( metrics : VortexMetrics ) -> Self {
142+ fn new ( metrics : VortexMetrics , coalesced_buffer_alignment : Alignment ) -> Self {
140143 Self {
141144 requests : BTreeMap :: new ( ) ,
142145 polled_requests : BTreeMap :: new ( ) ,
143146 requests_by_offset : BTreeSet :: new ( ) ,
144147 metrics : StateMetrics :: new ( metrics) ,
148+ coalesced_buffer_alignment,
145149 }
146150 }
147151
@@ -206,14 +210,23 @@ impl State {
206210 None
207211 }
208212
213+ /// Coalesce nearby requests into a single range while aligning the range start down to the
214+ /// global maximum segment alignment.
215+ ///
216+ /// Example (file offsets):
217+ /// [x, x, x, x, x, x, A, A, A, A, A, x, B]
218+ /// A aligned to 2, B aligned to 4
219+ /// Coalesced range starts at 4, so the buffer is:
220+ /// [x, x, A, A, A, A, A, x, B]
221+ /// A stays 2-aligned, B stays 4-aligned
209222 fn next_coalesced ( & mut self , window : & CoalesceConfig ) -> Option < CoalescedRequest > {
210223 // Find the next valid request in priority order
211224 let first_req = self . next_uncoalesced ( ) ?;
212225
213226 let mut requests = vec ! [ first_req] ;
214227 let mut current_start = requests[ 0 ] . offset ;
215228 let mut current_end = requests[ 0 ] . offset + requests[ 0 ] . length as u64 ;
216- let alignment = requests [ 0 ] . alignment ;
229+ let align = * self . coalesced_buffer_alignment as u64 ;
217230
218231 let mut keys_to_remove = Vec :: new ( ) ;
219232 let mut found_new_requests = true ;
@@ -256,25 +269,25 @@ impl State {
256269 // Calculate what the new range would be if we include this request
257270 let new_start = current_start. min ( req_offset) ;
258271 let new_end = current_end. max ( req_end) ;
259- let new_total_size = new_end - new_start;
260-
261- // Check if the coalesced request would exceed max_size
262- if new_total_size <= window. max_size {
263- current_start = new_start;
264- current_end = new_end;
265-
266- let req = self
267- . polled_requests
268- . remove ( & req_id)
269- . or_else ( || self . requests . remove ( & req_id) )
270- . vortex_expect ( "Missing request in requests_by_offset" ) ;
271-
272- requests. push ( req) ;
273- keys_to_remove. push ( ( req_offset, req_id) ) ;
274- found_new_requests = true ;
272+ let aligned_start = new_start - ( new_start % align) ;
273+ let new_total_size = new_end - aligned_start;
274+
275+ if new_total_size > window. max_size {
276+ // Skip it but keep it available for future coalescing operations.
277+ continue ;
275278 }
276- // If adding this request would exceed max_size, we skip it but don't remove it
277- // so it can be considered for future coalescing operations
279+
280+ current_start = new_start;
281+ current_end = new_end;
282+ let req = self
283+ . polled_requests
284+ . remove ( & req_id)
285+ . or_else ( || self . requests . remove ( & req_id) )
286+ . vortex_expect ( "Missing request in requests_by_offset" ) ;
287+
288+ requests. push ( req) ;
289+ keys_to_remove. push ( ( req_offset, req_id) ) ;
290+ found_new_requests = true ;
278291 }
279292 }
280293 }
@@ -290,17 +303,19 @@ impl State {
290303 // Sort requests by offset for correct slicing in resolve
291304 requests. sort_unstable_by_key ( |r| r. offset ) ;
292305
306+ let aligned_start = current_start - ( current_start % align) ;
307+
293308 tracing:: debug!(
294309 "Coalesced {} requests into range {}..{} (len={})" ,
295310 requests. len( ) ,
296- current_start ,
311+ aligned_start ,
297312 current_end,
298- current_end - current_start ,
313+ current_end - aligned_start ,
299314 ) ;
300315
301316 Some ( CoalescedRequest {
302- range : current_start ..current_end,
303- alignment,
317+ range : aligned_start ..current_end,
318+ alignment : self . coalesced_buffer_alignment ,
304319 requests,
305320 } )
306321 }
@@ -338,10 +353,23 @@ mod tests {
338353 async fn collect_outputs (
339354 events : Vec < ReadEvent > ,
340355 coalesce_window : Option < CoalesceConfig > ,
356+ ) -> Vec < IoRequest > {
357+ collect_outputs_with_alignment ( events, coalesce_window, Alignment :: none ( ) ) . await
358+ }
359+
360+ async fn collect_outputs_with_alignment (
361+ events : Vec < ReadEvent > ,
362+ coalesce_window : Option < CoalesceConfig > ,
363+ coalesced_buffer_alignment : Alignment ,
341364 ) -> Vec < IoRequest > {
342365 let event_stream = stream:: iter ( events) ;
343366 let metrics = VortexMetrics :: default ( ) ;
344- let io_stream = IoRequestStream :: new ( event_stream, coalesce_window, metrics) ;
367+ let io_stream = IoRequestStream :: new (
368+ event_stream,
369+ coalesce_window,
370+ coalesced_buffer_alignment,
371+ metrics,
372+ ) ;
345373 io_stream. collect ( ) . await
346374 }
347375
@@ -442,6 +470,57 @@ mod tests {
442470 }
443471 }
444472
473+ #[ tokio:: test]
474+ async fn test_coalesce_alignment_adjustment ( ) {
475+ let ( tx1, _rx1) = oneshot:: channel ( ) ;
476+ let ( tx2, _rx2) = oneshot:: channel ( ) ;
477+
478+ let req1 = ReadRequest {
479+ id : 1 ,
480+ offset : 6 ,
481+ length : 5 ,
482+ alignment : Alignment :: new ( 2 ) ,
483+ callback : tx1,
484+ } ;
485+ let req2 = ReadRequest {
486+ id : 2 ,
487+ offset : 12 ,
488+ length : 1 ,
489+ alignment : Alignment :: new ( 4 ) ,
490+ callback : tx2,
491+ } ;
492+
493+ let events = vec ! [
494+ ReadEvent :: Request ( req1) ,
495+ ReadEvent :: Request ( req2) ,
496+ ReadEvent :: Polled ( 1 ) ,
497+ ReadEvent :: Polled ( 2 ) ,
498+ ] ;
499+
500+ let outputs = collect_outputs_with_alignment (
501+ events,
502+ Some ( CoalesceConfig {
503+ distance : 1 ,
504+ max_size : 1024 ,
505+ } ) ,
506+ Alignment :: new ( 4 ) ,
507+ )
508+ . await ;
509+
510+ assert_eq ! ( outputs. len( ) , 1 ) ;
511+ match outputs[ 0 ] . inner ( ) {
512+ IoRequestInner :: Coalesced ( coalesced) => {
513+ assert_eq ! ( coalesced. range. start, 4 ) ;
514+ assert_eq ! ( coalesced. alignment, Alignment :: new( 4 ) ) ;
515+ for req in & coalesced. requests {
516+ let rel = req. offset - coalesced. range . start ;
517+ assert_eq ! ( rel % * req. alignment as u64 , 0 ) ;
518+ }
519+ }
520+ _ => panic ! ( "Expected coalesced request" ) ,
521+ }
522+ }
523+
445524 #[ tokio:: test]
446525 async fn test_dropped_requests ( ) {
447526 let ( req1, _rx1) = create_request ( 1 , 0 , 10 ) ;
@@ -609,6 +688,7 @@ mod tests {
609688 distance : 5 ,
610689 max_size : 1024 ,
611690 } ) ,
691+ Alignment :: none ( ) ,
612692 metrics. clone ( ) ,
613693 ) ;
614694
@@ -662,7 +742,8 @@ mod tests {
662742 let event_stream = stream:: iter ( events) ;
663743 let metrics = VortexMetrics :: default ( ) ;
664744 // No coalescing window - should be individual requests
665- let io_stream = IoRequestStream :: new ( event_stream, None , metrics. clone ( ) ) ;
745+ let io_stream =
746+ IoRequestStream :: new ( event_stream, None , Alignment :: none ( ) , metrics. clone ( ) ) ;
666747
667748 let outputs: Vec < IoRequest > = io_stream. collect ( ) . await ;
668749 assert_eq ! ( outputs. len( ) , 2 ) ;
0 commit comments