Skip to content

Commit 250ae60

Browse files
committed
refactor(service)!: rework outbound proxy chain into a unified client
Replace the previous parallel outbound proxy implementations (net::outbound_proxy + local::net::tcp::outbound_proxy) with a single self-contained module at net::outbound, exposing OutboundProxyClient, OutboundProxyStream, Socks5Auth, HttpProxyAuth and a TcpDialer trait. Highlights: * OutboundProxyStream is now a closed-set enum (Bypassed / Https / Http) with statically-dispatched poll_* — no more Box<dyn ProxyStream>. The HTTPS variant boxes its inner OutboundProxyStream solely to break the recursive type instantiation required by the TLS libraries; this is not dynamic dispatch. * HTTP CONNECT client is rebuilt on top of hyper::client::conn::http1 + hyper::upgrade. The hand-rolled status-line parser (which could drop bytes that arrived in the same read() as the header terminator, did not handle 100-Continue, etc.) is gone. * Auth parameters are now strongly typed: Socks5Auth::{None, UsernamePassword{..}} HttpProxyAuth::{None, Basic{..}} Both enums are #[non_exhaustive] for forward compatibility. * AutoProxyClientStream's ProxiedViaChain variant is folded back into a single Proxied(ProxyClientStream<MonProxyStream<OutboundTransport>>) variant, with OutboundTransport = Direct(TcpStream) | Chained(...). * ServiceContext (both server and local) now caches an Arc<OutboundProxyClient> built once at set_outbound_proxies time instead of re-parsing the chain on every connection. * The standalone Socks5TcpClient / Socks5UdpClient implementations previously duplicated across net::socks5_client and local::socks::client::socks5 are unified into the latter location; net::outbound::socks5 only keeps the chain-friendly Socks5Negotiator. * When UDP is enabled but the configured chain contains a non-SOCKS5 hop, sslocal/ssserver emits a single startup warning. Phase 2 will add UDP relay support for SOCKS5-only chains. BREAKING CHANGE: the following items have moved or been removed: * crate::net::outbound_proxy — removed; use crate::net::OutboundProxyClient * crate::net::http_connect::HttpConnectClient — moved to crate::net::HttpConnectClient (only available with the local-http feature) * crate::net::Socks5TcpClient (the chain-mixing wrapper) — split: the pure SOCKS5 client is now crate::local::socks::client::socks5::Socks5TcpClient; the in-band negotiator is crate::net::Socks5Negotiator * crate::local::net::tcp::outbound_proxy — removed * ServiceContext::outbound_proxies() — replaced by outbound_client() returning Option<&Arc<OutboundProxyClient>>
1 parent e2ecfae commit 250ae60

23 files changed

Lines changed: 1375 additions & 722 deletions

File tree

crates/shadowsocks-service/src/local/context.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ use tokio::sync::Mutex;
1818
#[cfg(feature = "local-fake-dns")]
1919
use tokio::sync::RwLock;
2020

21-
use crate::{acl::AccessControl, config::{OutboundProxy, SecurityConfig}, net::FlowStat};
21+
use crate::{
22+
acl::AccessControl,
23+
config::{OutboundProxy, SecurityConfig},
24+
net::{FlowStat, OutboundProxyClient},
25+
};
2226

2327
#[cfg(feature = "local-fake-dns")]
2428
use super::fake_dns::manager::FakeDnsManager;
@@ -36,8 +40,8 @@ pub struct ServiceContext {
3640
// Flow statistic report
3741
flow_stat: Arc<FlowStat>,
3842

39-
// Outbound proxy chain (sslocal ss-server connection goes through these proxies)
40-
outbound_proxies: Vec<OutboundProxy>,
43+
// Outbound proxy chain (sslocal -> ss-server connection routes through these proxies)
44+
outbound_client: Option<Arc<OutboundProxyClient>>,
4145

4246
// For DNS relay's ACL domain name reverse lookup -- whether the IP shall be forwarded
4347
#[cfg(feature = "local-dns")]
@@ -62,7 +66,7 @@ impl ServiceContext {
6266
accept_opts: AcceptOpts::default(),
6367
acl: None,
6468
flow_stat: Arc::new(FlowStat::new()),
65-
outbound_proxies: Vec::new(),
69+
outbound_client: None,
6670
#[cfg(feature = "local-dns")]
6771
reverse_lookup_cache: Arc::new(Mutex::new(LruCache::with_expiry_duration_and_capacity(
6872
Duration::from_secs(3 * 24 * 60 * 60),
@@ -115,12 +119,16 @@ impl ServiceContext {
115119

116120
/// Set outbound proxy chain (connection to SS server routes through these proxies)
117121
pub fn set_outbound_proxies(&mut self, proxies: Vec<OutboundProxy>) {
118-
self.outbound_proxies = proxies;
122+
self.outbound_client = if proxies.is_empty() {
123+
None
124+
} else {
125+
Some(Arc::new(OutboundProxyClient::from_config(&proxies)))
126+
};
119127
}
120128

121-
/// Get outbound proxy chain
122-
pub fn outbound_proxies(&self) -> &[OutboundProxy] {
123-
&self.outbound_proxies
129+
/// Get the outbound proxy client (if a chain is configured).
130+
pub fn outbound_client(&self) -> Option<&Arc<OutboundProxyClient>> {
131+
self.outbound_client.as_ref()
124132
}
125133

126134
/// Get cloned flow statistic

crates/shadowsocks-service/src/local/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ pub mod config;
1111
mod http_client;
1212
mod http_service;
1313
pub mod server;
14-
mod tokio_rt;
14+
pub(crate) mod tokio_rt;
1515
mod utils;

crates/shadowsocks-service/src/local/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
44

55
use futures::future;
6-
use log::{info, trace};
6+
use log::trace;
77
use shadowsocks::{
88
config::Mode,
99
net::{AcceptOpts, ConnectOpts},
@@ -183,9 +183,13 @@ impl Server {
183183
if !config.outbound_proxy.is_empty() {
184184
let has_udp = config.local.iter().any(|local| local.config.mode.enable_udp());
185185
if has_udp {
186-
info!(
187-
"outbound proxy chain only supports TCP; UDP traffic may not be proxied and may behave unexpectedly"
188-
);
186+
let preview = crate::net::OutboundProxyClient::from_config(&config.outbound_proxy);
187+
if !preview.supports_udp() {
188+
log::warn!(
189+
"outbound proxy chain contains non-SOCKS5 hop(s); UDP traffic will bypass the chain. \
190+
Configure a SOCKS5-only chain to enable UDP relay."
191+
);
192+
}
189193
}
190194
context.set_outbound_proxies(config.outbound_proxy);
191195
}

crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs

Lines changed: 126 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,112 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1818

1919
use crate::{
2020
local::{context::ServiceContext, loadbalancing::ServerIdent},
21-
net::MonProxyStream,
21+
net::{MonProxyStream, OutboundProxyStream, TcpDialer},
2222
};
2323

24-
use super::{auto_proxy_io::AutoProxyIo, outbound_proxy::OutboundProxyStream};
24+
use super::auto_proxy_io::AutoProxyIo;
25+
26+
/// Outbound transport used by [`AutoProxyClientStream`]: either a direct
27+
/// TCP connection or a tunnel through the configured outbound proxy chain.
28+
#[allow(clippy::large_enum_variant)]
29+
#[pin_project(project = OutboundTransportProj)]
30+
pub enum OutboundTransport {
31+
/// Direct TCP, no outbound chain configured.
32+
Direct(#[pin] TcpStream),
33+
/// Tunnel produced by `OutboundProxyClient::connect_tcp`.
34+
Chained(#[pin] OutboundProxyStream),
35+
}
36+
37+
impl OutboundTransport {
38+
fn local_addr(&self) -> io::Result<SocketAddr> {
39+
match self {
40+
Self::Direct(s) => s.local_addr(),
41+
Self::Chained(s) => s.local_addr(),
42+
}
43+
}
44+
45+
fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
46+
match self {
47+
Self::Direct(s) => s.set_nodelay(nodelay),
48+
// For tunnels we can only forward the request to the underlying
49+
// TCP socket if it is exposed; the unified enum has no such
50+
// accessor today, so the call is a no-op.
51+
Self::Chained(_) => Ok(()),
52+
}
53+
}
54+
}
55+
56+
impl AsyncRead for OutboundTransport {
57+
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
58+
match self.project() {
59+
OutboundTransportProj::Direct(s) => s.poll_read(cx, buf),
60+
OutboundTransportProj::Chained(s) => s.poll_read(cx, buf),
61+
}
62+
}
63+
}
64+
65+
impl AsyncWrite for OutboundTransport {
66+
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
67+
match self.project() {
68+
OutboundTransportProj::Direct(s) => s.poll_write(cx, buf),
69+
OutboundTransportProj::Chained(s) => s.poll_write(cx, buf),
70+
}
71+
}
72+
73+
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
74+
match self.project() {
75+
OutboundTransportProj::Direct(s) => s.poll_flush(cx),
76+
OutboundTransportProj::Chained(s) => s.poll_flush(cx),
77+
}
78+
}
79+
80+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
81+
match self.project() {
82+
OutboundTransportProj::Direct(s) => s.poll_shutdown(cx),
83+
OutboundTransportProj::Chained(s) => s.poll_shutdown(cx),
84+
}
85+
}
86+
87+
fn poll_write_vectored(
88+
self: Pin<&mut Self>,
89+
cx: &mut task::Context<'_>,
90+
bufs: &[IoSlice<'_>],
91+
) -> Poll<io::Result<usize>> {
92+
match self.project() {
93+
OutboundTransportProj::Direct(s) => s.poll_write_vectored(cx, bufs),
94+
OutboundTransportProj::Chained(s) => s.poll_write_vectored(cx, bufs),
95+
}
96+
}
97+
98+
fn is_write_vectored(&self) -> bool {
99+
match self {
100+
Self::Direct(s) => s.is_write_vectored(),
101+
Self::Chained(s) => s.is_write_vectored(),
102+
}
103+
}
104+
}
105+
106+
/// `TcpDialer` adapter that dials directly via the shadowsocks
107+
/// infrastructure (DNS resolver, connect options).
108+
struct DirectTcpDialer<'a> {
109+
context: &'a ServiceContext,
110+
opts: &'a ConnectOpts,
111+
}
112+
113+
impl<'a> TcpDialer for DirectTcpDialer<'a> {
114+
async fn dial(&self, addr: &Address) -> io::Result<TcpStream> {
115+
TcpStream::connect_remote_with_opts(self.context.context_ref(), addr, self.opts).await
116+
}
117+
}
25118

26119
/// Unified stream for bypassed and proxied connections
27120
#[allow(clippy::large_enum_variant)]
28121
#[pin_project(project = AutoProxyClientStreamProj)]
29122
pub enum AutoProxyClientStream {
30-
Proxied(#[pin] ProxyClientStream<MonProxyStream<TcpStream>>),
31-
ProxiedViaChain(#[pin] ProxyClientStream<MonProxyStream<OutboundProxyStream>>),
123+
/// Tunnel through the shadowsocks server (optionally over the outbound
124+
/// proxy chain).
125+
Proxied(#[pin] ProxyClientStream<MonProxyStream<OutboundTransport>>),
126+
/// Direct TCP, bypassing the shadowsocks server.
32127
Bypassed(#[pin] TcpStream),
33128
}
34129

@@ -83,7 +178,6 @@ impl AutoProxyClientStream {
83178
where
84179
A: Into<Address>,
85180
{
86-
// Connect directly.
87181
#[cfg_attr(not(feature = "local-fake-dns"), allow(unused_mut))]
88182
let mut addr = addr.into();
89183
#[cfg(feature = "local-fake-dns")]
@@ -143,74 +237,63 @@ impl AutoProxyClientStream {
143237
A: Into<Address>,
144238
{
145239
let flow_stat = context.flow_stat();
146-
match context.outbound_proxies() {
147-
[] => {
148-
let stream = match ProxyClientStream::connect_with_opts_map(
149-
context.context(),
150-
server.server_config(),
151-
addr,
152-
connect_opts,
153-
|stream| MonProxyStream::from_stream(stream, flow_stat),
154-
)
240+
let target_addr: Address = addr.into();
241+
let ss_addr: Address = server.server_config().tcp_external_addr().into();
242+
243+
let dial_result = match context.outbound_client() {
244+
None => TcpStream::connect_remote_with_opts(context.context_ref(), &ss_addr, connect_opts)
155245
.await
156-
{
157-
Ok(s) => s,
158-
Err(err) => {
159-
server.tcp_score().report_failure().await;
160-
return Err(err);
161-
}
246+
.map(OutboundTransport::Direct),
247+
Some(client) => {
248+
let dialer = DirectTcpDialer {
249+
context: context.as_ref(),
250+
opts: connect_opts,
162251
};
163-
Ok(Self::Proxied(stream))
252+
client
253+
.connect_tcp(&dialer, &ss_addr)
254+
.await
255+
.map(OutboundTransport::Chained)
164256
}
165-
proxies => {
166-
use super::outbound_proxy::connect_outbound_proxy_chain;
167-
168-
let addr = addr.into();
169-
let ss_addr = server.server_config().tcp_external_addr().into();
170-
let proxy_stream =
171-
match connect_outbound_proxy_chain(context.clone(), ss_addr, proxies, connect_opts).await {
172-
Ok(s) => s,
173-
Err(err) => {
174-
server.tcp_score().report_failure().await;
175-
return Err(err);
176-
}
177-
};
178-
let mon_stream = MonProxyStream::from_stream(proxy_stream, flow_stat);
179-
let stream =
180-
ProxyClientStream::from_stream(context.context(), mon_stream, server.server_config(), addr);
181-
Ok(Self::ProxiedViaChain(stream))
257+
};
258+
259+
let transport = match dial_result {
260+
Ok(t) => t,
261+
Err(err) => {
262+
server.tcp_score().report_failure().await;
263+
return Err(err);
182264
}
183-
}
265+
};
266+
267+
let mon = MonProxyStream::from_stream(transport, flow_stat);
268+
let stream = ProxyClientStream::from_stream(context.context(), mon, server.server_config(), target_addr);
269+
Ok(Self::Proxied(stream))
184270
}
185271

186272
pub fn local_addr(&self) -> io::Result<SocketAddr> {
187273
match *self {
188274
Self::Proxied(ref s) => s.get_ref().get_ref().local_addr(),
189-
Self::ProxiedViaChain(ref s) => s.get_ref().get_ref().local_addr(),
190275
Self::Bypassed(ref s) => s.local_addr(),
191276
}
192277
}
193278

194279
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
195280
match *self {
196281
Self::Proxied(ref s) => s.get_ref().get_ref().set_nodelay(nodelay),
197-
Self::ProxiedViaChain(..) => Ok(()),
198282
Self::Bypassed(ref s) => s.set_nodelay(nodelay),
199283
}
200284
}
201285
}
202286

203287
impl AutoProxyIo for AutoProxyClientStream {
204288
fn is_proxied(&self) -> bool {
205-
matches!(*self, Self::Proxied(..) | Self::ProxiedViaChain(..))
289+
matches!(*self, Self::Proxied(..))
206290
}
207291
}
208292

209293
impl AsyncRead for AutoProxyClientStream {
210294
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
211295
match self.project() {
212296
AutoProxyClientStreamProj::Proxied(s) => s.poll_read(cx, buf),
213-
AutoProxyClientStreamProj::ProxiedViaChain(s) => s.poll_read(cx, buf),
214297
AutoProxyClientStreamProj::Bypassed(s) => s.poll_read(cx, buf),
215298
}
216299
}
@@ -220,23 +303,20 @@ impl AsyncWrite for AutoProxyClientStream {
220303
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
221304
match self.project() {
222305
AutoProxyClientStreamProj::Proxied(s) => s.poll_write(cx, buf),
223-
AutoProxyClientStreamProj::ProxiedViaChain(s) => s.poll_write(cx, buf),
224306
AutoProxyClientStreamProj::Bypassed(s) => s.poll_write(cx, buf),
225307
}
226308
}
227309

228310
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
229311
match self.project() {
230312
AutoProxyClientStreamProj::Proxied(s) => s.poll_flush(cx),
231-
AutoProxyClientStreamProj::ProxiedViaChain(s) => s.poll_flush(cx),
232313
AutoProxyClientStreamProj::Bypassed(s) => s.poll_flush(cx),
233314
}
234315
}
235316

236317
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
237318
match self.project() {
238319
AutoProxyClientStreamProj::Proxied(s) => s.poll_shutdown(cx),
239-
AutoProxyClientStreamProj::ProxiedViaChain(s) => s.poll_shutdown(cx),
240320
AutoProxyClientStreamProj::Bypassed(s) => s.poll_shutdown(cx),
241321
}
242322
}
@@ -248,14 +328,7 @@ impl AsyncWrite for AutoProxyClientStream {
248328
) -> Poll<io::Result<usize>> {
249329
match self.project() {
250330
AutoProxyClientStreamProj::Proxied(s) => s.poll_write_vectored(cx, bufs),
251-
AutoProxyClientStreamProj::ProxiedViaChain(s) => s.poll_write_vectored(cx, bufs),
252331
AutoProxyClientStreamProj::Bypassed(s) => s.poll_write_vectored(cx, bufs),
253332
}
254333
}
255334
}
256-
257-
impl From<ProxyClientStream<MonProxyStream<TcpStream>>> for AutoProxyClientStream {
258-
fn from(s: ProxyClientStream<MonProxyStream<TcpStream>>) -> Self {
259-
Self::Proxied(s)
260-
}
261-
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
pub mod auto_proxy_io;
22
pub mod auto_proxy_stream;
33
pub mod listener;
4-
pub(crate) mod outbound_proxy;

0 commit comments

Comments
 (0)