diff --git a/Cargo.lock b/Cargo.lock index c7c1a505..b9250e54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1101,6 +1101,17 @@ dependencies = [ "web-time", ] +[[package]] +name = "n0-watcher" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "813443be5d70d9ed05ad04577f0db9fa1d655336911d2ac271b63eb2da363ea1" +dependencies = [ + "derive_more", + "n0-future", + "thiserror 2.0.12", +] + [[package]] name = "nested_enum_utils" version = "0.2.2" @@ -1221,6 +1232,7 @@ dependencies = [ "js-sys", "libc", "n0-future", + "n0-watcher", "nested_enum_utils", "netdev", "netlink-packet-core", diff --git a/netwatch/Cargo.toml b/netwatch/Cargo.toml index 2925b874..5c3cd6ac 100644 --- a/netwatch/Cargo.toml +++ b/netwatch/Cargo.toml @@ -19,6 +19,7 @@ workspace = true atomic-waker = "1.1.2" bytes = "1.7" n0-future = "0.1.3" +n0-watcher = "0.1" nested_enum_utils = "0.2.0" snafu = "0.8.5" time = "0.3.20" diff --git a/netwatch/src/interfaces.rs b/netwatch/src/interfaces.rs index 5b0613ae..d4a8ed28 100644 --- a/netwatch/src/interfaces.rs +++ b/netwatch/src/interfaces.rs @@ -29,10 +29,14 @@ use self::bsd::default_route; use self::linux::default_route; #[cfg(target_os = "windows")] use self::windows::default_route; +#[cfg(not(wasm_browser))] +use crate::ip::is_link_local; use crate::ip::{is_private_v6, is_up}; +#[cfg(not(wasm_browser))] +use crate::netmon::is_interesting_interface; /// Represents a network interface. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Interface { iface: netdev::interface::Interface, } @@ -61,12 +65,12 @@ impl Eq for Interface {} impl Interface { /// Is this interface up? - pub(crate) fn is_up(&self) -> bool { + pub fn is_up(&self) -> bool { is_up(&self.iface) } /// The name of the interface. - pub(crate) fn name(&self) -> &str { + pub fn name(&self) -> &str { &self.iface.name } @@ -153,7 +157,7 @@ impl IpNet { /// Intended to store the state of the machine's network interfaces, routing table, and /// other network configuration. For now it's pretty basic. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct State { /// Maps from an interface name interface. pub interfaces: HashMap, @@ -165,22 +169,16 @@ pub struct State { /// Whether the machine has some non-localhost, non-link-local IPv4 address. pub have_v4: bool, - //// Whether the current network interface is considered "expensive", which currently means LTE/etc + /// Whether the current network interface is considered "expensive", which currently means LTE/etc /// instead of Wifi. This field is not populated by `get_state`. - pub(crate) is_expensive: bool, + pub is_expensive: bool, /// The interface name for the machine's default route. /// /// It is not yet populated on all OSes. /// /// When set, its value is the map key into `interface` and `interface_ips`. - pub(crate) default_route_interface: Option, - - /// The HTTP proxy to use, if any. - pub(crate) http_proxy: Option, - - /// The URL to the Proxy Autoconfig URL, if applicable. - pub(crate) pac: Option, + pub default_route_interface: Option, } impl fmt::Display for State { @@ -241,8 +239,6 @@ impl State { have_v6, is_expensive: false, default_route_interface, - http_proxy: None, - pac: None, } } @@ -258,10 +254,42 @@ impl State { have_v4: true, is_expensive: false, default_route_interface: Some(ifname), - http_proxy: None, - pac: None, } } + + /// Is this a major change compared to the `old` one?. + #[cfg(wasm_browser)] + pub fn is_major_change(&self, old: &State) -> bool { + // All changes are major. + // In the browser, there only are changes from online to offline + self != old + } + + /// Is this a major change compared to the `old` one?. + #[cfg(not(wasm_browser))] + pub fn is_major_change(&self, old: &State) -> bool { + if self.have_v6 != old.have_v6 + || self.have_v4 != old.have_v4 + || self.is_expensive != old.is_expensive + || self.default_route_interface != old.default_route_interface + { + return true; + } + + for (iname, i) in &old.interfaces { + if !is_interesting_interface(i.name()) { + continue; + } + let Some(i2) = self.interfaces.get(iname) else { + return true; + }; + if i != i2 || !prefixes_major_equal(i.addrs(), i2.addrs()) { + return true; + } + } + + false + } } /// Reports whether ip is a usable IPv4 address which should have Internet connectivity. @@ -373,6 +401,30 @@ impl HomeRouter { } } +/// Checks whether `a` and `b` are equal after ignoring uninteresting +/// things, like link-local, loopback and multicast addresses. +#[cfg(not(wasm_browser))] +fn prefixes_major_equal(a: impl Iterator, b: impl Iterator) -> bool { + fn is_interesting(p: &IpNet) -> bool { + let a = p.addr(); + if is_link_local(a) || a.is_loopback() || a.is_multicast() { + return false; + } + true + } + + let a = a.filter(is_interesting); + let b = b.filter(is_interesting); + + for (a, b) in a.zip(b) { + if a != b { + return false; + } + } + + true +} + #[cfg(test)] mod tests { use std::net::Ipv6Addr; diff --git a/netwatch/src/interfaces/wasm_browser.rs b/netwatch/src/interfaces/wasm_browser.rs index 190431b0..38c3440d 100644 --- a/netwatch/src/interfaces/wasm_browser.rs +++ b/netwatch/src/interfaces/wasm_browser.rs @@ -5,7 +5,7 @@ use js_sys::{JsString, Reflect}; pub const BROWSER_INTERFACE: &str = "browserif"; /// Represents a network interface. -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Interface { is_up: bool, } @@ -45,7 +45,7 @@ impl Interface { /// Intended to store the state of the machine's network interfaces, routing table, and /// other network configuration. For now it's pretty basic. -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct State { /// Maps from an interface name interface. pub interfaces: HashMap, diff --git a/netwatch/src/netmon.rs b/netwatch/src/netmon.rs index 246fe2a6..e87975c6 100644 --- a/netwatch/src/netmon.rs +++ b/netwatch/src/netmon.rs @@ -1,9 +1,7 @@ //! Monitoring of networking interfaces and route changes. -use n0_future::{ - boxed::BoxFuture, - task::{self, AbortOnDropHandle}, -}; +use n0_future::task::{self, AbortOnDropHandle}; +use n0_watcher::Watchable; use nested_enum_utils::common_fields; use snafu::{Backtrace, ResultExt, Snafu}; use tokio::sync::{mpsc, oneshot}; @@ -26,8 +24,10 @@ mod wasm_browser; #[cfg(target_os = "windows")] mod windows; -pub use self::actor::CallbackToken; +#[cfg(not(wasm_browser))] +pub(crate) use self::actor::is_interesting_interface; use self::actor::{Actor, ActorMessage}; +pub use crate::interfaces::State; /// Monitors networking interface and route changes. #[derive(Debug)] @@ -35,6 +35,7 @@ pub struct Monitor { /// Task handle for the monitor task. _handle: AbortOnDropHandle<()>, actor_tx: mpsc::Sender, + interface_state: Watchable, } #[common_fields({ @@ -66,6 +67,7 @@ impl Monitor { pub async fn new() -> Result { let actor = Actor::new().await.context(ActorSnafu)?; let actor_tx = actor.subscribe(); + let interface_state = actor.state().clone(); let handle = task::spawn(async move { actor.run().await; @@ -74,30 +76,13 @@ impl Monitor { Ok(Monitor { _handle: AbortOnDropHandle::new(handle), actor_tx, + interface_state, }) } /// Subscribe to network changes. - pub async fn subscribe(&self, callback: F) -> Result - where - F: Fn(bool) -> BoxFuture<()> + 'static + Sync + Send, - { - let (s, r) = oneshot::channel(); - self.actor_tx - .send(ActorMessage::Subscribe(Box::new(callback), s)) - .await?; - let token = r.await?; - Ok(token) - } - - /// Unsubscribe a callback from network changes, using the provided token. - pub async fn unsubscribe(&self, token: CallbackToken) -> Result<(), Error> { - let (s, r) = oneshot::channel(); - self.actor_tx - .send(ActorMessage::Unsubscribe(token, s)) - .await?; - r.await?; - Ok(()) + pub fn interface_state(&self) -> n0_watcher::Direct { + self.interface_state.watch() } /// Potential change detected outside @@ -109,23 +94,16 @@ impl Monitor { #[cfg(test)] mod tests { - use n0_future::future::FutureExt; + use n0_watcher::Watcher as _; use super::*; #[tokio::test] async fn test_smoke_monitor() { let mon = Monitor::new().await.unwrap(); - let _token = mon - .subscribe(|is_major| { - async move { - println!("CHANGE DETECTED: {}", is_major); - } - .boxed() - }) - .await - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_secs(15)).await; + let sub = mon.interface_state(); + + let current = sub.get().unwrap(); + println!("current state: {}", current); } } diff --git a/netwatch/src/netmon/actor.rs b/netwatch/src/netmon/actor.rs index bd5743ce..8654c289 100644 --- a/netwatch/src/netmon/actor.rs +++ b/netwatch/src/netmon/actor.rs @@ -1,15 +1,10 @@ -use std::{collections::HashMap, sync::Arc}; - -use n0_future::{ - boxed::BoxFuture, - task, - time::{self, Duration, Instant}, -}; +use n0_future::time::{self, Duration, Instant}; +use n0_watcher::Watchable; #[cfg(not(wasm_browser))] -use os::is_interesting_interface; +pub(crate) use os::is_interesting_interface; pub(super) use os::Error; use os::RouteMonitor; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tracing::{debug, trace}; #[cfg(target_os = "android")] @@ -29,8 +24,6 @@ use super::wasm_browser as os; #[cfg(target_os = "windows")] use super::windows as os; use crate::interfaces::State; -#[cfg(not(wasm_browser))] -use crate::{interfaces::IpNet, ip::is_link_local}; /// The message sent by the OS specific monitors. #[derive(Debug, Copy, Clone)] @@ -52,7 +45,7 @@ const ACTOR_CHAN_CAPACITY: usize = 16; pub(super) struct Actor { /// Latest known interface state. - interface_state: State, + interface_state: Watchable, /// Latest observed wall time. wall_time: Instant, /// OS specific monitor. @@ -61,21 +54,9 @@ pub(super) struct Actor { mon_receiver: mpsc::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, - /// Callback registry. - callbacks: HashMap>, - callback_token: u64, } -/// Token to remove a callback -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct CallbackToken(u64); - -/// Callbacks that get notified about changes. -pub(super) type Callback = Box BoxFuture<()> + Sync + Send + 'static>; - pub(super) enum ActorMessage { - Subscribe(Callback, oneshot::Sender), - Unsubscribe(CallbackToken, oneshot::Sender<()>), NetworkChange, } @@ -89,17 +70,19 @@ impl Actor { let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); Ok(Actor { - interface_state, + interface_state: Watchable::new(interface_state), wall_time, route_monitor, mon_receiver, actor_receiver, actor_sender, - callbacks: Default::default(), - callback_token: 0, }) } + pub(super) fn state(&self) -> &Watchable { + &self.interface_state + } + pub(super) fn subscribe(&self) -> mpsc::Sender { self.actor_sender.clone() } @@ -143,15 +126,6 @@ impl Actor { } msg = self.actor_receiver.recv() => { match msg { - Some(ActorMessage::Subscribe(callback, s)) => { - let token = self.next_callback_token(); - self.callbacks.insert(token, Arc::new(callback)); - s.send(token).ok(); - } - Some(ActorMessage::Unsubscribe(token, s)) => { - self.callbacks.remove(&token); - s.send(()).ok(); - } Some(ActorMessage::NetworkChange) => { trace!("external network activity detected"); last_event.replace(false); @@ -167,17 +141,11 @@ impl Actor { } } - fn next_callback_token(&mut self) -> CallbackToken { - let token = CallbackToken(self.callback_token); - self.callback_token += 1; - token - } - async fn handle_potential_change(&mut self, time_jumped: bool) { trace!("potential change"); let new_state = State::new().await; - let old_state = &self.interface_state; + let old_state = &self.interface_state.get(); // No major changes, continue on if !time_jumped && old_state == &new_state { @@ -185,19 +153,7 @@ impl Actor { return; } - let is_major = is_major_change(old_state, &new_state) || time_jumped; - - if is_major { - self.interface_state = new_state; - } - - debug!("triggering {} callbacks", self.callbacks.len()); - for cb in self.callbacks.values() { - let cb = cb.clone(); - task::spawn(async move { - cb(is_major).await; - }); - } + self.interface_state.set(new_state).ok(); } /// Reports whether wall time jumped more than 150% @@ -214,61 +170,3 @@ impl Actor { jumped } } - -#[cfg(wasm_browser)] -fn is_major_change(s1: &State, s2: &State) -> bool { - // All changes are major. - // In the browser, there only are changes from online to offline - s1 != s2 -} - -#[cfg(not(wasm_browser))] -fn is_major_change(s1: &State, s2: &State) -> bool { - if s1.have_v6 != s2.have_v6 - || s1.have_v4 != s2.have_v4 - || s1.is_expensive != s2.is_expensive - || s1.default_route_interface != s2.default_route_interface - || s1.http_proxy != s2.http_proxy - || s1.pac != s2.pac - { - return true; - } - - for (iname, i) in &s1.interfaces { - if !is_interesting_interface(i.name()) { - continue; - } - let Some(i2) = s2.interfaces.get(iname) else { - return true; - }; - if i != i2 || !prefixes_major_equal(i.addrs(), i2.addrs()) { - return true; - } - } - - false -} - -/// Checks whether `a` and `b` are equal after ignoring uninteresting -/// things, like link-local, loopback and multicast addresses. -#[cfg(not(wasm_browser))] -fn prefixes_major_equal(a: impl Iterator, b: impl Iterator) -> bool { - fn is_interesting(p: &IpNet) -> bool { - let a = p.addr(); - if is_link_local(a) || a.is_loopback() || a.is_multicast() { - return false; - } - true - } - - let a = a.filter(is_interesting); - let b = b.filter(is_interesting); - - for (a, b) in a.zip(b) { - if a != b { - return false; - } - } - - true -} diff --git a/netwatch/src/netmon/android.rs b/netwatch/src/netmon/android.rs index 14189bfa..1d37dc08 100644 --- a/netwatch/src/netmon/android.rs +++ b/netwatch/src/netmon/android.rs @@ -21,6 +21,6 @@ impl RouteMonitor { } } -pub(super) fn is_interesting_interface(_name: &str) -> bool { +pub(crate) fn is_interesting_interface(_name: &str) -> bool { true } diff --git a/netwatch/src/netmon/bsd.rs b/netwatch/src/netmon/bsd.rs index 4e099212..32b75097 100644 --- a/netwatch/src/netmon/bsd.rs +++ b/netwatch/src/netmon/bsd.rs @@ -120,7 +120,7 @@ pub(super) fn is_interesting_message(msg: &WireMessage) -> bool { } } -pub(super) fn is_interesting_interface(name: &str) -> bool { +pub(crate) fn is_interesting_interface(name: &str) -> bool { let base_name = name.trim_end_matches("0123456789"); if base_name == "llw" || base_name == "awdl" || base_name == "ipsec" { return false; diff --git a/netwatch/src/netmon/linux.rs b/netwatch/src/netmon/linux.rs index 0eed8260..7f777e54 100644 --- a/netwatch/src/netmon/linux.rs +++ b/netwatch/src/netmon/linux.rs @@ -184,6 +184,6 @@ impl RouteMonitor { } } -pub(super) fn is_interesting_interface(_name: &str) -> bool { +pub(crate) fn is_interesting_interface(_name: &str) -> bool { true } diff --git a/netwatch/src/netmon/windows.rs b/netwatch/src/netmon/windows.rs index 57037745..02c20c11 100644 --- a/netwatch/src/netmon/windows.rs +++ b/netwatch/src/netmon/windows.rs @@ -56,7 +56,7 @@ impl RouteMonitor { } } -pub(super) fn is_interesting_interface(_name: &str) -> bool { +pub(crate) fn is_interesting_interface(_name: &str) -> bool { true } diff --git a/netwatch/src/udp.rs b/netwatch/src/udp.rs index 1e0a6d26..d0d96495 100644 --- a/netwatch/src/udp.rs +++ b/netwatch/src/udp.rs @@ -702,15 +702,12 @@ impl SocketState { let local_addr = socket.local_addr()?; if addr.port() != 0 && local_addr.port() != addr.port() { - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "wrong port bound: {:?}: wanted: {} got {}", - network, - addr.port(), - local_addr.port(), - ), - )); + return Err(io::Error::other(format!( + "wrong port bound: {:?}: wanted: {} got {}", + network, + addr.port(), + local_addr.port(), + ))); } Ok(Self::Connected { @@ -731,10 +728,7 @@ impl SocketState { (*addr, s) } Self::Closed { .. } => { - return Err(io::Error::new( - io::ErrorKind::Other, - "socket is closed and cannot be rebound", - )); + return Err(io::Error::other("socket is closed and cannot be rebound")); } }; debug!("rebinding {}", addr); diff --git a/netwatch/tests/smoke.rs b/netwatch/tests/smoke.rs index 04da94ee..4dce37b5 100644 --- a/netwatch/tests/smoke.rs +++ b/netwatch/tests/smoke.rs @@ -5,7 +5,7 @@ //! the browser online/offline. //! //! However, this gives us a minimum guarantee that the Wasm build doesn't break fully. -use n0_future::FutureExt; +use n0_watcher::Watcher; use netwatch::netmon; use testresult::TestResult; #[cfg(not(wasm_browser))] @@ -30,20 +30,12 @@ async fn smoke_test() -> TestResult { // globalThis.navigator.onLine or globalThis.addEventListener("online"/"offline", ...) APIs, // so this is more of a test to see if we gracefully handle these situations & if our // .wasm files are without "env" imports. - tracing::info!("subscribing to netmon callback"); - let token = monitor - .subscribe(|is_major| { - async move { - tracing::info!(is_major, "network change"); - } - .boxed() - }) - .await?; - tracing::info!("successfully subscribed to netmon callback"); + tracing::info!("subscribing to netmon"); + let sub = monitor.interface_state(); - tracing::info!("unsubscribing"); - monitor.unsubscribe(token).await?; - tracing::info!("unsubscribed"); + let current = sub.get()?; + tracing::info!(?current, "network change"); + tracing::info!("successfully subscribed to netmon"); tracing::info!("dropping netmon::Monitor"); drop(monitor);