@@ -3,6 +3,7 @@ use std::net::SocketAddr;
33use std:: sync:: Arc ;
44use std:: time:: Duration ;
55
6+ use bytes:: Bytes ;
67use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
78use tokio:: net:: { TcpListener , TcpStream , UdpSocket } ;
89use tokio:: sync:: { mpsc, Mutex } ;
@@ -965,7 +966,7 @@ struct SocksUdpTarget {
965966/// to abort mid-await.
966967struct UdpRelaySession {
967968 sid : String ,
968- uplink : mpsc:: Sender < Vec < u8 > > ,
969+ uplink : mpsc:: Sender < Bytes > ,
969970}
970971
971972/// All per-ASSOCIATE UDP relay state behind a single mutex so insertion
@@ -991,7 +992,7 @@ impl UdpRelayState {
991992 }
992993 }
993994
994- fn get_uplink ( & self , target : & SocksUdpTarget ) -> Option < mpsc:: Sender < Vec < u8 > > > {
995+ fn get_uplink ( & self , target : & SocksUdpTarget ) -> Option < mpsc:: Sender < Bytes > > {
995996 self . sessions . get ( target) . map ( |s| s. uplink . clone ( ) )
996997 }
997998
@@ -1118,7 +1119,15 @@ async fn handle_socks5_udp_associate(
11181119 client_peer_ip
11191120 ) ;
11201121
1121- let mut buf = vec ! [ 0u8 ; SOCKS5_UDP_RECV_BUF_BYTES ] ;
1122+ // Fixed reusable recv buffer. We deliberately don't go the
1123+ // `BytesMut::split().freeze()` route here even though `tunnel_loop`
1124+ // does: in TCP the read region IS the payload, but UDP always
1125+ // slices the SOCKS5 header off, so we'd be copying out anyway —
1126+ // and a frozen `Bytes` from the recv buf would refcount-pin the
1127+ // full ~65 KB allocation behind a tiny DNS reply, ballooning
1128+ // memory under bursts. Right-sized `Bytes::copy_from_slice` on
1129+ // accepted payloads keeps retention proportional to actual data.
1130+ let mut recv_buf = vec ! [ 0u8 ; SOCKS5_UDP_RECV_BUF_BYTES ] ;
11221131 let mut control_buf = [ 0u8 ; 1 ] ;
11231132 let mut client_addr: Option < SocketAddr > = None ;
11241133 let state: Arc < Mutex < UdpRelayState > > = Arc :: new ( Mutex :: new ( UdpRelayState :: new ( ) ) ) ;
@@ -1134,14 +1143,15 @@ async fn handle_socks5_udp_associate(
11341143
11351144 loop {
11361145 tokio:: select! {
1137- recv = udp. recv_from( & mut buf ) => {
1146+ recv = udp. recv_from( & mut recv_buf ) => {
11381147 let ( n, peer) = match recv {
11391148 Ok ( v) => v,
11401149 Err ( e) => {
11411150 tracing:: debug!( "udp associate recv failed: {}" , e) ;
11421151 break ;
11431152 }
11441153 } ;
1154+
11451155 // Source-IP check: anything not from the SOCKS5 client's
11461156 // host is dropped silently.
11471157 if peer. ip( ) != client_peer_ip {
@@ -1162,9 +1172,10 @@ async fn handle_socks5_udp_associate(
11621172 // can race one bad packet to DoS the legitimate client
11631173 // (whose real datagram, sent from a different ephemeral
11641174 // port, would then be silently rejected).
1165- let Some ( ( target, payload ) ) = parse_socks5_udp_packet ( & buf [ ..n] ) else {
1175+ let Some ( ( target, payload_off ) ) = parse_socks5_udp_packet_offsets ( & recv_buf [ ..n] ) else {
11661176 continue ;
11671177 } ;
1178+ let payload_slice = & recv_buf[ payload_off..n] ;
11681179
11691180 // Issue #213: client-side QUIC block. UDP/443 is
11701181 // HTTP/3 — drop the datagram silently so the client
@@ -1206,19 +1217,26 @@ async fn handle_socks5_udp_associate(
12061217 // the mux. Each datagram costs ~payload * 1.33 in the
12071218 // batched JSON envelope plus tunnel-node CPU; uncapped,
12081219 // a runaway client can exhaust Apps Script quota.
1209- if payload . len( ) > MAX_UDP_PAYLOAD_BYTES {
1220+ if payload_slice . len( ) > MAX_UDP_PAYLOAD_BYTES {
12101221 oversized_dropped += 1 ;
12111222 if oversized_dropped == 1 || oversized_dropped. is_multiple_of( 100 ) {
12121223 tracing:: debug!(
12131224 "udp datagram dropped: {} B > {} B (count={})" ,
1214- payload . len( ) ,
1225+ payload_slice . len( ) ,
12151226 MAX_UDP_PAYLOAD_BYTES ,
12161227 oversized_dropped,
12171228 ) ;
12181229 }
12191230 continue ;
12201231 }
1221- let payload = payload. to_vec( ) ;
1232+
1233+ // Right-sized copy: the queued/in-flight payload owns its
1234+ // own allocation, so the recv buffer can be reused on the
1235+ // next iteration without keeping every queued datagram
1236+ // alive. Sized to the actual payload (≤ MAX_UDP_PAYLOAD_BYTES
1237+ // = 9 KB after the guard above), not the full ~65 KB recv
1238+ // buffer.
1239+ let payload = Bytes :: copy_from_slice( payload_slice) ;
12221240
12231241 // Fast path: existing session — push payload onto its
12241242 // bounded uplink queue, drop on overflow (UDP semantics).
@@ -1292,7 +1310,7 @@ async fn handle_socks5_udp_associate(
12921310 continue ;
12931311 }
12941312
1295- let ( uplink_tx, uplink_rx) = mpsc:: channel:: <Vec < u8 > >( UDP_UPLINK_QUEUE ) ;
1313+ let ( uplink_tx, uplink_rx) = mpsc:: channel:: <Bytes >( UDP_UPLINK_QUEUE ) ;
12961314 let task_mux = mux. clone( ) ;
12971315 let task_udp = udp. clone( ) ;
12981316 let task_target = target. clone( ) ;
@@ -1365,7 +1383,7 @@ async fn udp_session_task(
13651383 sid : String ,
13661384 target : SocksUdpTarget ,
13671385 client_addr : SocketAddr ,
1368- mut uplink_rx : mpsc:: Receiver < Vec < u8 > > ,
1386+ mut uplink_rx : mpsc:: Receiver < Bytes > ,
13691387) {
13701388 let mut backoff = UDP_INITIAL_POLL_DELAY ;
13711389 loop {
@@ -1473,7 +1491,20 @@ async fn write_socks5_reply(
14731491 sock. flush ( ) . await
14741492}
14751493
1476- fn parse_socks5_udp_packet ( buf : & [ u8 ] ) -> Option < ( SocksUdpTarget , & [ u8 ] ) > {
1494+ /// Parse the SOCKS5 UDP frame header and return the target plus the byte
1495+ /// offset at which the payload starts. Splitting "structure parsing"
1496+ /// from "give me a payload slice" lets the recv hot path stay on a
1497+ /// fixed reusable `Vec<u8>` buffer and only allocate a right-sized
1498+ /// `Bytes::copy_from_slice(&recv_buf[off..n])` for accepted payloads
1499+ /// (after the size guard). DO NOT change this back to a zero-copy
1500+ /// `Bytes::slice` path: that was tried and reverted because slicing
1501+ /// the recv buffer with `bytes` 1.x refcounts the whole ~65 KB
1502+ /// allocation, so a queued tiny DNS reply pinned the full datagram-
1503+ /// sized buffer until it drained — burst retention regressed by
1504+ /// orders of magnitude on UDP-heavy workloads. The thin
1505+ /// `parse_socks5_udp_packet` wrapper below keeps existing `&[u8]`
1506+ /// callers (tests) working.
1507+ fn parse_socks5_udp_packet_offsets ( buf : & [ u8 ] ) -> Option < ( SocksUdpTarget , usize ) > {
14771508 if buf. len ( ) < 4 || buf[ 0 ] != 0 || buf[ 1 ] != 0 || buf[ 2 ] != 0 {
14781509 return None ;
14791510 }
@@ -1528,10 +1559,15 @@ fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> {
15281559 atyp,
15291560 addr,
15301561 } ,
1531- & buf [ pos.. ] ,
1562+ pos,
15321563 ) )
15331564}
15341565
1566+ fn parse_socks5_udp_packet ( buf : & [ u8 ] ) -> Option < ( SocksUdpTarget , & [ u8 ] ) > {
1567+ let ( target, off) = parse_socks5_udp_packet_offsets ( buf) ?;
1568+ Some ( ( target, & buf[ off..] ) )
1569+ }
1570+
15351571fn build_socks5_udp_packet ( target : & SocksUdpTarget , payload : & [ u8 ] ) -> Vec < u8 > {
15361572 let mut out = Vec :: with_capacity ( 4 + target. addr . len ( ) + 2 + payload. len ( ) + 1 ) ;
15371573 out. extend_from_slice ( & [ 0 , 0 , 0 , target. atyp ] ) ;
0 commit comments