@@ -67,7 +67,7 @@ const INFLIGHT_OPTIMIST: usize = 2;
6767
6868/// Maximum pipeline depth when data is actively flowing. Ramps up on
6969/// data-bearing replies, drops back to IDLE after consecutive empties.
70- const INFLIGHT_ACTIVE : usize = 10 ;
70+ const INFLIGHT_ACTIVE : usize = 4 ;
7171
7272/// How many consecutive empty replies before dropping from active to idle depth.
7373const INFLIGHT_COOLDOWN : u32 = 3 ;
@@ -1426,7 +1426,7 @@ async fn tunnel_loop(
14261426 mut pending_client_data : Option < Bytes > ,
14271427) -> std:: io:: Result < ( ) > {
14281428 let ( mut reader, mut writer) = sock. split ( ) ;
1429- const READ_CHUNK : usize = 65536 ;
1429+ const READ_CHUNK : usize = 512 * 1024 ;
14301430 let mut buf = BytesMut :: with_capacity ( READ_CHUNK ) ;
14311431 let mut consecutive_empty = 0u32 ;
14321432 let mut buffered_upload: Option < Bytes > = None ;
@@ -1479,7 +1479,26 @@ async fn tunnel_loop(
14791479 Ok ( Ok ( 0 ) ) => break ,
14801480 Ok ( Ok ( n) ) => {
14811481 consecutive_empty = 0 ;
1482- Some ( extract_bytes ( & mut buf, n) )
1482+ let mut data = extract_bytes ( & mut buf, n) ;
1483+ // Loop-read: accumulate more upload data (up to 1s)
1484+ let deadline = Instant :: now ( ) + Duration :: from_secs ( 1 ) ;
1485+ loop {
1486+ if Instant :: now ( ) >= deadline { break ; }
1487+ buf. reserve ( READ_CHUNK ) ;
1488+ match tokio:: time:: timeout ( Duration :: from_millis ( 20 ) , reader. read_buf ( & mut buf) ) . await {
1489+ Ok ( Ok ( 0 ) ) => { upload_closed = true ; break ; }
1490+ Ok ( Ok ( n) ) => {
1491+ let extra = extract_bytes ( & mut buf, n) ;
1492+ let mut combined = bytes:: BytesMut :: with_capacity ( data. len ( ) + extra. len ( ) ) ;
1493+ combined. extend_from_slice ( & data) ;
1494+ combined. extend_from_slice ( & extra) ;
1495+ data = combined. freeze ( ) ;
1496+ }
1497+ Ok ( Err ( _) ) => { upload_closed = true ; break ; }
1498+ Err ( _) => break , // no more data
1499+ }
1500+ }
1501+ Some ( data)
14831502 }
14841503 Ok ( Err ( _) ) => break ,
14851504 Err ( _) => None ,
@@ -1804,23 +1823,27 @@ async fn tunnel_loop(
18041823 // outside the depth cap so uploads aren't blocked
18051824 // behind polls waiting at the tunnel-node.
18061825 if buffered_upload. is_some( ) && inflight. len( ) >= max_inflight && inflight. len( ) < max_inflight + 4 {
1807- // Brief coalesce: wait 20ms for more client data
1808- // to arrive so we batch multiple small writes into
1809- // one op instead of one per read.
1810- buf. reserve( READ_CHUNK ) ;
1811- match tokio:: time:: timeout( Duration :: from_millis( 20 ) , reader. read_buf( & mut buf) ) . await {
1812- Ok ( Ok ( 0 ) ) => { break ; }
1813- Ok ( Ok ( n) ) => {
1814- let extra = extract_bytes( & mut buf, n) ;
1815- let merged = buffered_upload. take( ) . unwrap( ) ;
1816- let mut combined = bytes:: BytesMut :: with_capacity( merged. len( ) + extra. len( ) ) ;
1817- combined. extend_from_slice( & merged) ;
1818- combined. extend_from_slice( & extra) ;
1819- buffered_upload = Some ( combined. freeze( ) ) ;
1826+ // Loop-coalesce: keep reading client data up to
1827+ // 200ms so we pack a fatter upload per op.
1828+ let coalesce_deadline = Instant :: now( ) + Duration :: from_millis( 200 ) ;
1829+ loop {
1830+ if Instant :: now( ) >= coalesce_deadline { break ; }
1831+ buf. reserve( READ_CHUNK ) ;
1832+ match tokio:: time:: timeout( Duration :: from_millis( 20 ) , reader. read_buf( & mut buf) ) . await {
1833+ Ok ( Ok ( 0 ) ) => { upload_closed = true ; break ; }
1834+ Ok ( Ok ( n) ) => {
1835+ let extra = extract_bytes( & mut buf, n) ;
1836+ let merged = buffered_upload. take( ) . unwrap( ) ;
1837+ let mut combined = bytes:: BytesMut :: with_capacity( merged. len( ) + extra. len( ) ) ;
1838+ combined. extend_from_slice( & merged) ;
1839+ combined. extend_from_slice( & extra) ;
1840+ buffered_upload = Some ( combined. freeze( ) ) ;
1841+ }
1842+ Ok ( Err ( _) ) => { upload_closed = true ; break ; }
1843+ Err ( _) => break , // no more data
18201844 }
1821- Ok ( Err ( _) ) => { break ; }
1822- Err ( _) => { } // timeout — no more data, send what we have
18231845 }
1846+ if upload_closed { break ; }
18241847 let data = buffered_upload. take( ) . unwrap( ) ;
18251848 let seq = next_send_seq;
18261849 next_send_seq += 1 ;
0 commit comments