diff --git a/src/io/mod.rs b/src/io/mod.rs
index 87a8544a2..4f5008440 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -27,8 +27,12 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
 pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";

 /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
-pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = "";
-pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp";
+pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
+pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";
+
+/// The last time we broadcast a node announcement will be persisted under this key.
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE: &str = "";
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";

 /// Provides an interface that allows to store and retrieve persisted values that are associated
 /// with given keys.
diff --git a/src/io/utils.rs b/src/io/utils.rs
index 95ad56b06..f31c7587f 100644
--- a/src/io/utils.rs
+++ b/src/io/utils.rs
@@ -174,12 +174,12 @@ where
 	Ok(res)
 }

-pub(crate) fn read_rgs_latest_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
+pub(crate) fn read_latest_rgs_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
 where
 	K::Target: KVStore,
 {
 	let mut reader =
-		kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?;
+		kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?;
 	u32::read(&mut reader).map_err(|_| {
 		std::io::Error::new(
 			std::io::ErrorKind::InvalidData,
@@ -188,7 +188,7 @@ where
 	})
 }

-pub(crate) fn write_rgs_latest_sync_timestamp<K: Deref, L: Deref>(
+pub(crate) fn write_latest_rgs_sync_timestamp<K: Deref, L: Deref>(
 	updated_timestamp: u32, kv_store: K, logger: L,
 ) -> Result<(), Error>
 where
@@ -196,13 +196,13 @@ where
 	L::Target: Logger,
 {
 	let mut writer = kv_store
-		.write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)
+		.write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)
 		.map_err(|e| {
 			log_error!(
 				logger,
 				"Getting writer for key {}/{} failed due to: {}",
-				RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-				RGS_LATEST_SYNC_TIMESTAMP_KEY,
+				LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+				LATEST_RGS_SYNC_TIMESTAMP_KEY,
 				e
 			);
 			Error::PersistenceFailed
@@ -211,8 +211,8 @@ where
 		log_error!(
 			logger,
 			"Writing data to key {}/{} failed due to: {}",
-			RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-			RGS_LATEST_SYNC_TIMESTAMP_KEY,
+			LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_KEY,
 			e
 		);
 		Error::PersistenceFailed
@@ -221,8 +221,65 @@ where
 		log_error!(
 			logger,
 			"Committing data to key {}/{} failed due to: {}",
-			RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-			RGS_LATEST_SYNC_TIMESTAMP_KEY,
+			LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})
+}
+
+pub(crate) fn read_latest_node_ann_bcast_timestamp<K: Deref>(
+	kv_store: K,
+) -> Result<u64, std::io::Error>
+where
+	K::Target: KVStore,
+{
+	let mut reader = kv_store
+		.read(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)?;
+	u64::read(&mut reader).map_err(|_| {
+		std::io::Error::new(
+			std::io::ErrorKind::InvalidData,
+			"Failed to deserialize latest node announcement broadcast timestamp",
+		)
+	})
+}
+
+pub(crate) fn write_latest_node_ann_bcast_timestamp<K: Deref, L: Deref>(
+	updated_timestamp: u64, kv_store: K, logger: L,
+) -> Result<(), Error>
+where
+	K::Target: KVStore,
+	L::Target: Logger,
+{
+	let mut writer = kv_store
+		.write(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)
+		.map_err(|e| {
+			log_error!(
+				logger,
+				"Getting writer for key {}/{} failed due to: {}",
+				LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+				LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
+				e
+			);
+			Error::PersistenceFailed
+		})?;
+	updated_timestamp.write(&mut writer).map_err(|e| {
+		log_error!(
+			logger,
+			"Writing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})?;
+	writer.commit().map_err(|e| {
+		log_error!(
+			logger,
+			"Committing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
 			e
 		);
 		Error::PersistenceFailed
diff --git a/src/lib.rs b/src/lib.rs
index b8b13ab1b..2161aafc9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -122,7 +122,9 @@ use lightning::ln::channelmanager::{
 };
 use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
 use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
+use lightning::onion_message::{CustomOnionMessageContents, Destination, OnionMessageContents};
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
+use lightning::util::ser::{Writeable, Writer};
 use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig};
 use lightning::util::ser::ReadableArgs;
@@ -169,9 +171,15 @@ const BDK_CLIENT_CONCURRENCY: u8 = 8;
 // The timeout after which we abandon retrying failed payments.
 const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);

-// The time in between peer reconnection attempts.
+// The time in-between peer reconnection attempts.
 const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);

+// The time in-between RGS sync attempts.
+const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
+
+// The time in-between node announcement broadcast attempts.
+const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
+
 // The length in bytes of our wallets' keys seed.
 const WALLET_KEYS_SEED_LEN: usize = 64;

@@ -215,6 +223,23 @@ enum GossipSourceConfig {
 	RapidGossipSync(String),
 }

+struct UserOnionMessageContents {
+	tlv_type: u64,
+	data: Vec<u8>,
+}
+
+impl CustomOnionMessageContents for UserOnionMessageContents {
+	fn tlv_type(&self) -> u64 {
+		self.tlv_type
+	}
+}
+
+impl Writeable for UserOnionMessageContents {
+	fn write<W: Writer>(&self, w: &mut W) -> Result<(), std::io::Error> {
+		w.write_all(&self.data)
+	}
+}
+
 /// A builder for an [`Node`] instance, allowing to set some configuration and module choices from
 /// the getgo.
 #[derive(Debug, Clone)]
@@ -547,7 +572,7 @@ impl Builder {
 				));

 				// Reset the RGS sync timestamp in case we somehow switch gossip sources
-				io::utils::write_rgs_latest_sync_timestamp(
+				io::utils::write_latest_rgs_sync_timestamp(
 					0,
 					Arc::clone(&kv_store),
 					Arc::clone(&logger),
@@ -557,7 +582,7 @@ impl Builder {
 			}
 			GossipSourceConfig::RapidGossipSync(rgs_server) => {
 				let latest_sync_timestamp =
-					io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
+					io::utils::read_latest_rgs_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
 				Arc::new(GossipSource::new_rgs(
 					rgs_server.clone(),
 					latest_sync_timestamp,
@@ -572,13 +597,13 @@ impl Builder {
 				chan_handler: Arc::clone(&channel_manager),
 				route_handler: Arc::clone(&p2p_gossip_sync)
 					as Arc<dyn RoutingMessageHandler + Sync + Send>,
-				onion_message_handler: onion_messenger,
+				onion_message_handler: onion_messenger.clone(),
 			},
 			GossipSync::Rapid(_) => MessageHandler {
 				chan_handler: Arc::clone(&channel_manager),
 				route_handler: Arc::new(IgnoringMessageHandler {})
 					as Arc<dyn RoutingMessageHandler + Sync + Send>,
-				onion_message_handler: onion_messenger,
+				onion_message_handler: onion_messenger.clone(),
 			},
 			GossipSync::None => {
 				unreachable!("We must always have a gossip sync!");
@@ -631,11 +656,12 @@ impl Builder {
 			}
 		};

-		let stop_running = Arc::new(AtomicBool::new(false));
+		let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());

 		Arc::new(Node {
 			runtime,
-			stop_running,
+			stop_sender,
+			stop_receiver,
 			config,
 			wallet,
 			tx_sync,
@@ -651,6 +677,7 @@ impl Builder {
 			scorer,
 			peer_store,
 			payment_store,
+			onion_messenger,
 		})
 	}
 }
@@ -660,7 +687,8 @@ impl Builder {
 /// Needs to be initialized and instantiated through [`Builder::build`].
 pub struct Node {
 	runtime: Arc>>,
-	stop_running: Arc,
+	stop_sender: tokio::sync::watch::Sender<()>,
+	stop_receiver: tokio::sync::watch::Receiver<()>,
 	config: Arc,
 	wallet: Arc>,
 	tx_sync: Arc>>,
@@ -676,6 +704,7 @@ pub struct Node {
 	scorer: Arc>,
 	peer_store: Arc, Arc>>,
 	payment_store: Arc, Arc>>,
+	onion_messenger: Arc,
 }

 impl Node {
@@ -694,8 +723,6 @@ impl Node {
 		let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

-		let stop_running = Arc::new(AtomicBool::new(false));
-
 		let event_handler = Arc::new(EventHandler::new(
 			Arc::clone(&self.wallet),
 			Arc::clone(&self.event_queue),
@@ -714,31 +741,36 @@ impl Node {
 		let sync_cman = Arc::clone(&self.channel_manager);
 		let sync_cmon = Arc::clone(&self.chain_monitor);
 		let sync_logger = Arc::clone(&self.logger);
-		let stop_sync = Arc::clone(&stop_running);
+		let mut stop_sync = self.stop_receiver.clone();

 		std::thread::spawn(move || {
 			tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
 				async move {
+					let mut interval = tokio::time::interval(Duration::from_secs(30));
+					interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 					loop {
-						if stop_sync.load(Ordering::Acquire) {
-							return;
-						}
 						let now = Instant::now();
-						match wallet.sync().await {
-							Ok(()) => log_info!(
-								sync_logger,
-								"Background sync of on-chain wallet finished in {}ms.",
-								now.elapsed().as_millis()
-							),
-							Err(err) => {
-								log_error!(
-									sync_logger,
-									"Background sync of on-chain wallet failed: {}",
-									err
-								)
+						tokio::select! {
+							_ = stop_sync.changed() => {
+								return;
+							}
+							_ = interval.tick() => {
+								match wallet.sync().await {
+									Ok(()) => log_info!(
+										sync_logger,
+										"Background sync of on-chain wallet finished in {}ms.",
+										now.elapsed().as_millis()
+									),
+									Err(err) => {
+										log_error!(
+											sync_logger,
+											"Background sync of on-chain wallet failed: {}",
+											err
+										)
+									}
+								}
+							}
+						}
 							}
 						}
-						tokio::time::sleep(Duration::from_secs(20)).await;
 					}
 				},
 			);
@@ -748,72 +780,78 @@ impl Node {
 			let gossip_source = Arc::clone(&self.gossip_source);
 			let gossip_sync_store = Arc::clone(&self.kv_store);
 			let gossip_sync_logger = Arc::clone(&self.logger);
-			let stop_gossip_sync = Arc::clone(&stop_running);
+			let mut stop_gossip_sync = self.stop_receiver.clone();
 			runtime.spawn(async move {
+				let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
 				loop {
-					let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
-					let stop_gossip_sync = Arc::clone(&stop_gossip_sync);
-					if stop_gossip_sync.load(Ordering::Acquire) {
-						return;
-					}
-
-					let now = Instant::now();
-					match gossip_source.update_rgs_snapshot().await {
-						Ok(updated_timestamp) => {
-							log_info!(
-								gossip_sync_logger,
-								"Background sync of RGS gossip data finished in {}ms.",
-								now.elapsed().as_millis()
-							);
-							io::utils::write_rgs_latest_sync_timestamp(
-								updated_timestamp,
-								Arc::clone(&gossip_sync_store),
-								Arc::clone(&gossip_sync_logger),
-							)
-							.expect("Persistence failed");
+					tokio::select! {
+						_ = stop_gossip_sync.changed() => {
+							return;
+						}
+						_ = interval.tick() => {
+							let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
+							let now = Instant::now();
+							match gossip_source.update_rgs_snapshot().await {
+								Ok(updated_timestamp) => {
+									log_info!(
+										gossip_sync_logger,
+										"Background sync of RGS gossip data finished in {}ms.",
+										now.elapsed().as_millis()
+									);
+									io::utils::write_latest_rgs_sync_timestamp(
+										updated_timestamp,
+										Arc::clone(&gossip_sync_store),
+										Arc::clone(&gossip_sync_logger),
+									)
+									.expect("Persistence failed");
+								}
+								Err(e) => log_error!(
+									gossip_sync_logger,
+									"Background sync of RGS gossip data failed: {}",
+									e
+								),
+							}
 						}
-						Err(e) => log_error!(
-							gossip_sync_logger,
-							"Background sync of RGS gossip data failed: {}",
-							e
-						),
 					}
-
-					tokio::time::sleep(Duration::from_secs(60 * 60)).await;
 				}
 			});
 		}

 		let sync_logger = Arc::clone(&self.logger);
-		let stop_sync = Arc::clone(&stop_running);
+		let mut stop_sync = self.stop_receiver.clone();
 		runtime.spawn(async move {
+			let mut interval = tokio::time::interval(Duration::from_secs(10));
+			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 			loop {
-				if stop_sync.load(Ordering::Acquire) {
-					return;
-				}
 				let now = Instant::now();
-				let confirmables = vec![
-					&*sync_cman as &(dyn Confirm + Sync + Send),
-					&*sync_cmon as &(dyn Confirm + Sync + Send),
-				];
-				match tx_sync.sync(confirmables).await {
-					Ok(()) => log_info!(
-						sync_logger,
-						"Background sync of Lightning wallet finished in {}ms.",
-						now.elapsed().as_millis()
-					),
-					Err(e) => {
-						log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
+				tokio::select! {
+					_ = stop_sync.changed() => {
+						return;
+					}
+					_ = interval.tick() => {
+						let confirmables = vec![
+							&*sync_cman as &(dyn Confirm + Sync + Send),
+							&*sync_cmon as &(dyn Confirm + Sync + Send),
+						];
+						match tx_sync.sync(confirmables).await {
+							Ok(()) => log_info!(
+								sync_logger,
+								"Background sync of Lightning wallet finished in {}ms.",
+								now.elapsed().as_millis()
+							),
+							Err(e) => {
+								log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
+							}
+						}
+					}
+				}
 					}
 				}
-				tokio::time::sleep(Duration::from_secs(5)).await;
 			}
 		});

 		if let Some(listening_address) = &self.config.listening_address {
 			// Setup networking
 			let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
-			let stop_listen = Arc::clone(&stop_running);
+			let mut stop_listen = self.stop_receiver.clone();
 			let listening_address = listening_address.clone();

 			let bind_addr = listening_address
@@ -828,18 +866,22 @@ impl Node {
 					"Failed to bind to listen address/port - is something else already listening on it?",
 				);
 				loop {
-					if stop_listen.load(Ordering::Acquire) {
-						return;
-					}
 					let peer_mgr = Arc::clone(&peer_manager_connection_handler);
-					let tcp_stream = listener.accept().await.unwrap().0;
-					tokio::spawn(async move {
-						lightning_net_tokio::setup_inbound(
-							Arc::clone(&peer_mgr),
-							tcp_stream.into_std().unwrap(),
-						)
-						.await;
-					});
+					tokio::select! {
+						_ = stop_listen.changed() => {
+							return;
+						}
+						res = listener.accept() => {
+							let tcp_stream = res.unwrap().0;
+							tokio::spawn(async move {
+								lightning_net_tokio::setup_inbound(
+									Arc::clone(&peer_mgr),
+									tcp_stream.into_std().unwrap(),
+								)
+								.await;
+							});
+						}
+					}
 				}
 			});
 		}
@@ -849,35 +891,93 @@ impl Node {
 		let connect_pm = Arc::clone(&self.peer_manager);
 		let connect_logger = Arc::clone(&self.logger);
 		let connect_peer_store = Arc::clone(&self.peer_store);
-		let stop_connect = Arc::clone(&stop_running);
+		let mut stop_connect = self.stop_receiver.clone();
 		runtime.spawn(async move {
 			let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
+			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 			loop {
-				if stop_connect.load(Ordering::Acquire) {
-					return;
+				tokio::select! {
+					_ = stop_connect.changed() => {
+						return;
+					}
+					_ = interval.tick() => {
+						let pm_peers = connect_pm
+							.get_peer_node_ids()
+							.iter()
+							.map(|(peer, _addr)| *peer)
+							.collect::<Vec<PublicKey>>();
+						for node_id in connect_cm
+							.list_channels()
+							.iter()
+							.map(|chan| chan.counterparty.node_id)
+							.filter(|id| !pm_peers.contains(id))
+						{
+							if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
+								let _ = do_connect_peer(
+									peer_info.node_id,
+									peer_info.address,
+									Arc::clone(&connect_pm),
+									Arc::clone(&connect_logger),
+								)
+								.await;
+							}
+						}
+					}
 				}
-				let pm_peers = connect_pm
-					.get_peer_node_ids()
-					.iter()
-					.map(|(peer, _addr)| *peer)
-					.collect::<Vec<PublicKey>>();
-				for node_id in connect_cm
-					.list_channels()
-					.iter()
-					.map(|chan| chan.counterparty.node_id)
-					.filter(|id| !pm_peers.contains(id))
-				{
-					if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
-						let _ = do_connect_peer(
-							peer_info.node_id,
-							peer_info.address,
-							Arc::clone(&connect_pm),
-							Arc::clone(&connect_logger),
-						)
-						.await;
-					}
+			}
+		});
+
+		// Regularly broadcast node announcements.
+		let bcast_cm = Arc::clone(&self.channel_manager);
+		let bcast_pm = Arc::clone(&self.peer_manager);
+		let bcast_config = Arc::clone(&self.config);
+		let bcast_store = Arc::clone(&self.kv_store);
+		let bcast_logger = Arc::clone(&self.logger);
+		let mut stop_bcast = self.stop_receiver.clone();
+		runtime.spawn(async move {
+			// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
+			let mut interval = tokio::time::interval(Duration::from_secs(30));
+			loop {
+				tokio::select! {
+					_ = stop_bcast.changed() => {
+						return;
+					}
+					_ = interval.tick() => {
+						let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store)) {
+							Ok(latest_bcast_time_secs) => {
+								// Skip if the time hasn't elapsed yet.
+								let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
+								next_bcast_unix_time.elapsed().is_err()
+							}
+							Err(_) => {
+								// Don't skip if we haven't broadcast before.
+								false
+							}
+						};
+
+						if skip_broadcast {
+							continue;
+						}
+
+						if bcast_cm.list_channels().iter().all(|chan| !chan.is_public) {
+							// Skip if we don't have any public channels.
+							continue;
+						}
+
+						if bcast_pm.get_peer_node_ids().is_empty() {
+							// Skip if we don't have any connected peers to gossip to.
+							continue;
+						}
+
+						let addresses =
+							bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
+						bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
+
+						let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
+						io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
+							.expect("Persistence failed");
+					}
 				}
-				interval.tick().await;
 			}
 		});
@@ -890,15 +990,17 @@ impl Node {
 		let background_peer_man = Arc::clone(&self.peer_manager);
 		let background_logger = Arc::clone(&self.logger);
 		let background_scorer = Arc::clone(&self.scorer);
-		let stop_background_processing = Arc::clone(&stop_running);
+		let stop_bp = self.stop_receiver.clone();
 		let sleeper = move |d| {
-			let stop = Arc::clone(&stop_background_processing);
+			let mut stop = stop_bp.clone();
 			Box::pin(async move {
-				if stop.load(Ordering::Acquire) {
-					true
-				} else {
-					tokio::time::sleep(d).await;
-					false
+				tokio::select! {
+					_ = stop.changed() => {
+						true
+					}
+					_ = tokio::time::sleep(d) => {
+						false
+					}
 				}
 			})
 		};
@@ -930,7 +1032,17 @@ impl Node {
 	pub fn stop(&self) -> Result<(), Error> {
 		let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
 		// Stop the runtime.
-		self.stop_running.store(true, Ordering::Release);
+		match self.stop_sender.send(()) {
+			Ok(_) => (),
+			Err(e) => {
+				log_error!(
+					self.logger,
+					"Failed to send shutdown signal. This should never happen: {}",
+					e
+				);
+				debug_assert!(false);
+			}
+		}

 		// Stop disconnect peers.
 		self.peer_manager.disconnect_all_peers();
@@ -1013,6 +1125,21 @@ impl Node {
 		self.wallet.send_to_address(address, None)
 	}

+	/// Send an onion message to the given destination.
+	pub fn send_onion_message(
+		&self, node_pks: Vec<PublicKey>, destination_pk: PublicKey, tlv_type: u64, data: Vec<u8>,
+	) {
+		match self.onion_messenger.send_onion_message(
+			&node_pks,
+			Destination::Node(destination_pk),
+			OnionMessageContents::Custom(UserOnionMessageContents { tlv_type, data }),
+			None,
+		) {
+			Ok(()) => println!("SUCCESS: forwarded onion message to first hop"),
+			Err(e) => println!("ERROR: failed to send onion message: {:?}", e),
+		}
+	}
+
 	/// Retrieve the currently spendable on-chain balance in satoshis.
 	pub fn spendable_onchain_balance_sats(&self) -> Result<u64, Error> {
 		Ok(self.wallet.get_balance().map(|bal| bal.get_spendable())?)