Skip to content

Commit fc8cd59

Browse files
committed
feat(service): outbound UDP relay through SOCKS5-only chains
Phase 2 of the outbound proxy refactor. The new OutboundProxyDatagram type implements DatagramSocket / DatagramSend / DatagramReceive on top of a freshly-bound local UDP socket plus one SOCKS5 UDP ASSOCIATE per chain hop. Each datagram on the wire carries N nested UdpAssociateHeader frames (one per hop), the inner-most one addressing the ss-server's UDP external endpoint. Per-hop control connections are built by reusing the existing TCP chain builder (connect_chain_for_udp_associate) so all of the shadowsocks DNS resolver / ConnectOpts behaviour still applies. Integration: * OutboundProxyClient::associate_udp + supports_udp(). * OutboundProxyClient::supports_udp() returns true iff every hop is SOCKS5; HTTP/HTTPS hops cannot transport UDP and downgrade the whole chain to 'unsupported'. * local::net::udp::association::create_proxied_socket dispatches based on supports_udp(): when true the per-association socket is ProxySocket<OutboundProxyDatagram> wrapping the chained datagram; otherwise we keep the previous direct ProxySocket<ShadowUdpSocket>. * When the chain is configured but unable to relay UDP, sslocal / ssserver emit a single startup WARN ('UDP traffic will bypass the chain') instead of failing — runtime traffic transparently falls back to a direct connection to the ss-server. OutboundProxyStream gains try_into_tcp() so SOCKS5-only chains can recover the bare TcpStream as keep-alive control connections — Sync-friendly storage required because the HTTP CONNECT variant internally holds a hyper::upgrade::Upgraded which is !Sync.
1 parent 250ae60 commit fc8cd59

6 files changed

Lines changed: 522 additions & 13 deletions

File tree

crates/shadowsocks-service/src/local/net/udp/association.rs

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,109 @@ use rand::{RngExt, rngs::SmallRng};
1717
use tokio::{sync::mpsc, task::JoinHandle, time};
1818

1919
use shadowsocks::{
20+
config::ServerConfig,
2021
lookup_then,
21-
net::{AddrFamily, UdpSocket as ShadowUdpSocket},
22+
net::{AddrFamily, ConnectOpts, TcpStream as ShadowTcpStream, UdpSocket as ShadowUdpSocket},
2223
relay::{
2324
Address,
24-
udprelay::{MAXIMUM_UDP_PAYLOAD_SIZE, ProxySocket, options::UdpSocketControlData},
25+
udprelay::{
26+
MAXIMUM_UDP_PAYLOAD_SIZE, ProxySocket,
27+
options::UdpSocketControlData,
28+
proxy_socket::UdpSocketType,
29+
},
2530
},
2631
};
2732

2833
use crate::{
2934
local::{context::ServiceContext, loadbalancing::PingBalancer},
3035
net::{
31-
MonProxySocket, UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
36+
MonProxySocket, OutboundProxyDatagram, TcpDialer,
37+
UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
3238
packet_window::PacketWindowFilter,
3339
},
3440
};
3541

42+
/// Build the proxied socket appropriate for `svr_cfg`, transparently
43+
/// going through the configured outbound chain when one is present and
44+
/// fully SOCKS5 (otherwise the chain is bypassed for UDP).
45+
async fn create_proxied_socket(
46+
context: &ServiceContext,
47+
svr_cfg: &ServerConfig,
48+
connect_opts: &ConnectOpts,
49+
) -> io::Result<ProxiedSocket> {
50+
let use_chain = context
51+
.outbound_client()
52+
.map(|c| c.supports_udp())
53+
.unwrap_or(false);
54+
55+
if use_chain {
56+
let client = context
57+
.outbound_client()
58+
.expect("outbound_client checked above")
59+
.clone();
60+
let dialer = LocalTcpDialer {
61+
context: Arc::new(context.clone()),
62+
opts: connect_opts.clone(),
63+
};
64+
let target: Address = svr_cfg.udp_external_addr().into();
65+
let datagram = client
66+
.associate_udp(&context.context(), &dialer, connect_opts, target)
67+
.await?;
68+
let proxy_socket =
69+
ProxySocket::from_socket(UdpSocketType::Client, context.context(), svr_cfg, datagram);
70+
let mon = MonProxySocket::from_socket(proxy_socket, context.flow_stat());
71+
Ok(ProxiedSocket::Chained(mon))
72+
} else {
73+
let socket = ProxySocket::connect_with_opts(context.context(), svr_cfg, connect_opts).await?;
74+
let mon = MonProxySocket::from_socket(socket, context.flow_stat());
75+
Ok(ProxiedSocket::Direct(mon))
76+
}
77+
}
78+
79+
/// `TcpDialer` adapter that uses the local service context's connect options.
80+
struct LocalTcpDialer {
81+
context: Arc<ServiceContext>,
82+
opts: ConnectOpts,
83+
}
84+
85+
impl TcpDialer for LocalTcpDialer {
86+
async fn dial(&self, addr: &Address) -> io::Result<ShadowTcpStream> {
87+
ShadowTcpStream::connect_remote_with_opts(self.context.context_ref(), addr, &self.opts).await
88+
}
89+
}
90+
91+
/// Proxied UDP socket, either direct (single hop to ss-server) or routed
92+
/// through a SOCKS5-only outbound proxy chain.
93+
#[allow(clippy::large_enum_variant)]
94+
enum ProxiedSocket {
95+
Direct(MonProxySocket<ShadowUdpSocket>),
96+
Chained(MonProxySocket<OutboundProxyDatagram>),
97+
}
98+
99+
impl ProxiedSocket {
100+
async fn send_with_ctrl(
101+
&self,
102+
addr: &Address,
103+
control: &UdpSocketControlData,
104+
data: &[u8],
105+
) -> io::Result<()> {
106+
match self {
107+
Self::Direct(s) => s.send_with_ctrl(addr, control, data).await,
108+
Self::Chained(s) => s.send_with_ctrl(addr, control, data).await,
109+
}
110+
}
111+
112+
async fn recv_with_ctrl(
113+
&self,
114+
buf: &mut [u8],
115+
) -> io::Result<(usize, Address, Option<UdpSocketControlData>)> {
116+
match self {
117+
Self::Direct(s) => s.recv_with_ctrl(buf).await,
118+
Self::Chained(s) => s.recv_with_ctrl(buf).await,
119+
}
120+
}
121+
}
122+
36123
/// Writer for sending packets back to client
37124
#[trait_variant::make(Send)]
38125
pub trait UdpInboundWrite {
@@ -213,7 +300,7 @@ where
213300
peer_addr: SocketAddr,
214301
bypassed_ipv4_socket: Option<ShadowUdpSocket>,
215302
bypassed_ipv6_socket: Option<ShadowUdpSocket>,
216-
proxied_socket: Option<MonProxySocket<ShadowUdpSocket>>,
303+
proxied_socket: Option<ProxiedSocket>,
217304
keepalive_tx: mpsc::Sender<SocketAddr>,
218305
keepalive_flag: bool,
219306
balancer: PingBalancer,
@@ -410,7 +497,7 @@ where
410497

411498
#[inline]
412499
async fn receive_from_proxied_opt(
413-
socket: &Option<MonProxySocket<ShadowUdpSocket>>,
500+
socket: &Option<ProxiedSocket>,
414501
buf: &mut Vec<u8>,
415502
) -> io::Result<(usize, Address, Option<UdpSocketControlData>)> {
416503
match *socket {
@@ -565,15 +652,17 @@ where
565652
Some(ref mut socket) => socket,
566653
None => {
567654
// Create a new connection to proxy server
568-
569655
let server = self.balancer.best_udp_server();
570656
let svr_cfg = server.server_config();
571657

572-
let socket =
573-
ProxySocket::connect_with_opts(self.context.context(), svr_cfg, server.connect_opts_ref()).await?;
574-
let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
658+
let proxied = create_proxied_socket(
659+
self.context.as_ref(),
660+
svr_cfg,
661+
server.connect_opts_ref(),
662+
)
663+
.await?;
575664

576-
self.proxied_socket.insert(socket)
665+
self.proxied_socket.insert(proxied)
577666
}
578667
};
579668

crates/shadowsocks-service/src/net/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ pub use self::{
55
mon_socket::MonProxySocket,
66
mon_stream::MonProxyStream,
77
outbound::{
8-
HttpProxyAuth, OutboundProxyClient, OutboundProxyHop, OutboundProxyKind, OutboundProxyStream, Socks5Auth,
9-
Socks5Negotiator, TcpDialer,
8+
HttpProxyAuth, OutboundProxyClient, OutboundProxyDatagram, OutboundProxyHop, OutboundProxyKind,
9+
OutboundProxyStream, Socks5Auth, Socks5Negotiator, TcpDialer,
1010
},
1111
};
1212

crates/shadowsocks-service/src/net/outbound/chain.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,39 @@ where
7070
Ok(stream)
7171
}
7272

73+
/// Build a TCP control connection that *terminates at* `hops[hop_index]`
74+
/// (i.e. previous hops are traversed via SOCKS5 TcpConnect, but the last
75+
/// hop is left in its post-handshake "ready for an arbitrary command"
76+
/// state — typically `UDP ASSOCIATE`).
77+
///
78+
/// All hops in `hops[..=hop_index]` must be SOCKS5; this helper is used
79+
/// exclusively by the UDP outbound path which only supports
80+
/// SOCKS5-only chains.
81+
pub(crate) async fn connect_chain_for_udp_associate<D>(
82+
hops: &[OutboundProxyHop],
83+
hop_index: usize,
84+
dialer: &D,
85+
) -> io::Result<OutboundProxyStream>
86+
where
87+
D: TcpDialer + Sync,
88+
{
89+
debug_assert!(hop_index < hops.len());
90+
let prefix = &hops[..hop_index];
91+
let target_hop = &hops[hop_index];
92+
93+
if hop_index == 0 {
94+
// Direct dial.
95+
let tcp = dialer.dial(&target_hop.addr).await?;
96+
return OutboundProxyStream::from_tcp(tcp);
97+
}
98+
99+
// Reuse `connect_chain`: target = hops[hop_index].addr; the chain
100+
// builder will SOCKS5-TcpConnect through prefix and leave the byte
101+
// stream pointed at `hops[hop_index]` ready for the caller to issue
102+
// its own SOCKS5 handshake / UdpAssociate command on top.
103+
connect_chain(prefix, dialer, &target_hop.addr).await
104+
}
105+
73106
async fn negotiate_hop(
74107
mut stream: OutboundProxyStream,
75108
hop: &OutboundProxyHop,

crates/shadowsocks-service/src/net/outbound/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
2424
use std::{io, sync::Arc};
2525

26-
use shadowsocks::relay::socks5::Address;
26+
use shadowsocks::{
27+
context::SharedContext,
28+
net::ConnectOpts,
29+
relay::socks5::Address,
30+
};
2731

2832
use crate::config::{OutboundProxy, OutboundProxyProtocol};
2933

@@ -34,10 +38,12 @@ pub mod http_connect;
3438
pub mod socks5;
3539
pub mod stream;
3640
pub mod tls;
41+
pub mod udp;
3742

3843
pub use auth::{HttpProxyAuth, Socks5Auth};
3944
pub use socks5::Socks5Negotiator;
4045
pub use stream::OutboundProxyStream;
46+
pub use udp::OutboundProxyDatagram;
4147

4248
#[cfg(feature = "local-http")]
4349
pub use http_connect::{HttpConnectClient, HttpConnectTunnel};
@@ -141,6 +147,25 @@ impl OutboundProxyClient {
141147
pub fn supports_udp(&self) -> bool {
142148
!self.hops.is_empty() && self.hops.iter().all(OutboundProxyHop::supports_udp)
143149
}
150+
151+
/// Establish a multi-hop UDP relay through the chain.
152+
///
153+
/// Requires every hop to be SOCKS5 (see [`Self::supports_udp`]).
154+
/// `target` is the inner-most destination address baked into every
155+
/// outgoing datagram's SOCKS5 UDP header (typically the ss-server's
156+
/// UDP external address).
157+
pub async fn associate_udp<D>(
158+
&self,
159+
context: &SharedContext,
160+
dialer: &D,
161+
connect_opts: &ConnectOpts,
162+
target: Address,
163+
) -> io::Result<OutboundProxyDatagram>
164+
where
165+
D: TcpDialer + Sync,
166+
{
167+
OutboundProxyDatagram::associate(self, context, dialer, connect_opts, target).await
168+
}
144169
}
145170

146171
fn hop_from_config(proxy: &OutboundProxy) -> OutboundProxyHop {

crates/shadowsocks-service/src/net/outbound/stream.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,25 @@ impl OutboundProxyStream {
7979
Ok(self.local_addr)
8080
}
8181

82+
/// Try to recover the underlying [`TcpStream`].
83+
///
84+
/// Succeeds only if the stream is still in the unwrapped TCP state
85+
/// (i.e. no TLS or HTTP CONNECT layer has been applied). The UDP
86+
/// outbound path uses this to obtain a keep-alive connection that is
87+
/// `Sync`-friendly (`OutboundProxyStream` itself is intentionally not
88+
/// `Sync` because the HTTP CONNECT variant wraps a
89+
/// `hyper::upgrade::Upgraded` which is not).
90+
#[allow(clippy::result_large_err)]
91+
pub fn try_into_tcp(self) -> Result<TcpStream, Self> {
92+
match self.inner {
93+
OutboundProxyStreamInner::Bypassed(s) => Ok(s),
94+
other => Err(Self {
95+
local_addr: self.local_addr,
96+
inner: other,
97+
}),
98+
}
99+
}
100+
82101
/// Wrap as a TLS-protected stream (used by the chain builder when the
83102
/// next hop is HTTPS). `local_addr` is the address recorded for the
84103
/// very first TCP hop and is preserved unchanged.

0 commit comments

Comments
 (0)