@@ -6,7 +6,7 @@ use std::path::PathBuf;
66use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
77use std:: sync:: Arc ;
88use std:: time:: Duration ;
9- use tokio:: sync:: { broadcast, Mutex } ;
9+ use tokio:: sync:: { broadcast, Mutex , RwLock } ;
1010use tokio:: task:: JoinSet ;
1111use tokio:: time;
1212
@@ -1068,54 +1068,7 @@ impl PeerNetworkManager {
10681068 . find ( |( a, _) | * a == selected_peer)
10691069 . ok_or_else ( || NetworkError :: ConnectionFailed ( "Selected peer not found" . to_string ( ) ) ) ?;
10701070
1071- // Upgrade GetHeaders to GetHeaders2 if this specific peer supports it and not disabled
1072- let peer_supports_headers2 = {
1073- let peer_guard = peer. read ( ) . await ;
1074- peer_guard. can_request_headers2 ( )
1075- } ;
1076- let message = match message {
1077- NetworkMessage :: GetHeaders ( get_headers)
1078- if !self . headers2_disabled . lock ( ) . await . contains ( addr)
1079- && peer_supports_headers2 =>
1080- {
1081- log:: debug!(
1082- "Upgrading GetHeaders to GetHeaders2 for peer {}: {:?}" ,
1083- addr,
1084- get_headers
1085- ) ;
1086- NetworkMessage :: GetHeaders2 ( get_headers)
1087- }
1088- other => other,
1089- } ;
1090- // Reduce verbosity for common sync messages
1091- match & message {
1092- NetworkMessage :: GetHeaders ( _)
1093- | NetworkMessage :: GetCFilters ( _)
1094- | NetworkMessage :: GetCFHeaders ( _) => {
1095- log:: debug!( "Sending {} to {}" , message. cmd( ) , addr) ;
1096- }
1097- NetworkMessage :: GetHeaders2 ( gh2) => {
1098- log:: info!( "📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}" ,
1099- addr,
1100- gh2. version,
1101- gh2. locator_hashes. len( ) ,
1102- gh2. locator_hashes. iter( ) . take( 2 ) . collect:: <Vec <_>>( ) ,
1103- gh2. stop_hash
1104- ) ;
1105- }
1106- NetworkMessage :: SendHeaders2 => {
1107- log:: info!( "🤝 Sending SendHeaders2 to {} - requesting compressed headers" , addr) ;
1108- }
1109- _ => {
1110- log:: trace!( "Sending {:?} to {}" , message. cmd( ) , addr) ;
1111- }
1112- }
1113-
1114- let mut peer_guard = peer. write ( ) . await ;
1115- peer_guard
1116- . send_message ( message)
1117- . await
1118- . map_err ( |e| NetworkError :: ProtocolError ( format ! ( "Failed to send to {}: {}" , addr, e) ) )
1071+ self . send_message_to_peer ( addr, peer, message) . await
11191072 }
11201073
11211074 /// Send a message distributed across connected peers using round-robin selection.
@@ -1185,14 +1138,28 @@ impl PeerNetworkManager {
11851138 let idx = self . round_robin_counter . fetch_add ( 1 , Ordering :: Relaxed ) % selected_peers. len ( ) ;
11861139 let ( addr, peer) = & selected_peers[ idx] ;
11871140
1188- // Upgrade GetHeaders to GetHeaders2 if peer supports it
1141+ log:: debug!(
1142+ "Distributing {} request to peer {} (round-robin idx {})" ,
1143+ message. cmd( ) ,
1144+ addr,
1145+ idx
1146+ ) ;
1147+
1148+ self . send_message_to_peer ( addr, peer, message) . await
1149+ }
1150+
1151+ /// Send a message to the given peer.
1152+ /// For GetHeaders messages upgrade to GetHeaders2 if the peer supports it.
1153+ async fn send_message_to_peer (
1154+ & self ,
1155+ addr : & SocketAddr ,
1156+ peer : & Arc < RwLock < Peer > > ,
1157+ message : NetworkMessage ,
1158+ ) -> NetworkResult < ( ) > {
11891159 let message = match message {
11901160 NetworkMessage :: GetHeaders ( get_headers) => {
1191- let peer_supports_headers2 = {
1192- let peer_guard = peer. read ( ) . await ;
1193- peer_guard. can_request_headers2 ( )
1194- } ;
1195- if peer_supports_headers2 && !self . headers2_disabled . lock ( ) . await . contains ( addr) {
1161+ let supports_headers2 = peer. read ( ) . await . can_request_headers2 ( ) ;
1162+ if supports_headers2 && !self . headers2_disabled . lock ( ) . await . contains ( addr) {
11961163 log:: debug!( "Upgrading GetHeaders to GetHeaders2 for peer {}" , addr) ;
11971164 NetworkMessage :: GetHeaders2 ( get_headers)
11981165 } else {
@@ -1202,13 +1169,6 @@ impl PeerNetworkManager {
12021169 other => other,
12031170 } ;
12041171
1205- log:: debug!(
1206- "Distributing {} request to peer {} (round-robin idx {})" ,
1207- message. cmd( ) ,
1208- addr,
1209- idx
1210- ) ;
1211-
12121172 let mut peer_guard = peer. write ( ) . await ;
12131173 peer_guard
12141174 . send_message ( message)
0 commit comments