@@ -1591,15 +1591,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
15911591 }
15921592
15931593 /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
1594+ ///
15941595 /// Returns the message back if it needs to be broadcasted to all other peers.
15951596 fn handle_message(
15961597 &self,
15971598 peer_mutex: &Mutex<Peer>,
1598- mut peer_lock: MutexGuard<Peer>,
1599- message: wire::Message<<<CMH as core::ops:: Deref>::Target as wire::CustomMessageReader>::CustomMessage>
1600- ) -> Result<Option<wire::Message<<<CMH as core::ops:: Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
1599+ peer_lock: MutexGuard<Peer>,
1600+ message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>
1601+ ) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
16011602 let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
16021603 let logger = WithContext::from(&self.logger, Some(their_node_id), None);
1604+
1605+ let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? {
1606+ Some(processed_message) => processed_message,
1607+ None => return Ok(None),
1608+ };
1609+
1610+ self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger)
1611+ }
1612+
1613+ // Conducts all message processing that requires us to hold the `peer_lock`.
1614+ //
1615+ // Returns `None` if the message was fully processed and otherwise returns the message back to
1616+ // allow it to be subsequently processed by `do_handle_message_without_peer_lock`.
1617+ fn do_handle_message_holding_peer_lock<'a>(
1618+ &self,
1619+ mut peer_lock: MutexGuard<Peer>,
1620+ message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
1621+ their_node_id: &PublicKey,
1622+ logger: &WithContext<'a, L>
1623+ ) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError>
1624+ {
16031625 peer_lock.received_message_since_timer_tick = true;
16041626
16051627 // Need an Init as first message
@@ -1680,8 +1702,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
16801702 peer_lock.received_channel_announce_since_backlogged = true;
16811703 }
16821704
1683- mem::drop(peer_lock);
1705+ Ok(Some(message))
1706+ }
16841707
1708+ // Conducts all message processing that doesn't require us to hold the `peer_lock`.
1709+ //
1710+ // Returns the message back if it needs to be broadcasted to all other peers.
1711+ fn do_handle_message_without_peer_lock<'a>(
1712+ &self,
1713+ peer_mutex: &Mutex<Peer>,
1714+ message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
1715+ their_node_id: &PublicKey,
1716+ logger: &WithContext<'a, L>
1717+ ) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError>
1718+ {
16851719 if is_gossip_msg(message.type_id()) {
16861720 log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
16871721 } else {
@@ -1883,7 +1917,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18831917 Ok(should_forward)
18841918 }
18851919
1886- fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as core::ops:: Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
1920+ fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
18871921 match msg {
18881922 wire::Message::ChannelAnnouncement(ref msg) => {
18891923 log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
@@ -2275,7 +2309,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22752309 // We do not have the peers write lock, so we just store that we're
22762310 // about to disconnect the peer and do it after we finish
22772311 // processing most messages.
2278- let msg = msg.map(|msg| wire::Message::<<<CMH as core::ops:: Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
2312+ let msg = msg.map(|msg| wire::Message::<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
22792313 peers_to_disconnect.insert(node_id, msg);
22802314 },
22812315 msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
0 commit comments