Skip to content

Commit f56f47f

Browse files
authored
Merge pull request #11 from moneydevkit/lsp-0.2.0_socks-support
Add SOCKS5 proxy support to lightning-net-tokio
2 parents dbc5bf8 + 8ee5832 commit f56f47f

3 files changed

Lines changed: 237 additions & 2 deletions

File tree

lightning-liquidity/tests/common/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use lightning_liquidity::{LiquidityClientConfig, LiquidityManagerSync, Liquidity
66
use lightning::chain::{BestBlock, Filter};
77
use lightning::ln::channelmanager::ChainParameters;
88
use lightning::ln::functional_test_utils::{Node, TestChannelManager};
9-
use lightning::util::test_utils::{TestBroadcaster, TestKeysInterface, TestStore};
9+
use lightning::util::test_utils::{TestBroadcaster, TestKeysInterface, TestLogger, TestStore};
1010

1111
use bitcoin::Network;
1212

@@ -47,6 +47,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
4747
Some(service_config),
4848
None,
4949
Arc::clone(&time_provider),
50+
service_inner.logger,
5051
)
5152
.unwrap();
5253

@@ -61,6 +62,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
6162
None,
6263
Some(client_config),
6364
time_provider,
65+
client_inner.logger,
6466
)
6567
.unwrap();
6668

@@ -141,6 +143,7 @@ pub(crate) struct LiquidityNode<'a, 'b, 'c> {
141143
Arc<TestStore>,
142144
Arc<dyn TimeProvider + Send + Sync>,
143145
&'c TestBroadcaster,
146+
&'c TestLogger,
144147
>,
145148
}
146149

@@ -155,6 +158,7 @@ impl<'a, 'b, 'c> LiquidityNode<'a, 'b, 'c> {
155158
Arc<TestStore>,
156159
Arc<dyn TimeProvider + Send + Sync>,
157160
&'c TestBroadcaster,
161+
&'c TestLogger,
158162
>,
159163
) -> Self {
160164
Self { inner: node, liquidity_manager }

lightning-net-tokio/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@ edition = "2021"
1515
all-features = true
1616
rustdoc-args = ["--cfg", "docsrs"]
1717

18+
[features]
19+
socks = ["tokio-socks"]
20+
1821
[dependencies]
1922
bitcoin = "0.32.2"
2023
lightning = { version = "0.2.0", path = "../lightning" }
2124
tokio = { version = "1.35", features = [ "rt", "sync", "net", "time" ] }
25+
tokio-socks = { version = "0.5", optional = true }
2226

2327
[dev-dependencies]
24-
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
28+
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time", "io-util" ] }
2529
lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] }
2630

2731
[lints]

lightning-net-tokio/src/lib.rs

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,33 @@ where
470470
}
471471
}
472472

473+
/// Like [`connect_outbound`], but routes the TCP connection through a SOCKS5 proxy.
474+
///
475+
/// `proxy_addr` is the address of the SOCKS5 proxy (e.g. `127.0.0.1:1080`).
476+
/// The proxy connects to `addr` on our behalf, then we hand the resulting stream to
477+
/// [`setup_outbound`].
478+
///
479+
/// Available only when the `socks` feature is enabled.
480+
#[cfg(feature = "socks")]
481+
#[cfg_attr(docsrs, doc(cfg(feature = "socks")))]
482+
pub async fn connect_outbound_via_socks5<PM: Deref + 'static + Send + Sync + Clone>(
483+
peer_manager: PM, their_node_id: PublicKey, addr: SocketAddr, proxy_addr: SocketAddr,
484+
) -> Option<impl std::future::Future<Output = ()>>
485+
where
486+
PM::Target: APeerManager<Descriptor = SocketDescriptor>,
487+
{
488+
let connect_fut = async {
489+
tokio_socks::tcp::Socks5Stream::connect(proxy_addr, addr)
490+
.await
491+
.map(|s| s.into_inner().into_std().unwrap())
492+
};
493+
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
494+
Some(setup_outbound(peer_manager, their_node_id, stream))
495+
} else {
496+
None
497+
}
498+
}
499+
473500
const SOCK_WAKER_VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
474501
clone_socket_waker,
475502
wake_socket_waker,
@@ -949,4 +976,204 @@ mod tests {
949976
async fn unthreaded_race_disconnect_accept() {
950977
race_disconnect_accept().await;
951978
}
979+
980+
#[cfg(feature = "socks")]
981+
mod socks_tests {
982+
use super::*;
983+
984+
/// Minimal SOCKS5 proxy: no auth, CONNECT to IPv4 only. Good enough for tests.
985+
async fn run_socks5_proxy(listener: tokio::net::TcpListener) {
986+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
987+
use tokio::net::TcpStream;
988+
989+
loop {
990+
let (mut client, _) = match listener.accept().await {
991+
Ok(c) => c,
992+
Err(_) => return,
993+
};
994+
tokio::spawn(async move {
995+
// Greeting: client sends [version, nmethods, methods...]
996+
let mut buf = [0u8; 258];
997+
client.read_exact(&mut buf[..2]).await.ok()?;
998+
let nmethods = buf[1] as usize;
999+
client.read_exact(&mut buf[..nmethods]).await.ok()?;
1000+
// Reply: no auth
1001+
client.write_all(&[0x05, 0x00]).await.ok()?;
1002+
1003+
// Connect request: [version, cmd, rsv, atype, addr..., port]
1004+
client.read_exact(&mut buf[..4]).await.ok()?;
1005+
let atype = buf[3];
1006+
let target: std::net::SocketAddr = match atype {
1007+
0x01 => {
1008+
// IPv4
1009+
client.read_exact(&mut buf[..6]).await.ok()?;
1010+
let ip = std::net::Ipv4Addr::new(buf[0], buf[1], buf[2], buf[3]);
1011+
let port = u16::from_be_bytes([buf[4], buf[5]]);
1012+
(ip, port).into()
1013+
},
1014+
_ => return None, // unsupported
1015+
};
1016+
1017+
let mut upstream = TcpStream::connect(target).await.ok()?;
1018+
// Reply: success, bound addr 0.0.0.0:0
1019+
client.write_all(&[0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0]).await.ok()?;
1020+
// Shovel bytes
1021+
tokio::io::copy_bidirectional(&mut client, &mut upstream).await.ok()?;
1022+
Some(())
1023+
});
1024+
}
1025+
}
1026+
1027+
async fn do_socks5_connection_test() {
1028+
let secp_ctx = Secp256k1::new();
1029+
let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
1030+
let b_key = SecretKey::from_slice(&[1; 32]).unwrap();
1031+
let a_pub = PublicKey::from_secret_key(&secp_ctx, &a_key);
1032+
let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
1033+
1034+
let (a_connected_sender, mut a_connected) = mpsc::channel(1);
1035+
let (a_disconnected_sender, mut a_disconnected) = mpsc::channel(1);
1036+
let a_handler = Arc::new(MsgHandler {
1037+
expected_pubkey: b_pub,
1038+
pubkey_connected: a_connected_sender,
1039+
pubkey_disconnected: a_disconnected_sender,
1040+
disconnected_flag: AtomicBool::new(false),
1041+
msg_events: Mutex::new(Vec::new()),
1042+
});
1043+
let a_manager = Arc::new(PeerManager::new(
1044+
MessageHandler {
1045+
chan_handler: Arc::clone(&a_handler),
1046+
route_handler: Arc::clone(&a_handler),
1047+
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
1048+
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
1049+
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
1050+
},
1051+
0,
1052+
&[1; 32],
1053+
Arc::new(TestLogger()),
1054+
Arc::new(TestNodeSigner::new(a_key)),
1055+
));
1056+
1057+
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
1058+
let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
1059+
let b_handler = Arc::new(MsgHandler {
1060+
expected_pubkey: a_pub,
1061+
pubkey_connected: b_connected_sender,
1062+
pubkey_disconnected: b_disconnected_sender,
1063+
disconnected_flag: AtomicBool::new(false),
1064+
msg_events: Mutex::new(Vec::new()),
1065+
});
1066+
let b_manager = Arc::new(PeerManager::new(
1067+
MessageHandler {
1068+
chan_handler: Arc::clone(&b_handler),
1069+
route_handler: Arc::clone(&b_handler),
1070+
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
1071+
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
1072+
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
1073+
},
1074+
0,
1075+
&[2; 32],
1076+
Arc::new(TestLogger()),
1077+
Arc::new(TestNodeSigner::new(b_key)),
1078+
));
1079+
1080+
// Start SOCKS5 proxy on a random port.
1081+
let proxy_listener =
1082+
tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1083+
let proxy_addr = proxy_listener.local_addr().unwrap();
1084+
tokio::spawn(run_socks5_proxy(proxy_listener));
1085+
1086+
// Listener for the inbound side of the LN connection.
1087+
let inbound_listener =
1088+
tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1089+
let inbound_addr = inbound_listener.local_addr().unwrap();
1090+
1091+
// Outbound: connect through the proxy. Spawned because we need to
1092+
// concurrently accept on the inbound listener.
1093+
let a_mgr = Arc::clone(&a_manager);
1094+
let outbound_handle = tokio::spawn(async move {
1095+
super::super::connect_outbound_via_socks5(
1096+
a_mgr, b_pub, inbound_addr, proxy_addr,
1097+
)
1098+
.await
1099+
});
1100+
1101+
// Accept the proxied TCP connection and hand it to setup_inbound.
1102+
let (inbound_stream, _) = inbound_listener.accept().await.unwrap();
1103+
let inbound_std = inbound_stream.into_std().unwrap();
1104+
let fut_b = super::super::setup_inbound(b_manager, inbound_std);
1105+
1106+
let outbound_result = outbound_handle.await.unwrap();
1107+
assert!(outbound_result.is_some(), "SOCKS5 outbound connection failed");
1108+
let fut_a = outbound_result.unwrap();
1109+
1110+
// Both peers should see each other.
1111+
tokio::time::timeout(Duration::from_secs(10), a_connected.recv())
1112+
.await
1113+
.unwrap();
1114+
tokio::time::timeout(Duration::from_secs(1), b_connected.recv())
1115+
.await
1116+
.unwrap();
1117+
1118+
// Disconnect.
1119+
a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError {
1120+
node_id: b_pub,
1121+
action: ErrorAction::DisconnectPeer { msg: None },
1122+
});
1123+
a_manager.process_events();
1124+
tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv())
1125+
.await
1126+
.unwrap();
1127+
tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv())
1128+
.await
1129+
.unwrap();
1130+
assert!(a_handler.disconnected_flag.load(Ordering::SeqCst));
1131+
assert!(b_handler.disconnected_flag.load(Ordering::SeqCst));
1132+
1133+
fut_a.await;
1134+
fut_b.await;
1135+
}
1136+
1137+
#[tokio::test(flavor = "multi_thread")]
1138+
async fn socks5_connection_test() {
1139+
do_socks5_connection_test().await;
1140+
}
1141+
1142+
/// Connecting through a dead proxy must return None, not silently
1143+
/// bypass the proxy. This catches regressions where the proxy
1144+
/// config is accidentally ignored.
1145+
#[tokio::test(flavor = "multi_thread")]
1146+
async fn socks5_dead_proxy_returns_none() {
1147+
let secp_ctx = Secp256k1::new();
1148+
let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
1149+
let b_key = SecretKey::from_slice(&[2; 32]).unwrap();
1150+
let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
1151+
1152+
let a_manager = Arc::new(PeerManager::new(
1153+
MessageHandler {
1154+
chan_handler: Arc::new(
1155+
lightning::ln::peer_handler::ErroringMessageHandler::new(),
1156+
),
1157+
route_handler: Arc::new(IgnoringMessageHandler {}),
1158+
onion_message_handler: Arc::new(IgnoringMessageHandler {}),
1159+
custom_message_handler: Arc::new(IgnoringMessageHandler {}),
1160+
send_only_message_handler: Arc::new(IgnoringMessageHandler {}),
1161+
},
1162+
0,
1163+
&[1; 32],
1164+
Arc::new(TestLogger()),
1165+
Arc::new(TestNodeSigner::new(a_key)),
1166+
));
1167+
1168+
// Port 1 on loopback: nothing should be listening there.
1169+
let dead_proxy: std::net::SocketAddr = "127.0.0.1:1".parse().unwrap();
1170+
let target: std::net::SocketAddr = "127.0.0.1:9735".parse().unwrap();
1171+
1172+
let result = super::super::connect_outbound_via_socks5(
1173+
a_manager, b_pub, target, dead_proxy,
1174+
)
1175+
.await;
1176+
assert!(result.is_none(), "dead proxy should yield None");
1177+
}
1178+
}
9521179
}

0 commit comments

Comments
 (0)