@@ -37,6 +37,7 @@ use lightning::ln::msgs::SocketAddress;
3737use lightning:: ln:: peer_handler;
3838use lightning:: ln:: peer_handler:: APeerManager ;
3939use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
40+ use lightning:: sign:: EntropySource ;
4041
4142use std:: future:: Future ;
4243use std:: hash:: Hash ;
@@ -51,6 +52,9 @@ use std::time::Duration;
5152
5253static ID_COUNTER : AtomicU64 = AtomicU64 :: new ( 0 ) ;
5354
55+ const CONNECT_OUTBOUND_TIMEOUT : u64 = 10 ;
56+ const TOR_CONNECT_OUTBOUND_TIMEOUT : u64 = 30 ;
57+
5458// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
5559// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
5660// define a trivial two- and three- select macro with the specific types we need and just use that.
@@ -462,13 +466,169 @@ where
462466 PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
463467{
464468 let connect_fut = async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ;
465- if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , connect_fut) . await {
469+ if let Ok ( Ok ( stream) ) =
470+ time:: timeout ( Duration :: from_secs ( CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
471+ {
472+ Some ( setup_outbound ( peer_manager, their_node_id, stream) )
473+ } else {
474+ None
475+ }
476+ }
477+
478+ /// Routes [`connect_outbound`] through Tor. Implements stream isolation for each connection
479+ /// using a stream isolation parameter sourced from [`EntropySource::get_secure_random_bytes`].
480+ ///
481+ /// Returns a future (as the fn is async) that yields another future, see [`connect_outbound`] for
482+ /// details on this return value.
483+ pub async fn tor_connect_outbound < PM : Deref + ' static + Send + Sync + Clone , ES : Deref > (
484+ peer_manager : PM , their_node_id : PublicKey , addr : SocketAddress , tor_proxy_addr : SocketAddr ,
485+ entropy_source : ES ,
486+ ) -> Option < impl std:: future:: Future < Output = ( ) > >
487+ where
488+ PM :: Target : APeerManager < Descriptor = SocketDescriptor > ,
489+ ES :: Target : EntropySource ,
490+ {
491+ let connect_fut = async {
492+ tor_connect ( addr, tor_proxy_addr, entropy_source) . await . map ( |s| s. into_std ( ) . unwrap ( ) )
493+ } ;
494+ if let Ok ( Ok ( stream) ) =
495+ time:: timeout ( Duration :: from_secs ( TOR_CONNECT_OUTBOUND_TIMEOUT ) , connect_fut) . await
496+ {
466497 Some ( setup_outbound ( peer_manager, their_node_id, stream) )
467498 } else {
468499 None
469500 }
470501}
471502
503+ async fn tor_connect < ES : Deref > (
504+ addr : SocketAddress , tor_proxy_addr : SocketAddr , entropy_source : ES ,
505+ ) -> Result < TcpStream , ( ) >
506+ where
507+ ES :: Target : EntropySource ,
508+ {
509+ use std:: io:: Write ;
510+ use tokio:: io:: AsyncReadExt ;
511+
512+ const IPV4_ADDR_LEN : usize = 4 ;
513+ const IPV6_ADDR_LEN : usize = 16 ;
514+ const HOSTNAME_MAX_LEN : usize = u8:: MAX as usize ;
515+
516+ // Constants defined in RFC 1928 and RFC 1929
517+ const VERSION : u8 = 5 ;
518+ const NMETHODS : u8 = 1 ;
519+ const USERNAME_PASSWORD_AUTH : u8 = 2 ;
520+ const METHOD_SELECT_REPLY_LEN : usize = 2 ;
521+ const USERNAME_PASSWORD_VERSION : u8 = 1 ;
522+ const USERNAME_PASSWORD_REPLY_LEN : usize = 2 ;
523+ const CMD_CONNECT : u8 = 1 ;
524+ const RSV : u8 = 0 ;
525+ const ATYP_IPV4 : u8 = 1 ;
526+ const ATYP_DOMAINNAME : u8 = 3 ;
527+ const ATYP_IPV6 : u8 = 4 ;
528+ const SUCCESS : u8 = 0 ;
529+
530+ // Tor extensions, see https://spec.torproject.org/socks-extensions.html for further details
531+ const USERNAME : & [ u8 ] = b"<torS0X>0" ;
532+ const USERNAME_LEN : usize = USERNAME . len ( ) ;
533+ const PASSWORD_ENTROPY_LEN : usize = 32 ;
534+ // We encode the password as a hex string on the wire. RFC 1929 allows arbitrary byte sequences but we choose to be conservative.
535+ const PASSWORD_LEN : usize = PASSWORD_ENTROPY_LEN * 2 ;
536+
537+ const USERNAME_PASSWORD_REQUEST_LEN : usize =
538+ 1 /* VER */ + 1 /* ULEN */ + USERNAME_LEN + 1 /* PLEN */ + PASSWORD_LEN ;
539+ const SOCKS5_REQUEST_MAX_LEN : usize = 1 /* VER */ + 1 /* CMD */ + 1 /* RSV */ + 1 /* ATYP */
540+ + 1 /* HOSTNAME len */ + HOSTNAME_MAX_LEN /* HOSTNAME */ + 2 /* PORT */ ;
541+ const SOCKS5_REPLY_HEADER_LEN : usize = 1 /* VER */ + 1 /* REP */ + 1 /* RSV */ + 1 /* ATYP */ ;
542+
543+ let method_selection_request = [ VERSION , NMETHODS , USERNAME_PASSWORD_AUTH ] ;
544+ let mut tcp_stream = TcpStream :: connect ( & tor_proxy_addr) . await . map_err ( |_| ( ) ) ?;
545+ tokio:: io:: AsyncWriteExt :: write_all ( & mut tcp_stream, & method_selection_request)
546+ . await
547+ . map_err ( |_| ( ) ) ?;
548+
549+ let mut method_selection_reply = [ 0u8 ; METHOD_SELECT_REPLY_LEN ] ;
550+ tcp_stream. read_exact ( & mut method_selection_reply) . await . map_err ( |_| ( ) ) ?;
551+ if method_selection_reply != [ VERSION , USERNAME_PASSWORD_AUTH ] {
552+ return Err ( ( ) ) ;
553+ }
554+
555+ let password: [ u8 ; PASSWORD_ENTROPY_LEN ] = entropy_source. get_secure_random_bytes ( ) ;
556+ let mut username_password_request = [ 0u8 ; USERNAME_PASSWORD_REQUEST_LEN ] ;
557+ let mut stream = & mut username_password_request[ ..] ;
558+ stream. write_all ( & [ USERNAME_PASSWORD_VERSION , USERNAME_LEN as u8 ] ) . unwrap ( ) ;
559+ stream. write_all ( USERNAME ) . unwrap ( ) ;
560+ stream. write_all ( & [ PASSWORD_LEN as u8 ] ) . unwrap ( ) ;
561+ // Encode the password as a hex string even if RFC 1929 allows arbitrary sequences
562+ for byte in password {
563+ write ! ( stream, "{:02x}" , byte) . unwrap ( ) ;
564+ }
565+ debug_assert ! ( stream. is_empty( ) ) ;
566+ tokio:: io:: AsyncWriteExt :: write_all ( & mut tcp_stream, & username_password_request)
567+ . await
568+ . map_err ( |_| ( ) ) ?;
569+
570+ let mut username_password_reply = [ 0u8 ; USERNAME_PASSWORD_REPLY_LEN ] ;
571+ tcp_stream. read_exact ( & mut username_password_reply) . await . map_err ( |_| ( ) ) ?;
572+ if username_password_reply != [ USERNAME_PASSWORD_VERSION , SUCCESS ] {
573+ return Err ( ( ) ) ;
574+ }
575+
576+ let mut socks5_request = [ 0u8 ; SOCKS5_REQUEST_MAX_LEN ] ;
577+ let mut stream = & mut socks5_request[ ..] ;
578+ stream. write_all ( & [ VERSION , CMD_CONNECT , RSV ] ) . unwrap ( ) ;
579+ match addr {
580+ SocketAddress :: TcpIpV4 { addr, port } => {
581+ stream. write_all ( & [ ATYP_IPV4 ] ) . unwrap ( ) ;
582+ stream. write_all ( & addr) . unwrap ( ) ;
583+ stream. write_all ( & port. to_be_bytes ( ) ) . unwrap ( ) ;
584+ } ,
585+ SocketAddress :: TcpIpV6 { addr, port } => {
586+ stream. write_all ( & [ ATYP_IPV6 ] ) . unwrap ( ) ;
587+ stream. write_all ( & addr) . unwrap ( ) ;
588+ stream. write_all ( & port. to_be_bytes ( ) ) . unwrap ( ) ;
589+ } ,
590+ ref onion_v3 @ SocketAddress :: OnionV3 { port, .. } => {
591+ let onion_v3_url = onion_v3. to_string ( ) ;
592+ let hostname = onion_v3_url. split_once ( ':' ) . ok_or ( ( ) ) ?. 0 . as_bytes ( ) ;
593+ stream. write_all ( & [ ATYP_DOMAINNAME , hostname. len ( ) as u8 ] ) . unwrap ( ) ;
594+ stream. write_all ( hostname) . unwrap ( ) ;
595+ stream. write_all ( & port. to_be_bytes ( ) ) . unwrap ( ) ;
596+ } ,
597+ SocketAddress :: Hostname { hostname, port } => {
598+ stream. write_all ( & [ ATYP_DOMAINNAME , hostname. len ( ) ] ) . unwrap ( ) ;
599+ stream. write_all ( hostname. as_bytes ( ) ) . unwrap ( ) ;
600+ stream. write_all ( & port. to_be_bytes ( ) ) . unwrap ( ) ;
601+ } ,
602+ SocketAddress :: OnionV2 { .. } => return Err ( ( ) ) ,
603+ } ;
604+ let bytes_remaining = stream. len ( ) ;
605+ tokio:: io:: AsyncWriteExt :: write_all (
606+ & mut tcp_stream,
607+ & socks5_request[ ..socks5_request. len ( ) - bytes_remaining] ,
608+ )
609+ . await
610+ . map_err ( |_| ( ) ) ?;
611+
612+ let mut socks5_reply_header = [ 0u8 ; SOCKS5_REPLY_HEADER_LEN ] ;
613+ tcp_stream. read_exact ( & mut socks5_reply_header) . await . map_err ( |_| ( ) ) ?;
614+ if socks5_reply_header[ ..3 ] != [ VERSION , SUCCESS , RSV ] {
615+ return Err ( ( ) ) ;
616+ }
617+ match socks5_reply_header[ 3 ] {
618+ ATYP_IPV4 => tcp_stream. read_exact ( & mut [ 0u8 ; IPV4_ADDR_LEN ] ) . await . map_err ( |_| ( ) ) ?,
619+ ATYP_DOMAINNAME => {
620+ let hostname_len = tcp_stream. read_u8 ( ) . await . map_err ( |_| ( ) ) ? as usize ;
621+ let mut hostname_buffer = [ 0u8 ; HOSTNAME_MAX_LEN ] ;
622+ tcp_stream. read_exact ( & mut hostname_buffer[ ..hostname_len] ) . await . map_err ( |_| ( ) ) ?
623+ } ,
624+ ATYP_IPV6 => tcp_stream. read_exact ( & mut [ 0u8 ; IPV6_ADDR_LEN ] ) . await . map_err ( |_| ( ) ) ?,
625+ _ => return Err ( ( ) ) ,
626+ } ;
627+ tcp_stream. read_u16 ( ) . await . map_err ( |_| ( ) ) ?;
628+
629+ Ok ( tcp_stream)
630+ }
631+
472632const SOCK_WAKER_VTABLE : task:: RawWakerVTable = task:: RawWakerVTable :: new (
473633 clone_socket_waker,
474634 wake_socket_waker,
@@ -941,4 +1101,61 @@ mod tests {
9411101 async fn unthreaded_race_disconnect_accept ( ) {
9421102 race_disconnect_accept ( ) . await ;
9431103 }
1104+
1105+ #[ cfg( tor) ]
1106+ #[ tokio:: test]
1107+ async fn test_tor_connect ( ) {
1108+ use super :: tor_connect;
1109+ use lightning:: sign:: EntropySource ;
1110+ use std:: net:: SocketAddr ;
1111+
1112+ // Set TOR_PROXY=127.0.0.1:9050
1113+ let tor_proxy_addr: SocketAddr = std:: env!( "TOR_PROXY" ) . parse ( ) . unwrap ( ) ;
1114+
1115+ struct TestEntropySource ;
1116+
1117+ impl EntropySource for TestEntropySource {
1118+ fn get_secure_random_bytes ( & self ) -> [ u8 ; 32 ] {
1119+ [ 0xffu8 ; 32 ]
1120+ }
1121+ }
1122+
1123+ let entropy_source = TestEntropySource ;
1124+
1125+ // Success cases
1126+
1127+ for addr_str in [
1128+ // google.com
1129+ "142.250.189.196:80" ,
1130+ // google.com
1131+ "[2607:f8b0:4005:813::2004]:80" ,
1132+ // torproject.org
1133+ "torproject.org:80" ,
1134+ // torproject.org
1135+ "2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" ,
1136+ ] {
1137+ let addr: SocketAddress = addr_str. parse ( ) . unwrap ( ) ;
1138+ let tcp_stream = tor_connect ( addr, tor_proxy_addr, & entropy_source) . await . unwrap ( ) ;
1139+ assert_eq ! (
1140+ tcp_stream. try_read( & mut [ 0u8 ; 1 ] ) . unwrap_err( ) . kind( ) ,
1141+ std:: io:: ErrorKind :: WouldBlock
1142+ ) ;
1143+ }
1144+
1145+ // Failure cases
1146+
1147+ for addr_str in [
1148+ // google.com, with some invalid port
1149+ "142.250.189.196:1234" ,
1150+ // google.com, with some invalid port
1151+ "[2607:f8b0:4005:813::2004]:1234" ,
1152+ // torproject.org, with some invalid port
1153+ "torproject.org:1234" ,
1154+ // torproject.org, with a typo
1155+ "3gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion:80" ,
1156+ ] {
1157+ let addr: SocketAddress = addr_str. parse ( ) . unwrap ( ) ;
1158+ assert ! ( tor_connect( addr, tor_proxy_addr, & entropy_source) . await . is_err( ) ) ;
1159+ }
1160+ }
9441161}
0 commit comments