@@ -34,13 +34,14 @@ use tracing::{Instrument as _, Span};
3434// but we need to wait until 2025 before making this change.
3535//
3636// iperf result for 4 * 1024:
37- // > 0.0000-19.4523 sec 26.6 GBytes 11.7 Gbits/sec
37+ // > 0.0000-10.0490 sec 23.0 GBytes 19.7 Gbits/sec
3838//
3939// iperf result for 16 * 1024:
40- // > 0.0000-13.8540 sec 33.6 GBytes 20.8 Gbits/sec
40+ // > 0.0000-10.0393 sec 30.6 GBytes 26.2 Gbits/sec
4141//
42- // This is an improvement of 77.7%.
43- const MAXIMUM_PACKET_SIZE_IN_BYTES : u16 = 4 * 1024 ;
42+ // This is an improvement of ~32.9%.
43+ const MAXIMUM_PACKET_SIZE_IN_BYTES : u16 = 4 * 1024 ; // 4 kiB
44+ const WINDOW_ADJUSTMENT_THRESHOLD : u32 = 4 * 1024 ; // 4 kiB
4445
4546pub type ApiResponseSender = oneshot:: Sender < JmuxApiResponse > ;
4647pub type ApiResponseReceiver = oneshot:: Receiver < JmuxApiResponse > ;
@@ -173,6 +174,7 @@ struct JmuxChannelCtx {
173174 initial_window_size : u32 ,
174175 window_size_updated : Arc < Notify > ,
175176 window_size : Arc < AtomicUsize > ,
177+ remote_window_size : u32 ,
176178
177179 maximum_packet_size : u16 ,
178180
@@ -280,9 +282,6 @@ impl<T: AsyncWrite + Unpin + Send + 'static> JmuxSenderTask<T> {
280282 }
281283 }
282284
283- // TODO: send a signal to the main scheduler when we are done processing channel data messages
284- // and adjust windows for all the channels only then.
285-
286285 info ! ( "Closing JMUX sender task..." ) ;
287286
288287 jmux_writer. flush ( ) . await ?;
@@ -330,6 +329,8 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
330329 const MAX_CONSECUTIVE_PIPE_FAILURES : u8 = 5 ;
331330 let mut nb_consecutive_pipe_failures = 0 ;
332331
332+ let mut needs_window_adjustment = false ;
333+
333334 loop {
334335 // NOTE: Current task is the "jmux scheduler" or "jmux orchestrator".
335336 // It handles the JMUX context and communicates with other tasks.
@@ -557,6 +558,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
557558 initial_window_size: msg. initial_window_size,
558559 window_size_updated: Arc :: clone( & window_size_updated) ,
559560 window_size: Arc :: clone( & window_size) ,
561+ remote_window_size: msg. initial_window_size,
560562
561563 maximum_packet_size: msg. maximum_packet_size,
562564
@@ -576,12 +578,9 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
576578 let local_id = LocalChannelId :: from( msg. recipient_channel_id) ;
577579 let peer_id = DistantChannelId :: from( msg. sender_channel_id) ;
578580
579- let ( destination_url, api_response_tx) = match pending_channels. remove( & local_id) {
580- Some ( pending) => pending,
581- None => {
582- warn!( "Couldn’t find pending channel for {}" , local_id) ;
583- continue ;
584- } ,
581+ let Some ( ( destination_url, api_response_tx) ) = pending_channels. remove( & local_id) else {
582+ warn!( channel. id = %local_id, "Couldn’t find pending channel" ) ;
583+ continue ;
585584 } ;
586585
587586 let channel_span = info_span!( parent: parent_span. clone( ) , "channel" , %local_id, %peer_id, url = %destination_url) . entered( ) ;
@@ -603,48 +602,51 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
603602 initial_window_size: msg. initial_window_size,
604603 window_size_updated: Arc :: new( Notify :: new( ) ) ,
605604 window_size: Arc :: new( AtomicUsize :: new( usize :: try_from( msg. initial_window_size) . expect( "u32-to-usize" ) ) ) ,
605+ remote_window_size: msg. initial_window_size,
606606
607607 maximum_packet_size: msg. maximum_packet_size,
608608
609609 span: channel_span. exit( ) ,
610610 } ) ?;
611611 }
612612 Message :: WindowAdjust ( msg) => {
613- if let Some ( ctx) = jmux_ctx. get_channel_mut( LocalChannelId :: from( msg. recipient_channel_id) ) {
614- ctx. window_size. fetch_add( usize :: try_from( msg. window_adjustment) . expect( "u32-to-usize" ) , Ordering :: SeqCst ) ;
615- ctx. window_size_updated. notify_one( ) ;
616- }
613+ let id = LocalChannelId :: from( msg. recipient_channel_id) ;
614+ let Some ( channel) = jmux_ctx. get_channel_mut( id) else {
615+ warn!( channel. id = %id, "Couldn’t find channel" ) ;
616+ continue ;
617+ } ;
618+
619+ channel. window_size. fetch_add( usize :: try_from( msg. window_adjustment) . expect( "u32-to-usize" ) , Ordering :: SeqCst ) ;
620+ channel. window_size_updated. notify_one( ) ;
617621 }
618622 Message :: Data ( msg) => {
619623 let id = LocalChannelId :: from( msg. recipient_channel_id) ;
620- let data_length = u16 :: try_from( msg. transfer_data. len( ) ) . expect( "header.size (u16) <= u16::MAX" ) ;
621- let channel = match jmux_ctx. get_channel( id) {
622- Some ( channel) => channel,
623- None => {
624- warn!( channel. id = %id, "Couldn’t find channel" ) ;
625- continue ;
626- } ,
624+ let Some ( channel) = jmux_ctx. get_channel_mut( id) else {
625+ warn!( channel. id = %id, "Couldn’t find channel" ) ;
626+ continue ;
627627 } ;
628628
629- let data_tx = match data_senders. get_mut( & id) {
630- Some ( sender) => sender,
631- None => {
632- warn!( channel. id = %id, "Received data but associated data sender is missing" ) ;
633- continue ;
634- }
635- } ;
629+ let payload_size = u32 :: try_from( msg. transfer_data. len( ) ) . expect( "packet length is found by decoding a u16 in decoder" ) ;
630+ channel. remote_window_size = channel. remote_window_size. saturating_sub( payload_size) ;
636631
637- if channel. maximum_packet_size < data_length {
638- warn!( channel. id = %id, "Packet's size is exceeding the maximum size for this channel and was dropped" ) ;
632+ let packet_size = Header :: SIZE + msg. size( ) ;
633+ if usize :: from( channel. maximum_packet_size) < packet_size {
634+ channel. span. in_scope( || {
635+ warn!( packet_size, "Packet's size is exceeding the maximum size for this channel and was dropped" ) ;
636+ } ) ;
639637 continue ;
640638 }
641639
640+ let Some ( data_tx) = data_senders. get_mut( & id) else {
641+ channel. span. in_scope( || {
642+ warn!( "Received data but associated data sender is missing" ) ;
643+ } ) ;
644+ continue ;
645+ } ;
646+
642647 let _ = data_tx. send( msg. transfer_data) ;
643648
644- // Simplest flow control logic for now: just send back a WINDOW ADJUST message to
645- // increase back peer’s window size.
646- msg_to_send_tx. send( Message :: window_adjust( channel. distant_id, u32 :: from( data_length) ) )
647- . context( "couldn’t send WINDOW ADJUST message" ) ?;
649+ needs_window_adjustment = true ;
648650 }
649651 Message :: Eof ( msg) => {
650652 // Per the spec:
@@ -654,12 +656,9 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
654656 // > This message does not consume window space and can be sent even if no window space is available.
655657
656658 let id = LocalChannelId :: from( msg. recipient_channel_id) ;
657- let channel = match jmux_ctx. get_channel_mut( id) {
658- Some ( channel) => channel,
659- None => {
660- warn!( "Couldn’t find channel with id {}" , id) ;
661- continue ;
662- } ,
659+ let Some ( channel) = jmux_ctx. get_channel_mut( id) else {
660+ warn!( channel. id = %id, "Couldn’t find channel" ) ;
661+ continue ;
663662 } ;
664663
665664 channel. distant_state = JmuxChannelState :: Eof ;
@@ -684,12 +683,9 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
684683 Message :: OpenFailure ( msg) => {
685684 let id = LocalChannelId :: from( msg. recipient_channel_id) ;
686685
687- let ( destination_url, api_response_tx) = match pending_channels. remove( & id) {
688- Some ( pending) => pending,
689- None => {
690- warn!( "Couldn’t find pending channel {}" , id) ;
691- continue ;
692- } ,
686+ let Some ( ( destination_url, api_response_tx) ) = pending_channels. remove( & id) else {
687+ warn!( channel. id = %id, "Couldn’t find pending channel" ) ;
688+ continue ;
693689 } ;
694690
695691 warn!( local_id = %id, %destination_url, %msg. reason_code, "Channel opening failed: {}" , msg. description) ;
@@ -698,12 +694,9 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
698694 }
699695 Message :: Close ( msg) => {
700696 let local_id = LocalChannelId :: from( msg. recipient_channel_id) ;
701- let channel = match jmux_ctx. get_channel_mut( local_id) {
702- Some ( channel) => channel,
703- None => {
704- warn!( "Couldn’t find channel with id {}" , local_id) ;
705- continue ;
706- } ,
697+ let Some ( channel) = jmux_ctx. get_channel_mut( local_id) else {
698+ warn!( channel. id = %local_id, "Couldn’t find channel" ) ;
699+ continue ;
707700 } ;
708701 let distant_id = channel. distant_id;
709702 let channel_span = channel. span. clone( ) ;
@@ -729,6 +722,25 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
729722 }
730723 }
731724 }
725+ _ = core:: future:: ready( ( ) ) , if needs_window_adjustment => {
726+ for channel in jmux_ctx. channels. values_mut( ) {
727+ let window_adjustment = channel. initial_window_size - channel. remote_window_size;
728+
729+ if window_adjustment > WINDOW_ADJUSTMENT_THRESHOLD {
730+ channel. span. in_scope( || {
731+ trace!( %channel. distant_id, "Send WindowAdjust message" ) ;
732+ } ) ;
733+
734+ msg_to_send_tx
735+ . send( Message :: window_adjust( channel. distant_id, window_adjustment) )
736+ . context( "couldn’t send WINDOW ADJUST message" ) ?;
737+
738+ channel. remote_window_size = channel. initial_window_size;
739+ }
740+ }
741+
742+ needs_window_adjustment = false ;
743+ }
732744 }
733745 }
734746
@@ -803,24 +815,25 @@ impl DataReaderTask {
803815
804816 loop {
805817 let window_size_now = window_size. load ( Ordering :: SeqCst ) ;
818+
806819 if window_size_now < chunk. len ( ) {
807820 trace ! (
808821 window_size_now,
809- full_packet_size = bytes . len( ) ,
810- "Window size insufficient to send full packet . Truncate and wait."
822+ chunk_length = chunk . len( ) ,
823+ "Window size insufficient to send full chunk . Truncate and wait."
811824 ) ;
812825
813826 if window_size_now > 0 {
814- let bytes_to_send_now = chunk. split_to ( window_size_now) ;
815- window_size. fetch_sub ( bytes_to_send_now . len ( ) , Ordering :: SeqCst ) ;
827+ let to_send_now = chunk. split_to ( window_size_now) ;
828+ window_size. fetch_sub ( to_send_now . len ( ) , Ordering :: SeqCst ) ;
816829 msg_to_send_tx
817- . send ( Message :: data ( distant_id, bytes_to_send_now . freeze ( ) ) )
830+ . send ( Message :: data ( distant_id, to_send_now . freeze ( ) ) )
818831 . context ( "couldn’t send DATA message" ) ?;
819832 }
820833
821834 window_size_updated. notified ( ) . await ;
822835 } else {
823- window_size. fetch_sub ( bytes . len ( ) , Ordering :: SeqCst ) ;
836+ window_size. fetch_sub ( chunk . len ( ) , Ordering :: SeqCst ) ;
824837 msg_to_send_tx
825838 . send ( Message :: data ( distant_id, chunk. freeze ( ) ) )
826839 . context ( "couldn’t send DATA message" ) ?;
0 commit comments