@@ -1426,6 +1426,8 @@ struct InflightMeta {
14261426struct InflightEntry {
14271427 meta : InflightMeta ,
14281428 reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ,
1429+ /// Held until reply is processed — releases upload flow control permit.
1430+ _upload_permit : Option < tokio:: sync:: OwnedSemaphorePermit > ,
14291431}
14301432
14311433/// Upload task: reads from the client socket, accumulates data (50ms
@@ -1444,6 +1446,7 @@ async fn upload_task(
14441446 eof_seen : Arc < AtomicBool > ,
14451447 inflight_tx : mpsc:: UnboundedSender < InflightEntry > ,
14461448 initial_data : Option < Bytes > ,
1449+ upload_sem : Arc < Semaphore > ,
14471450) {
14481451 const READ_CHUNK : usize = 512 * 1024 ;
14491452 let mut buf = vec ! [ 0u8 ; READ_CHUNK ] ;
@@ -1473,6 +1476,7 @@ async fn upload_task(
14731476 let entry = InflightEntry {
14741477 meta : InflightMeta { seq, was_empty_poll : false , send_at } ,
14751478 reply_rx,
1479+ _upload_permit : None , // initial data — no flow control
14761480 } ;
14771481 if inflight_tx. send ( entry) . is_err ( ) {
14781482 return ; // download task gone
@@ -1532,6 +1536,9 @@ async fn upload_task(
15321536
15331537 if data. is_empty ( ) { continue ; }
15341538
1539+ // Flow control: wait for a permit before sending.
1540+ let permit = upload_sem. clone ( ) . acquire_owned ( ) . await . unwrap ( ) ;
1541+
15351542 let seq = next_send_seq. fetch_add ( 1 , Ordering :: Relaxed ) ;
15361543 let wseq = next_data_write_seq;
15371544 next_data_write_seq += 1 ;
@@ -1551,6 +1558,7 @@ async fn upload_task(
15511558 let entry = InflightEntry {
15521559 meta : InflightMeta { seq, was_empty_poll : false , send_at } ,
15531560 reply_rx,
1561+ _upload_permit : Some ( permit) ,
15541562 } ;
15551563 if inflight_tx. send ( entry) . is_err ( ) {
15561564 break ; // download task gone
@@ -1581,6 +1589,7 @@ async fn tunnel_loop(
15811589 // the mux, and forwards InflightEntry to the download task.
15821590 // Pending client data is seeded as the first send inside the upload
15831591 // task via an initial_data parameter.
1592+ let upload_sem = Arc :: new ( Semaphore :: new ( 3 ) ) ; // max 3 unacked upload ops
15841593 let _upload_handle = tokio:: spawn ( upload_task (
15851594 reader,
15861595 sid. to_string ( ) ,
@@ -1590,6 +1599,7 @@ async fn tunnel_loop(
15901599 eof_seen. clone ( ) ,
15911600 inflight_tx, // move the only sender to the upload task
15921601 pending_client_data. clone ( ) ,
1602+ upload_sem,
15931603 ) ) ;
15941604 // The download task does NOT hold an inflight_tx clone — when the
15951605 // upload task exits and drops the sender, inflight_rx.recv() returns
@@ -1616,14 +1626,20 @@ async fn tunnel_loop(
16161626 let mut inflight: FuturesUnordered < ReplyFut > = FuturesUnordered :: new ( ) ;
16171627
16181628 // Helper: wrap a reply_rx into a ReplyFut with timeout.
1619- fn wrap_reply ( meta : InflightMeta , reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ) -> std:: pin:: Pin < Box < dyn std:: future:: Future < Output = ( InflightMeta , ReplyOutcome ) > + Send > > {
1629+ fn wrap_reply (
1630+ meta : InflightMeta ,
1631+ reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ,
1632+ permit : Option < tokio:: sync:: OwnedSemaphorePermit > ,
1633+ ) -> std:: pin:: Pin < Box < dyn std:: future:: Future < Output = ( InflightMeta , ReplyOutcome ) > + Send > > {
16201634 Box :: pin ( async move {
1621- match tokio:: time:: timeout ( REPLY_TIMEOUT , reply_rx) . await {
1635+ let result = match tokio:: time:: timeout ( REPLY_TIMEOUT , reply_rx) . await {
16221636 Ok ( Ok ( Ok ( ( r, sid) ) ) ) => ( meta, ReplyOutcome :: Ok ( r, sid) ) ,
16231637 Ok ( Ok ( Err ( e) ) ) => ( meta, ReplyOutcome :: BatchErr ( e) ) ,
16241638 Ok ( Err ( _) ) => ( meta, ReplyOutcome :: Dropped ) ,
16251639 Err ( _) => ( meta, ReplyOutcome :: Timeout ) ,
1626- }
1640+ } ;
1641+ drop ( permit) ; // release upload flow control permit AFTER reply
1642+ result
16271643 } )
16281644 }
16291645
@@ -1672,7 +1688,7 @@ async fn tunnel_loop(
16721688 & sid[ ..sid. len( ) . min( 8 ) ] ,
16731689 entry. meta. seq,
16741690 ) ;
1675- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1691+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
16761692 }
16771693 None => {
16781694 // Upload task exited before sending — nothing to do.
@@ -1699,13 +1715,14 @@ async fn tunnel_loop(
16991715 meta. seq,
17001716 inflight. len( ) + 1 ,
17011717 ) ;
1702- inflight. push ( wrap_reply ( meta, reply_rx) ) ;
1718+ inflight. push ( wrap_reply ( meta, reply_rx, None ) ) ;
17031719 }
17041720 }
17051721
17061722 // Timer for staggered refill polls — fires in the select, never blocks.
17071723 let mut refill_at: Option < std:: pin:: Pin < Box < tokio:: time:: Sleep > > > = None ;
1708- let mut refill_steps: u32 = 0 ; // counts 100ms steps; poll after 10 (1s)
1724+ let mut refill_steps: u32 = 0 ;
1725+ let mut data_ops_in_flight: u32 = 0 ;
17091726
17101727 // Schedule initial refill if pre-fill didn't fill all slots.
17111728 if inflight. len ( ) < max_inflight {
@@ -1767,7 +1784,7 @@ async fn tunnel_loop(
17671784 & sid[ ..sid. len( ) . min( 8 ) ] ,
17681785 entry. meta. seq,
17691786 ) ;
1770- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1787+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
17711788 continue ;
17721789 }
17731790 None => {
@@ -1782,16 +1799,17 @@ async fn tunnel_loop(
17821799 tracing:: debug!(
17831800 "sess {}: keepalive poll seq={}" , & sid[ ..sid. len( ) . min( 8 ) ] , meta. seq
17841801 ) ;
1785- inflight. push ( wrap_reply ( meta, reply_rx) ) ;
1802+ inflight. push ( wrap_reply ( meta, reply_rx, None ) ) ;
17861803 }
17871804
17881805 // Drain any InflightEntry items from the upload task before
17891806 // entering the select — non-blocking.
17901807 while let Ok ( entry) = inflight_rx. try_recv ( ) {
17911808 if !entry. meta . was_empty_poll {
17921809 consecutive_empty = 0 ;
1810+ data_ops_in_flight += 1 ;
17931811 }
1794- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1812+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
17951813 }
17961814
17971815 tokio:: select! {
@@ -1801,13 +1819,14 @@ async fn tunnel_loop(
18011819 Some ( entry) = inflight_rx. recv( ) => {
18021820 if !entry. meta. was_empty_poll {
18031821 consecutive_empty = 0 ;
1822+ data_ops_in_flight += 1 ;
18041823 }
1805- inflight. push( wrap_reply( entry. meta, entry. reply_rx) ) ;
1824+ inflight. push( wrap_reply( entry. meta, entry. reply_rx, entry . _upload_permit ) ) ;
18061825 }
18071826
18081827 // Refill timer: 100ms steps, send empty poll after 10 steps
18091828 // (1s) for batch separation.
1810- _ = async { refill_at. as_mut( ) . unwrap( ) . await } , if refill_at. is_some( ) => {
1829+ _ = async { refill_at. as_mut( ) . unwrap( ) . await } , if refill_at. is_some( ) && data_ops_in_flight == 0 => {
18111830 refill_at = None ;
18121831 if !eof_seen. load( Ordering :: Relaxed )
18131832 && inflight. len( ) < max_inflight
@@ -1817,7 +1836,7 @@ async fn tunnel_loop(
18171836 if refill_steps >= 10 {
18181837 let ( meta, reply_rx, send_fut) = send_empty_poll( sid, & next_send_seq, mux) ;
18191838 send_fut. await ;
1820- inflight. push( wrap_reply( meta, reply_rx) ) ;
1839+ inflight. push( wrap_reply( meta, reply_rx, None ) ) ;
18211840 refill_steps = 0 ;
18221841
18231842 if inflight. len( ) < max_inflight && max_inflight > INFLIGHT_IDLE {
@@ -1833,6 +1852,9 @@ async fn tunnel_loop(
18331852 Some ( ( meta, outcome) ) = inflight. next( ) => {
18341853 match outcome {
18351854 ReplyOutcome :: Ok ( resp, script_id) => {
1855+ if !meta. was_empty_poll {
1856+ data_ops_in_flight = data_ops_in_flight. saturating_sub( 1 ) ;
1857+ }
18361858 let has_data = resp. d. as_ref( ) . map( |d| !d. is_empty( ) ) . unwrap_or( false ) ;
18371859 tracing:: debug!(
18381860 "sess {}: recv seq={}, rtt={:?}, data={}, inflight={}" ,
@@ -1956,7 +1978,7 @@ async fn tunnel_loop(
19561978 // Schedule refill if pipeline needs more polls.
19571979 if !eof_seen. load( Ordering :: Relaxed )
19581980 && inflight. len( ) < max_inflight
1959- // consecutive_empty gate removed — keep polling
1981+ && data_ops_in_flight == 0
19601982 && refill_at. is_none( )
19611983 {
19621984 refill_at = Some ( Box :: pin( tokio:: time:: sleep(
0 commit comments