diff --git a/iroh/src/address_lookup/mdns.rs b/iroh/src/address_lookup/mdns.rs index 6d29c06c0b6..fe08c7df7bd 100644 --- a/iroh/src/address_lookup/mdns.rs +++ b/iroh/src/address_lookup/mdns.rs @@ -59,8 +59,8 @@ //! [`AddrFilter`]: crate::address_lookup::AddrFilter //! [`RelayUrl`]: iroh_base::RelayUrl use std::{ - collections::{BTreeSet, HashMap}, - net::{IpAddr, SocketAddr}, + collections::{BTreeSet, HashMap, HashSet}, + net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, sync::Arc, }; @@ -103,6 +103,9 @@ const USER_DATA_ATTRIBUTE: &str = "user-data"; /// How long we will wait before we stop attempting to resolve an endpoint ID to an address. const LOOKUP_DURATION: Duration = Duration::from_secs(10); +/// Duration of time between interface updates +const INTERFACE_UPDATE_DURATION: Duration = Duration::from_secs(5); + /// The key of the attribute under which the `RelayUrl` is stored in /// the TXT record supported by swarm-discovery. const RELAY_URL_ATTRIBUTE: &str = "relay"; @@ -112,6 +115,8 @@ const RELAY_URL_ATTRIBUTE: &str = "relay"; pub struct MdnsAddressLookup { #[allow(dead_code)] handle: Arc>, + #[allow(dead_code)] + interface_watcher_handle: Arc>, sender: mpsc::Sender, advertise: bool, /// When `local_addrs` changes, we re-publish our info. @@ -293,6 +298,13 @@ impl MdnsAddressLookup { &rt, )?; + let address_lookup_clone = Arc::clone(&address_lookup); + let interface_watcher = + async move { MdnsAddressLookup::spawn_interface_watcher(address_lookup_clone).await }; + let interface_watcher_handle = task::spawn( + interface_watcher.instrument(info_span!("swarm-discovery.multi-interface.actor")), + ); + let local_addrs: Watchable> = Watchable::default(); let mut addrs_change = local_addrs.watch(); let address_lookup_fut = async move { @@ -461,6 +473,7 @@ impl MdnsAddressLookup { task::spawn(address_lookup_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { handle: Arc::new(AbortOnDropHandle::new(handle)), + interface_watcher_handle: Arc::new(AbortOnDropHandle::new(interface_watcher_handle)), sender: send, advertise, local_addrs, @@ -485,7 +498,7 @@ impl MdnsAddressLookup { socketaddrs: BTreeSet, service_name: String, rt: &tokio::runtime::Handle, - ) -> Result { + ) -> Result, AddressLookupBuilderError> { let spawn_rt = rt.clone(); let callback = move |endpoint_id: &str, peer: &Peer| { trace!(endpoint_id, ?peer, "Received peer information from Mdns"); @@ -500,6 +513,7 @@ impl MdnsAddressLookup { .ok(); }); }; + let endpoint_id_str = data_encoding::BASE32_NOPAD .encode(endpoint_id.as_bytes()) .to_ascii_lowercase(); @@ -512,9 +526,35 @@ impl MdnsAddressLookup { discoverer = discoverer.with_addrs(addr.0, addr.1); } } - discoverer - .spawn(rt) - .map_err(|e| AddressLookupBuilderError::from_err("mdns", e)) + + // Spawn discoverer + Ok(Arc::new(discoverer.spawn(rt).map_err(|e| { + AddressLookupBuilderError::from_err("mdns", e) + })?)) + } + + async fn spawn_interface_watcher(guard: Arc) { + let mut known_interfaces: HashSet = HashSet::new(); + + loop { + let current: HashSet = interfaces_v4().await.into_iter().collect(); + + // Add new interfaces + for &addr in current.difference(&known_interfaces) { + trace!(%addr, "adding multicast interface"); + guard.add_interface_v4(addr); + } + + // Remove gone interfaces + for &addr in known_interfaces.difference(¤t) { + trace!(%addr, "removing multicast interface"); + guard.remove_interface_v4(addr); + } + + known_interfaces = current; + + time::sleep(INTERFACE_UPDATE_DURATION).await; + } } fn socketaddrs_to_addrs<'a>( @@ -531,6 +571,26 @@ impl MdnsAddressLookup { } } +async fn interfaces_v4() -> Vec { + // Load current network interfaces state + let interfaces = netwatch::interfaces::State::new().await; + + // Get the interface ip v4 addresses + interfaces + .interfaces + .into_iter() + .filter_map(|(_, iface)| { + iface + .addrs() + .find(|i| i.addr().is_ipv4()) + .and_then(|ipnet| match ipnet.addr() { + std::net::IpAddr::V4(v4) => Some(v4), + _ => None, + }) + }) + .collect() +} + fn peer_to_discovery_item(peer: &Peer, endpoint_id: &EndpointId) -> AddressLookupItem { let ip_addrs: BTreeSet = peer .addrs() @@ -619,9 +679,9 @@ mod tests { let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64); // Create Address LookupA with advertise=false (only listens) - let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?; + let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-resolve")?; // Create Address LookupB with advertise=true (will broadcast) - let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true)?; + let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true, "test-resolve")?; // make addr info for discoverer b let user_data: UserData = "foobar".parse()?; @@ -682,8 +742,8 @@ mod tests { #[traced_test] async fn mdns_publish_expire() -> Result { let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64); - let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?; - let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true)?; + let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-expire")?; + let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, true, "test-expire")?; // publish address_lookup_b's address let endpoint_data = @@ -744,12 +804,12 @@ mod tests { let mut endpoint_ids = BTreeSet::new(); let mut address_lookup_list = vec![]; - let (_, address_lookup) = make_address_lookup(&mut rng, false)?; + let (_, address_lookup) = make_address_lookup(&mut rng, false, "test-subscribe")?; let endpoint_data = EndpointData::new([TransportAddr::Ip("0.0.0.0:11111".parse().unwrap())]); for i in 0..num_endpoints { - let (endpoint_id, address_lookup) = make_address_lookup(&mut rng, true)?; + let (endpoint_id, address_lookup) = make_address_lookup(&mut rng, true, "test-subscribe")?; let user_data: UserData = format!("endpoint{i}").parse()?; let endpoint_data = endpoint_data .clone() @@ -764,24 +824,29 @@ mod tests { let test = async move { let mut got_ids = BTreeSet::new(); while got_ids.len() != num_endpoints { - if let Some(DiscoveryEvent::Discovered { endpoint_info, .. }) = - events.next().await - { - let data = endpoint_info.data.user_data().cloned(); - if endpoint_ids.contains(&(endpoint_info.endpoint_id, data.clone())) { - got_ids.insert((endpoint_info.endpoint_id, data)); + match events.next().await { + Some(DiscoveryEvent::Discovered { endpoint_info, .. }) => { + let data = endpoint_info.data.user_data().cloned(); + if endpoint_ids.contains(&(endpoint_info.endpoint_id, data.clone())) { + got_ids.insert((endpoint_info.endpoint_id, data)); + } + } + Some(DiscoveryEvent::Expired { .. }) => { + // Ignore expiry events + continue; + } + None => { + bail_any!( + "no more events, only got {} ids, expected {num_endpoints}\n", + got_ids.len() + ); } - } else { - bail_any!( - "no more events, only got {} ids, expected {num_endpoints}\n", - got_ids.len() - ); } } assert_eq!(got_ids, endpoint_ids); Ok::<_, Error>(()) }; - tokio::time::timeout(Duration::from_secs(5), test) + tokio::time::timeout(Duration::from_secs(30), test) .await .std_context("timeout")? } @@ -791,10 +856,10 @@ mod tests { async fn non_advertising_endpoint_not_discovered() -> Result { let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64); - let (_, address_lookup_a) = make_address_lookup(&mut rng, false)?; - let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, false)?; + let (_, address_lookup_a) = make_address_lookup(&mut rng, false, "test-noadvert")?; + let (endpoint_id_b, address_lookup_b) = make_address_lookup(&mut rng, false, "test-noadvert")?; - let (endpoint_id_c, address_lookup_c) = make_address_lookup(&mut rng, true)?; + let (endpoint_id_c, address_lookup_c) = make_address_lookup(&mut rng, true, "test-noadvert")?; let endpoint_data_c = EndpointData::new([TransportAddr::Ip("0.0.0.0:22222".parse().unwrap())]); address_lookup_c.publish(&endpoint_data_c); @@ -804,14 +869,14 @@ mod tests { address_lookup_b.publish(&endpoint_data_b); let mut stream_c = address_lookup_a.resolve(endpoint_id_c).unwrap(); - let result_c = tokio::time::timeout(Duration::from_secs(2), stream_c.next()).await; + let result_c = tokio::time::timeout(Duration::from_secs(5), stream_c.next()).await; assert!( result_c.is_ok(), "Advertising endpoint should be discoverable" ); let mut stream_b = address_lookup_a.resolve(endpoint_id_b).unwrap(); - let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await; + let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await; assert!( result_b.is_err(), "Expected timeout since endpoint b isn't advertising" @@ -857,21 +922,21 @@ mod tests { address_lookup_c.publish(&endpoint_data_c); let mut stream_a = address_lookup_a.resolve(id_b).unwrap(); - let result_a = tokio::time::timeout(Duration::from_secs(2), stream_a.next()).await; + let result_a = tokio::time::timeout(Duration::from_secs(5), stream_a.next()).await; assert!( result_a.is_err(), "Endpoint on a different service should NOT be discoverable" ); let mut stream_b = address_lookup_b.resolve(id_c).unwrap(); - let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await; + let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await; assert!( result_b.is_ok(), "Endpoint on the same service should be discoverable" ); let mut stream_b = address_lookup_b.resolve(id_a).unwrap(); - let result_b = tokio::time::timeout(Duration::from_secs(2), stream_b.next()).await; + let result_b = tokio::time::timeout(Duration::from_secs(5), stream_b.next()).await; assert!( result_b.is_err(), "Endpoint on a different service should NOT be discoverable" @@ -886,10 +951,10 @@ mod tests { let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64); // Create an mdns address lookup A that only listens - let (_, mdns_a) = make_address_lookup(&mut rng, false)?; + let (_, mdns_a) = make_address_lookup(&mut rng, false, "test-relay")?; // Create an mdns address lookup B that includes a relay url for publishing - let (endpoint_id_b, mdns_b) = make_address_lookup(&mut rng, true)?; + let (endpoint_id_b, mdns_b) = make_address_lookup(&mut rng, true, "test-relay")?; let relay_url: iroh_base::RelayUrl = "https://relay.example.com".parse().unwrap(); let endpoint_data = EndpointData::new([TransportAddr::Ip("0.0.0.0:11111".parse().unwrap())]) @@ -908,7 +973,7 @@ mod tests { // Wait for discovery let DiscoveryEvent::Discovered { endpoint_info, .. } = - tokio::time::timeout(Duration::from_secs(2), events.next()) + tokio::time::timeout(Duration::from_secs(15), events.next()) .await .std_context("timeout")? .unwrap() @@ -927,12 +992,14 @@ mod tests { fn make_address_lookup( rng: &mut R, advertise: bool, + service_name: &str, ) -> Result<(PublicKey, MdnsAddressLookup)> { let endpoint_id = SecretKey::generate(rng).public(); Ok(( endpoint_id, MdnsAddressLookup::builder() .advertise(advertise) + .service_name(service_name) .build(endpoint_id)?, )) }