Skip to content

Commit c5d2e3a

Browse files
UPnP: bind to specified interface
1 parent 795d568 commit c5d2e3a

6 files changed

Lines changed: 122 additions & 80 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ This repo contains reusable library crates and a CLI executable. For GUI version
2727
- [x] [DHT](https://www.bittorrent.org/beps/bep_0005.html) ([Kademlia](https://homes.cs.aau.dk/~bnielsen/DSE04/papers/kademlia.pdf))
2828
- [x] [Protocol Encryption](https://css.csail.mit.edu/6.858/2018/projects/bgu-kelvinlu.pdf)
2929
- [x] [uTP](https://github.com/bittorrent/libutp)
30+
- [x] [UPnP](https://upnp.org/specs/gw/UPnP-gw-WANIPConnection-v2-Service.pdf)
3031
- [ ] [IPv6 DHT](https://www.bittorrent.org/beps/bep_0032.html)
3132
- [ ] [STUN (for DHT)?](https://datatracker.ietf.org/doc/html/rfc8489)
3233
- [ ] [Fast Extension](https://www.bittorrent.org/beps/bep_0006.html)

mtorrent-cli/src/bin/dht_node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ struct Args {
5050

5151
/// Example usage:
5252
/// ```bash
53-
/// ./target/release/dht_node --duration=30
5453
/// ./target/release/dht_node --nodes '"router.bittorrent.com:6881" "dht.transmissionbt.com:6881"' --duration=72
5554
/// ./target/release/dht_node --duration=10 -t "magnet:?xt=urn:btih:1EBD3DBFBB25C1333F51C99C7EE670FC2A1727C9" -o peers.txt
55+
/// ./target/release/dht_node --duration=30 -t "magnet:?xt=urn:btih:1EBD3DBFBB25C1333F51C99C7EE670FC2A1727C9" -i "wlp0s20f3"
5656
/// ```
5757
fn main() -> io::Result<()> {
5858
#[cfg(debug_assertions)]
@@ -67,6 +67,7 @@ fn main() -> io::Result<()> {
6767
.with_threads(false)
6868
.with_level(log::LevelFilter::Info)
6969
.with_module_level("mtorrent_dht", log::LevelFilter::Debug)
70+
.with_module_level("mtorrent_utils", log::LevelFilter::Debug)
7071
.with_module_level("mtorrent::app::dht", log::LevelFilter::Debug)
7172
.with_module_level("dht_node", log::LevelFilter::Debug)
7273
.init()

mtorrent-utils/src/net.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,29 @@ use std::hash::{DefaultHasher, Hash, Hasher};
44
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
55
use std::{io, ops};
66

7-
/// Get local (non-loopback) IPv4.
8-
#[cfg(target_os = "linux")]
9-
pub fn get_local_addr() -> io::Result<Ipv4Addr> {
10-
let hostname_out = std::process::Command::new("hostname").arg("-I").output()?;
11-
let ipv4_string = String::from_utf8_lossy(&hostname_out.stdout)
12-
.split_once(' ')
13-
.ok_or_else(|| io::Error::other("Unexpected output from 'hostname -I'"))?
14-
.0
15-
.to_string();
16-
ipv4_string.parse::<Ipv4Addr>().map_err(io::Error::other)
7+
#[cfg(not(windows))]
8+
pub(crate) fn get_local_addr(mut predicate: impl FnMut(&IpAddr) -> bool) -> Option<IpAddr> {
9+
sysinfo::Networks::new_with_refreshed_list()
10+
.values()
11+
.flat_map(sysinfo::NetworkData::ip_networks)
12+
.filter_map(|network| predicate(&network.addr).then_some(network.addr))
13+
.next()
1714
}
1815

19-
/// Get local (non-loopback) IPv4.
2016
#[cfg(windows)]
21-
pub fn get_local_addr() -> io::Result<Ipv4Addr> {
22-
// naively uses first connected adapter
23-
ipconfig::get_adapters()
24-
.map_err(io::Error::other)?
17+
pub(crate) fn get_local_addr(predicate: impl FnMut(&IpAddr) -> bool) -> Option<IpAddr> {
18+
// can't use sysinfo on Windows because it doesn't return SW adapters, e.g. loopback
19+
let adapters = ipconfig::get_adapters().ok()?;
20+
adapters
2521
.iter()
2622
.filter(|adapter| matches!(adapter.oper_status(), ipconfig::OperStatus::IfOperStatusUp))
2723
.flat_map(ipconfig::Adapter::ip_addresses)
28-
.find_map(|addr| match addr {
29-
IpAddr::V4(ipv4) => Some(*ipv4),
30-
_ => None,
31-
})
32-
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "IPv4 not found"))
24+
.cloned()
25+
.find(predicate)
3326
}
3427

28+
// ------------------------------------------------------------------------------------------------
29+
3530
#[cfg(windows)]
3631
fn get_adapter_addrs<'a>(
3732
adapters: impl IntoIterator<Item = &'a ipconfig::Adapter>,
@@ -104,10 +99,7 @@ pub fn get_bind_addr_v6(interface: Option<&str>) -> Ipv6Addr {
10499
Ipv6Addr::UNSPECIFIED
105100
}
106101

107-
#[cfg(not(any(target_os = "linux", windows)))]
108-
pub fn get_local_addr() -> io::Result<Ipv4Addr> {
109-
Ok(Ipv4Addr::UNSPECIFIED)
110-
}
102+
// ------------------------------------------------------------------------------------------------
111103

112104
#[doc(hidden)]
113105
pub fn set_so_sndbuf_internal<'s>(socket: impl Into<SockRef<'s>>, value: usize, module: &str) {
@@ -123,6 +115,24 @@ pub fn set_so_rcvbuf_internal<'s>(socket: impl Into<SockRef<'s>>, value: usize,
123115
}
124116
}
125117

118+
/// Set SO_SNDBUF on a socket.
119+
#[macro_export]
120+
macro_rules! set_so_sndbuf {
121+
($sock:expr, $size:expr) => {{
122+
$crate::net::set_so_sndbuf_internal($sock, $size, std::module_path!());
123+
}};
124+
}
125+
126+
/// Set SO_RCVBUF on a socket.
127+
#[macro_export]
128+
macro_rules! set_so_rcvbuf {
129+
($sock:expr, $size:expr) => {{
130+
$crate::net::set_so_rcvbuf_internal($sock, $size, std::module_path!());
131+
}};
132+
}
133+
134+
// ------------------------------------------------------------------------------------------------
135+
126136
/// Bind a socket to a specific network interface. Does nothing on Windows.
127137
#[cfg(target_os = "windows")]
128138
pub fn bind_to_interface<'s>(_socket: impl Into<SockRef<'s>>, _interface: &str) -> io::Result<()> {
@@ -176,21 +186,7 @@ pub fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) ->
176186
Ok(())
177187
}
178188

179-
/// Set SO_SNDBUF on a socket.
180-
#[macro_export]
181-
macro_rules! set_so_sndbuf {
182-
($sock:expr, $size:expr) => {{
183-
$crate::net::set_so_sndbuf_internal($sock, $size, std::module_path!());
184-
}};
185-
}
186-
187-
/// Set SO_RCVBUF on a socket.
188-
#[macro_export]
189-
macro_rules! set_so_rcvbuf {
190-
($sock:expr, $size:expr) => {{
191-
$crate::net::set_so_rcvbuf_internal($sock, $size, std::module_path!());
192-
}};
193-
}
189+
// ------------------------------------------------------------------------------------------------
194190

195191
const DYNAMIC_PORT_RANGE: ops::Range<u32> = 49152..65536;
196192

@@ -332,4 +328,15 @@ mod tests {
332328
let addr = get_bind_addr_v6(Some(iface));
333329
assert_eq!(addr, Ipv6Addr::LOCALHOST);
334330
}
331+
332+
#[test]
333+
fn test_local_addr_predicate() {
334+
let addr = get_local_addr(|addr| addr.is_loopback() && addr.is_ipv4());
335+
assert_eq!(addr, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
336+
337+
let addr = get_local_addr(|addr| !addr.is_loopback() && !addr.is_unspecified());
338+
let addr = addr.expect("failed to find non-loopback address");
339+
assert!(!addr.is_loopback());
340+
assert!(!addr.is_unspecified());
341+
}
335342
}

mtorrent-utils/src/upnp.rs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::net;
12
use igd_next::{SearchOptions, aio};
23
use local_async_utils::prelude::*;
34
use std::mem;
4-
use std::net::SocketAddr;
5+
use std::net::{Ipv4Addr, SocketAddr};
6+
use tokio::time::sleep;
57

68
type AsyncGateway = aio::Gateway<aio::tokio::Tokio>;
79
type BlockingGateway = igd_next::Gateway;
@@ -16,36 +18,50 @@ pub struct PortOpener {
1618
}
1719

1820
impl PortOpener {
19-
const LEASE_DURATION: std::time::Duration = sec!(300);
21+
/// Recommended lease duration from <https://upnp.org/specs/gw/UPnP-gw-WANIPConnection-v2-Service.pdf>.
22+
const LEASE_DURATION_SEC: u32 = 3600;
2023

24+
/// Create a port mapping on the local gateway and return a `PortOpener` that maintains it.
25+
///
26+
/// The mapping will be automatically removed when the `PortOpener` is dropped, but it will not
27+
/// be renewed unless `run_continuous_renewal()` is called.
2128
pub async fn new(
22-
internal_addr: SocketAddr,
2329
proto: PortMappingProtocol,
30+
internal_port: u16,
2431
desired_external_port: Option<u16>,
32+
interface: Option<&str>,
2533
) -> igd_next::Result<Self> {
34+
// get local IP for the specified interface (or any interface)
35+
let internal_ip = if let Some(iface) = interface {
36+
net::get_bind_addr_v4(Some(iface)).into()
37+
} else {
38+
net::get_local_addr(|addr| {
39+
addr.is_ipv4() && !addr.is_loopback() && !addr.is_multicast()
40+
})
41+
.unwrap_or(Ipv4Addr::UNSPECIFIED.into())
42+
};
43+
let internal_addr = SocketAddr::new(internal_ip, internal_port);
44+
45+
// see if the gateway supports UPnP
2646
let gateway = aio::tokio::search_gateway(SearchOptions {
2747
timeout: Some(sec!(5)),
48+
bind_addr: (internal_ip, 0).into(),
2849
..Default::default()
2950
})
3051
.await?;
52+
53+
// create port mapping and get our external IP and port
3154
let public_ip = gateway.get_external_ip().await?;
3255
let public_port = if let Some(desired_port) = desired_external_port {
3356
gateway
34-
.add_port(
35-
proto,
36-
desired_port,
37-
internal_addr,
38-
Self::LEASE_DURATION.as_secs() as u32,
39-
"",
40-
)
57+
.add_port(proto, desired_port, internal_addr, Self::LEASE_DURATION_SEC, "")
4158
.await?;
4259
desired_port
4360
} else {
44-
gateway
45-
.add_any_port(proto, internal_addr, Self::LEASE_DURATION.as_secs() as u32, "")
46-
.await?
61+
gateway.add_any_port(proto, internal_addr, Self::LEASE_DURATION_SEC, "").await?
4762
};
4863
let external_addr = SocketAddr::new(public_ip, public_port);
64+
4965
log::debug!("UPnP: port mapping created ({}:{})", proto, external_addr.port());
5066
Ok(Self {
5167
gateway,
@@ -55,22 +71,30 @@ impl PortOpener {
5571
})
5672
}
5773

74+
/// Get the external socket address that was mapped to the internal port.
5875
pub fn external_ip(&self) -> SocketAddr {
5976
self.external_addr
6077
}
6178

79+
/// Start continuous renewal of the port mapping. The mapping will be automatically removed when
80+
/// the returned future is dropped.
6281
pub async fn run_continuous_renewal(self) -> igd_next::Result<()> {
82+
// renewal interval must be slightly higher than the lease duration because renewing a
83+
// mapping that hasn't expired yet has no effect
84+
let renewal_interval = sec!(Self::LEASE_DURATION_SEC as u64) + millisec!(500);
6385
loop {
64-
tokio::time::sleep(Self::LEASE_DURATION).await;
86+
sleep(renewal_interval).await;
87+
6588
self.gateway
6689
.add_port(
6790
self.proto,
6891
self.external_addr.port(),
6992
self.internal_addr,
70-
Self::LEASE_DURATION.as_secs() as u32,
93+
Self::LEASE_DURATION_SEC,
7194
"",
7295
)
7396
.await?;
97+
7498
log::debug!(
7599
"UPnP: port mapping renewed ({}:{})",
76100
self.proto,
@@ -81,6 +105,8 @@ impl PortOpener {
81105
}
82106

83107
impl Drop for PortOpener {
108+
/// Remove the port mapping when the `PortOpener` is dropped. Note that this is a blocking
109+
/// operation because [`AsyncDrop`](https://doc.rust-lang.org/std/future/trait.AsyncDrop.html) is experimental.
84110
fn drop(&mut self) {
85111
let gateway = BlockingGateway {
86112
addr: self.gateway.addr,
@@ -108,19 +134,16 @@ impl Drop for PortOpener {
108134
#[cfg(test)]
109135
mod tests {
110136
use super::*;
111-
use crate::net;
112137
use log::Level;
113-
use std::net::SocketAddrV4;
114138
use tokio::time;
115139

116140
#[ignore]
117141
#[tokio::test]
118142
async fn test_async_port_opener() {
119143
simple_logger::init_with_level(Level::Debug).unwrap();
120144

121-
let local_ip = net::get_local_addr().unwrap();
122-
let local_internal_ip = SocketAddrV4::new(local_ip, 23015);
123-
let port_opener = PortOpener::new(local_internal_ip.into(), PortMappingProtocol::TCP, None)
145+
let internal_port = 12345;
146+
let port_opener = PortOpener::new(PortMappingProtocol::TCP, internal_port, None, None)
124147
.await
125148
.unwrap_or_else(|e| panic!("Failed to create PortOpener: {e}"));
126149
log::info!("port opener created, external ip: {}", port_opener.external_ip());

mtorrent/src/app/dht.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use mtorrent_dht as dht;
22
use mtorrent_utils::{info_stopwatch, net, upnp, worker};
33
use std::io;
4-
use std::net::{SocketAddr, SocketAddrV4};
4+
use std::net::SocketAddrV4;
55
use std::path::PathBuf;
66
use std::time::Duration;
77
use tokio::net::UdpSocket;
@@ -54,14 +54,13 @@ pub fn launch_dht_node_runtime(cfg: Config) -> io::Result<(worker::rt::Handle, d
5454
Ok((worker_handle, cmd_sender))
5555
}
5656

57-
async fn start_upnp(local_port: u16) -> io::Result<()> {
58-
let local_addr = net::get_local_addr()?;
59-
57+
async fn start_upnp(local_port: u16, interface: Option<&str>) -> io::Result<()> {
6058
// try create a port mapping with the same port number
6159
let port_opener = upnp::PortOpener::new(
62-
SocketAddr::new(local_addr.into(), local_port),
6360
upnp::PortMappingProtocol::UDP,
61+
local_port,
6462
Some(local_port),
63+
interface,
6564
)
6665
.await
6766
.map_err(io::Error::other)?;
@@ -101,14 +100,14 @@ async fn dht_main(
101100
Ok(socket) => socket,
102101
};
103102

104-
if let Some(interface) = bind_interface
105-
&& let Err(e) = net::bind_to_interface(&socket, &interface)
103+
if let Some(interface) = &bind_interface
104+
&& let Err(e) = net::bind_to_interface(&socket, interface)
106105
{
107106
log::error!("Failed to bind DHT UDP socket to interface {interface}: {e}");
108107
return;
109108
}
110109

111-
if use_upnp && let Err(e) = start_upnp(local_port).await {
110+
if use_upnp && let Err(e) = start_upnp(local_port, bind_interface.as_deref()).await {
112111
log::error!("UPnP for DHT failed: {e}");
113112
}
114113

mtorrent/src/app/main.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,17 @@ struct Params {
6161
}
6262

6363
async fn start_upnp(
64-
internal_addr: SocketAddr,
64+
internal_port: u16,
6565
desired_external_port: Option<u16>,
6666
proto: upnp::PortMappingProtocol,
67-
) -> SocketAddr {
68-
let Ok(port_opener) = upnp::PortOpener::new(internal_addr, proto, desired_external_port)
69-
.await
70-
.inspect_err(|e| log::error!("UPnP: {proto:?} port mapping failed: {e}"))
67+
interface: Option<&str>,
68+
) -> u16 {
69+
let Ok(port_opener) =
70+
upnp::PortOpener::new(proto, internal_port, desired_external_port, interface)
71+
.await
72+
.inspect_err(|e| log::error!("UPnP: {proto:?} port mapping failed: {e}"))
7173
else {
72-
return internal_addr;
74+
return internal_port;
7375
};
7476

7577
let external_addr = port_opener.external_ip();
@@ -80,7 +82,7 @@ async fn start_upnp(
8082
log::error!("UPnP: {proto:?} port renewal for PWP failed: {e}");
8183
}
8284
});
83-
external_addr
85+
external_addr.port()
8486
}
8587

8688
/// Download a single torrent given a magnet link or a path to its metainfo file.
@@ -109,12 +111,21 @@ pub async fn single_torrent(
109111
// peers later
110112
let external_pwp_port = if cfg.use_upnp {
111113
let _g = ctx.pwp_runtime.enter();
112-
let internal_addr = (net::get_local_addr()?, listener_port).into();
113-
let (_public_pwp_addr, public_utp_addr) = join!(
114-
start_upnp(internal_addr, cfg.pwp_port, upnp::PortMappingProtocol::TCP),
115-
start_upnp(internal_addr, cfg.pwp_port, upnp::PortMappingProtocol::UDP),
114+
let (_public_pwp_port, public_utp_port) = join!(
115+
start_upnp(
116+
listener_port,
117+
cfg.pwp_port,
118+
upnp::PortMappingProtocol::TCP,
119+
cfg.bind_interface.as_deref()
120+
),
121+
start_upnp(
122+
listener_port,
123+
cfg.pwp_port,
124+
upnp::PortMappingProtocol::UDP,
125+
cfg.bind_interface.as_deref()
126+
),
116127
);
117-
public_utp_addr.port()
128+
public_utp_port
118129
} else {
119130
listener_port
120131
};

0 commit comments

Comments
 (0)