Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 102 additions & 35 deletions iroh/src/address_lookup/mdns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
//! [`AddrFilter`]: crate::address_lookup::AddrFilter
//! [`RelayUrl`]: iroh_base::RelayUrl
use std::{
collections::{BTreeSet, HashMap},
net::{IpAddr, SocketAddr},
collections::{BTreeSet, HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
sync::Arc,
};
Expand Down Expand Up @@ -103,6 +103,9 @@ const USER_DATA_ATTRIBUTE: &str = "user-data";
/// How long we will wait before we stop attempting to resolve an endpoint ID to an address.
const LOOKUP_DURATION: Duration = Duration::from_secs(10);

/// Duration of time between interface updates
const INTERFACE_UPDATE_DURATION: Duration = Duration::from_secs(5);

/// The key of the attribute under which the `RelayUrl` is stored in
/// the TXT record supported by swarm-discovery.
const RELAY_URL_ATTRIBUTE: &str = "relay";
Expand All @@ -112,6 +115,8 @@ const RELAY_URL_ATTRIBUTE: &str = "relay";
pub struct MdnsAddressLookup {
#[allow(dead_code)]
handle: Arc<AbortOnDropHandle<()>>,
#[allow(dead_code)]
interface_watcher_handle: Arc<AbortOnDropHandle<()>>,
sender: mpsc::Sender<Message>,
advertise: bool,
/// When `local_addrs` changes, we re-publish our info.
Expand Down Expand Up @@ -293,6 +298,13 @@ impl MdnsAddressLookup {
&rt,
)?;

let address_lookup_clone = Arc::clone(&address_lookup);
let interface_watcher =
async move { MdnsAddressLookup::spawn_interface_watcher(address_lookup_clone).await };
let interface_watcher_handle = task::spawn(
interface_watcher.instrument(info_span!("swarm-discovery.multi-interface.actor")),
);

let local_addrs: Watchable<Option<EndpointData>> = Watchable::default();
let mut addrs_change = local_addrs.watch();
let address_lookup_fut = async move {
Expand Down Expand Up @@ -461,6 +473,7 @@ impl MdnsAddressLookup {
task::spawn(address_lookup_fut.instrument(info_span!("swarm-discovery.actor")));
Ok(Self {
handle: Arc::new(AbortOnDropHandle::new(handle)),
interface_watcher_handle: Arc::new(AbortOnDropHandle::new(interface_watcher_handle)),
sender: send,
advertise,
local_addrs,
Expand All @@ -485,7 +498,7 @@ impl MdnsAddressLookup {
socketaddrs: BTreeSet<SocketAddr>,
service_name: String,
rt: &tokio::runtime::Handle,
) -> Result<DropGuard, AddressLookupBuilderError> {
) -> Result<Arc<DropGuard>, AddressLookupBuilderError> {
let spawn_rt = rt.clone();
let callback = move |endpoint_id: &str, peer: &Peer| {
trace!(endpoint_id, ?peer, "Received peer information from Mdns");
Expand All @@ -500,6 +513,7 @@ impl MdnsAddressLookup {
.ok();
});
};

let endpoint_id_str = data_encoding::BASE32_NOPAD
.encode(endpoint_id.as_bytes())
.to_ascii_lowercase();
Expand All @@ -512,9 +526,35 @@ impl MdnsAddressLookup {
discoverer = discoverer.with_addrs(addr.0, addr.1);
}
}
discoverer
.spawn(rt)
.map_err(|e| AddressLookupBuilderError::from_err("mdns", e))

// Spawn discoverer
Ok(Arc::new(discoverer.spawn(rt).map_err(|e| {
AddressLookupBuilderError::from_err("mdns", e)
})?))
}

async fn spawn_interface_watcher(guard: Arc<DropGuard>) {
let mut known_interfaces: HashSet<Ipv4Addr> = HashSet::new();

loop {
let current: HashSet<Ipv4Addr> = interfaces_v4().await.into_iter().collect();

// Add new interfaces
for &addr in current.difference(&known_interfaces) {
trace!(%addr, "adding multicast interface");
guard.add_interface_v4(addr);
}

// Remove gone interfaces
for &addr in known_interfaces.difference(&current) {
trace!(%addr, "removing multicast interface");
guard.remove_interface_v4(addr);
}

known_interfaces = current;

time::sleep(INTERFACE_UPDATE_DURATION).await;
}
}

fn socketaddrs_to_addrs<'a>(
Expand All @@ -531,6 +571,26 @@ impl MdnsAddressLookup {
}
}

async fn interfaces_v4() -> Vec<Ipv4Addr> {
// Load current network interfaces state
let interfaces = netwatch::interfaces::State::new().await;

// Get the interface ip v4 addresses
interfaces
.interfaces
.into_iter()
.filter_map(|(_, iface)| {
iface
.addrs()
.find(|i| i.addr().is_ipv4())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this basically filter_map?

.and_then(|ipnet| match ipnet.addr() {
std::net::IpAddr::V4(v4) => Some(v4),
_ => None,
})
})
.collect()
}

fn peer_to_discovery_item(peer: &Peer, endpoint_id: &EndpointId) -> AddressLookupItem {
let ip_addrs: BTreeSet<SocketAddr> = peer
.addrs()
Expand Down Expand Up @@ -619,9 +679,9 @@ mod tests {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

// Create Address LookupA with advertise=false (only listens)
let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?;
let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-resolve")?;
// Create Address LookupB with advertise=true (will broadcast)
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true)?;
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true, "test-resolve")?;

// make addr info for discoverer b
let user_data: UserData = "foobar".parse()?;
Expand Down Expand Up @@ -682,8 +742,8 @@ mod tests {
#[traced_test]
async fn mdns_publish_expire() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?;
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true)?;
let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-expire")?;
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true, "test-expire")?;

// publish address_lookup_b's address
let endpoint_data =
Expand Down Expand Up @@ -744,12 +804,12 @@ mod tests {
let mut endpoint_ids = BTreeSet::new();
let mut address_lookup_list = vec![];

let (_, address_lookup) = make_address_lookup(&mut rng, false)?;
let (_, address_lookup) = make_address_lookup(&mut rng, false, "test-subscribe")?;
let endpoint_data =
EndpointData::new([TransportAddr::Ip("0.0.0.0:11111".parse().unwrap())]);

for i in 0..num_endpoints {
let (endpoint_id, address_lookup) = make_address_lookup(&mut rng, true)?;
let (endpoint_id, address_lookup) = make_address_lookup(&mut rng, true, "test-subscribe")?;
let user_data: UserData = format!("endpoint{i}").parse()?;
let endpoint_data = endpoint_data
.clone()
Expand All @@ -764,24 +824,29 @@ mod tests {
let test = async move {
let mut got_ids = BTreeSet::new();
while got_ids.len() != num_endpoints {
if let Some(DiscoveryEvent::Discovered { endpoint_info, .. }) =
events.next().await
{
let data = endpoint_info.data.user_data().cloned();
if endpoint_ids.contains(&(endpoint_info.endpoint_id, data.clone())) {
got_ids.insert((endpoint_info.endpoint_id, data));
match events.next().await {
Some(DiscoveryEvent::Discovered { endpoint_info, .. }) => {
let data = endpoint_info.data.user_data().cloned();
if endpoint_ids.contains(&(endpoint_info.endpoint_id, data.clone())) {
got_ids.insert((endpoint_info.endpoint_id, data));
}
}
Some(DiscoveryEvent::Expired { .. }) => {
// Ignore expiry events
continue;
}
None => {
bail_any!(
"no more events, only got {} ids, expected {num_endpoints}\n",
got_ids.len()
);
}
} else {
bail_any!(
"no more events, only got {} ids, expected {num_endpoints}\n",
got_ids.len()
);
}
}
assert_eq!(got_ids, endpoint_ids);
Ok::<_, Error>(())
};
tokio::time::timeout(Duration::from_secs(5), test)
tokio::time::timeout(Duration::from_secs(30), test)
.await
.std_context("timeout")?
}
Expand All @@ -791,10 +856,10 @@ mod tests {
async fn non_advertising_endpoint_not_discovered() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?;
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, false)?;
let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-noadvert")?;
let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, false, "test-noadvert")?;

let (endpoint_id_c, address_lookup_c) = make_address_lookup(&mut rng, true)?;
let (endpoint_id_c, address_lookup_c) = make_address_lookup(&mut rng, true, "test-noadvert")?;
let endpoint_data_c =
EndpointData::new([TransportAddr::Ip("0.0.0.0:22222".parse().unwrap())]);
address_lookup_c.publish(&endpoint_data_c);
Expand All @@ -804,14 +869,14 @@ mod tests {
address_lookup_b.publish(&endpoint_data_b);

let mut stream_c = address_lookup_a.resolve(endpoint_id_c).unwrap();
let result_c = tokio::time::timeout(Duration::from_secs(2), stream_c.next()).await;
let result_c = tokio::time::timeout(Duration::from_secs(5), stream_c.next()).await;
assert!(
result_c.is_ok(),
"Advertising endpoint should be discoverable"
);

let mut stream_b = address_lookup_a.resolve(endpoint_id_b).unwrap();
let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await;
let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await;
assert!(
result_b.is_err(),
"Expected timeout since endpoint b isn't advertising"
Expand Down Expand Up @@ -857,21 +922,21 @@ mod tests {
address_lookup_c.publish(&endpoint_data_c);

let mut stream_a = address_lookup_a.resolve(id_b).unwrap();
let result_a = tokio::time::timeout(Duration::from_secs(2), stream_a.next()).await;
let result_a = tokio::time::timeout(Duration::from_secs(5), stream_a.next()).await;
assert!(
result_a.is_err(),
"Endpoint on a different service should NOT be discoverable"
);

let mut stream_b = address_lookup_b.resolve(id_c).unwrap();
let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await;
let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await;
assert!(
result_b.is_ok(),
"Endpoint on the same service should be discoverable"
);

let mut stream_b = address_lookup_b.resolve(id_a).unwrap();
let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await;
let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await;
assert!(
result_b.is_err(),
"Endpoint on a different service should NOT be discoverable"
Expand All @@ -886,10 +951,10 @@ mod tests {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

// Create an mdns address lookup A that only listens
let (_, mdns_a) = make_address_lookup(&mut rng, false)?;
let (_, mdns_a) = make_address_lookup(&mut rng, false, "test-relay")?;

// Create an mdns address lookup B that includes a relay url for publishing
let (endpoint_id_b, mdns_b) = make_address_lookup(&mut rng, true)?;
let (endpoint_id_b, mdns_b) = make_address_lookup(&mut rng, true, "test-relay")?;
let relay_url: iroh_base::RelayUrl = "https://relay.example.com".parse().unwrap();
let endpoint_data =
EndpointData::new([TransportAddr::Ip("0.0.0.0:11111".parse().unwrap())])
Expand All @@ -908,7 +973,7 @@ mod tests {

// Wait for discovery
let DiscoveryEvent::Discovered { endpoint_info, .. } =
tokio::time::timeout(Duration::from_secs(2), events.next())
tokio::time::timeout(Duration::from_secs(15), events.next())
.await
.std_context("timeout")?
.unwrap()
Expand All @@ -927,12 +992,14 @@ mod tests {
fn make_address_lookup<R: CryptoRng + ?Sized>(
rng: &mut R,
advertise: bool,
service_name: &str,
) -> Result<(PublicKey, MdnsAddressLookup)> {
let endpoint_id = SecretKey::generate(rng).public();
Ok((
endpoint_id,
MdnsAddressLookup::builder()
.advertise(advertise)
.service_name(service_name)
.build(endpoint_id)?,
))
}
Expand Down
Loading