Skip to content

Commit 2621234

Browse files
authored
feat: introduce MessageDispatcher in network manager (#383)
* feat: subscription-based message routing This PR: - introduces `MessageRouter` to `PeerNetworkManager` which routes messages based on previously subscribed topics to the related mpsc channels. This allows for different tasks to listen to the messages they are interested in without locking/polling the single channel that we currently have. - intoduces `network::Message` which encapsulates actual `NetworkMessages` with a `SocketAddress` of the sending peer to allow more streamlined reputation updates in message handlers. - drops all the "last message peer id" stuff and uses the peer address from the new message format instead. * some refactoring: - refactor `subscribers: Vec<MessageSubscriber>` to `senders: HashMap<MessageType, Vec<UnboundedSender<Message>>>` - introduce `define_message_types` macro to generate `MessageType` enum with `Message` -> `MessageType` conversion for the hashmap lookups - rename `MessageRouter` -> `MessageDispatcher` and accordingly the `route()` to `dispatch()` - rename `subscribe` -> `message_receiver` and `new_subscriber` -> `message_receiver` * fix: deduplicate message types in MessageDispatcher registration Prevents duplicate message deliveries when message_receiver is called with duplicate MessageType entries in the input slice. * fix and test macro * prune dead senders in `dispatch()` * lock `message_dispatcher` with a mutex since all access is mutable
1 parent 12a121b commit 2621234

17 files changed

Lines changed: 657 additions & 420 deletions

dash-spv/src/client/chainlock.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//! - ChainLock validation updates
77
//! - Pending ChainLock validation
88
9+
use std::net::SocketAddr;
910
use std::sync::Arc;
1011

1112
use crate::error::{Result, SpvError};
@@ -21,6 +22,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
2122
/// Process and validate a ChainLock.
2223
pub async fn process_chainlock(
2324
&mut self,
25+
peer_address: SocketAddr,
2426
chainlock: dashcore::ephemerealdata::chain_lock::ChainLock,
2527
) -> Result<()> {
2628
tracing::info!(
@@ -40,7 +42,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
4042
{
4143
// Penalize the peer that relayed the invalid ChainLock
4244
let reason = format!("Invalid ChainLock: {}", e);
43-
let _ = self.network.penalize_last_message_peer_invalid_chainlock(&reason).await;
45+
self.network.penalize_peer_invalid_chainlock(peer_address, &reason).await;
4446
return Err(SpvError::Validation(e));
4547
}
4648
}
@@ -87,6 +89,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
8789
/// Process and validate an InstantSendLock.
8890
pub(super) async fn process_instantsendlock(
8991
&mut self,
92+
peer_address: SocketAddr,
9093
islock: dashcore::ephemerealdata::instant_lock::InstantLock,
9194
) -> Result<()> {
9295
tracing::info!("Processing InstantSendLock for tx {}", islock.txid);
@@ -107,7 +110,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
107110
tracing::warn!("{}", reason);
108111

109112
// Ban the peer using the reputation system
110-
let _ = self.network.penalize_last_message_peer_invalid_instantlock(&reason).await;
113+
self.network.penalize_peer_invalid_instantlock(peer_address, &reason).await;
111114

112115
return Err(SpvError::Validation(e));
113116
}

dash-spv/src/client/message_handler.rs

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::client::ClientConfig;
44
use crate::error::{Result, SpvError};
55
use crate::mempool_filter::MempoolFilter;
6-
use crate::network::NetworkManager;
6+
use crate::network::{Message, NetworkManager};
77
use crate::storage::StorageManager;
88
use crate::sync::legacy::SyncManager;
99
use crate::types::{MempoolState, SpvEvent};
@@ -47,17 +47,17 @@ impl<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> MessageHandle
4747
}
4848

4949
/// Handle incoming network messages during monitoring.
50-
pub async fn handle_network_message(
51-
&mut self,
52-
message: &dashcore::network::message::NetworkMessage,
53-
) -> Result<()> {
50+
pub async fn handle_network_message(&mut self, message: &Message) -> Result<()> {
5451
use dashcore::network::message::NetworkMessage;
5552

56-
tracing::debug!("Client handling network message: {:?}", std::mem::discriminant(message));
53+
tracing::debug!(
54+
"Client handling network message: {:?}",
55+
std::mem::discriminant(message.inner())
56+
);
5757

5858
// First check if this is a message that ONLY the sync manager handles
5959
// These messages can be moved to the sync manager without cloning
60-
match message {
60+
match message.inner() {
6161
NetworkMessage::Headers2(ref headers2) => {
6262
tracing::info!(
6363
"📋 Received Headers2 message with {} compressed headers",
@@ -89,22 +89,11 @@ impl<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> MessageHandle
8989
}
9090
NetworkMessage::CFHeaders(ref cf_headers) => {
9191
// Try to include the peer address for better diagnostics
92-
let peer_addr = self.network.get_last_message_peer_addr().await;
93-
match peer_addr {
94-
Some(addr) => {
95-
tracing::info!(
96-
"📨 Client received CFHeaders message with {} filter headers from {}",
97-
cf_headers.filter_hashes.len(),
98-
addr
99-
);
100-
}
101-
None => {
102-
tracing::info!(
103-
"📨 Client received CFHeaders message with {} filter headers (peer unknown)",
104-
cf_headers.filter_hashes.len()
105-
);
106-
}
107-
}
92+
tracing::info!(
93+
"📨 Client received CFHeaders message with {} filter headers from {}",
94+
cf_headers.filter_hashes.len(),
95+
message.peer_address()
96+
);
10897
// Move to sync manager without cloning
10998
return self
11099
.sync_manager
@@ -164,23 +153,15 @@ impl<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> MessageHandle
164153
}
165154

166155
// Then handle client-specific message processing
167-
match message {
156+
match message.inner() {
168157
NetworkMessage::Headers(headers) => {
169158
// For post-sync headers, we need special handling
170159
if self.sync_manager.is_synced() && !headers.is_empty() {
171-
let peer_addr = self.network.get_last_message_peer_addr().await;
172-
if let Some(addr) = peer_addr {
173-
tracing::info!(
160+
tracing::info!(
174161
"📋 Post-sync headers received from {} ({} headers), additional processing may be needed",
175-
addr,
176-
headers.len()
177-
);
178-
} else {
179-
tracing::info!(
180-
"📋 Post-sync headers received ({} headers), additional processing may be needed",
162+
message.peer_address(),
181163
headers.len()
182164
);
183-
}
184165
}
185166
}
186167
NetworkMessage::Block(block) => {
@@ -194,7 +175,10 @@ impl<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> MessageHandle
194175

195176
// 1) Ensure header processing and chain tip update for this block
196177
// Route the header through the sequential sync manager as a Headers message
197-
let headers_msg = NetworkMessage::Headers(vec![block.header]);
178+
let headers_msg = Message::new(
179+
message.peer_address(),
180+
NetworkMessage::Headers(vec![block.header]),
181+
);
198182
if let Err(e) = self
199183
.sync_manager
200184
.handle_message(&headers_msg, &mut *self.network, &mut *self.storage)
@@ -297,7 +281,10 @@ impl<'a, S: StorageManager, N: NetworkManager, W: WalletInterface> MessageHandle
297281
}
298282
_ => {
299283
// Ignore other message types for now
300-
tracing::debug!("Received network message: {:?}", std::mem::discriminant(message));
284+
tracing::debug!(
285+
"Received network message: {:?}",
286+
std::mem::discriminant(message.inner())
287+
);
301288
}
302289
}
303290

dash-spv/src/client/message_handler_test.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
#[cfg(test)]
44
mod tests {
55
use crate::client::{ClientConfig, MessageHandler};
6+
use crate::network::Message;
67
use crate::storage::DiskStorageManager;
78
use crate::sync::legacy::SyncManager;
8-
use crate::test_utils::MockNetworkManager;
9+
use crate::test_utils::{test_socket_address, MockNetworkManager};
910
use crate::types::{MempoolState, SpvEvent, SpvStats};
1011
use crate::ChainState;
1112
use dashcore::block::Header as BlockHeader;
@@ -69,7 +70,7 @@ mod tests {
6970
let headers2 = dashcore::network::message_headers2::Headers2Message {
7071
headers: vec![],
7172
};
72-
let message = NetworkMessage::Headers2(headers2);
73+
let message = Message::new(test_socket_address(1), NetworkMessage::Headers2(headers2));
7374

7475
// Handle the message
7576
let result = handler.handle_network_message(&message).await;
@@ -115,7 +116,7 @@ mod tests {
115116
new_quorums: vec![],
116117
quorums_chainlock_signatures: vec![],
117118
};
118-
let message = NetworkMessage::MnListDiff(mnlistdiff);
119+
let message = Message::new(test_socket_address(1), NetworkMessage::MnListDiff(mnlistdiff));
119120

120121
// Handle the message
121122
let result = handler.handle_network_message(&message).await;
@@ -144,7 +145,7 @@ mod tests {
144145
previous_filter_header: dashcore::hash_types::FilterHeader::from([0u8; 32]),
145146
filter_hashes: vec![],
146147
};
147-
let message = NetworkMessage::CFHeaders(cfheaders);
148+
let message = Message::new(test_socket_address(1), NetworkMessage::CFHeaders(cfheaders));
148149

149150
// Handle the message
150151
let result = handler.handle_network_message(&message).await;
@@ -172,7 +173,7 @@ mod tests {
172173
block_hash: BlockHash::from([0u8; 32]),
173174
filter: vec![],
174175
};
175-
let message = NetworkMessage::CFilter(cfilter);
176+
let message = Message::new(test_socket_address(1), NetworkMessage::CFilter(cfilter));
176177

177178
// Handle the message - should be passed to sync manager
178179
let result = handler.handle_network_message(&message).await;
@@ -206,7 +207,7 @@ mod tests {
206207
},
207208
txdata: vec![],
208209
};
209-
let message = NetworkMessage::Block(block.clone());
210+
let message = Message::new(test_socket_address(1), NetworkMessage::Block(block.clone()));
210211

211212
// Handle the message
212213
let result = handler.handle_network_message(&message).await;
@@ -234,7 +235,7 @@ mod tests {
234235

235236
// Create an Inv message with transaction
236237
let inv = vec![Inventory::Transaction(dashcore::Txid::all_zeros())];
237-
let message = NetworkMessage::Inv(inv);
238+
let message = Message::new(test_socket_address(1), NetworkMessage::Inv(inv));
238239

239240
// Handle the message
240241
let result = handler.handle_network_message(&message).await;
@@ -267,7 +268,7 @@ mod tests {
267268
output: vec![],
268269
special_transaction_payload: None,
269270
};
270-
let message = NetworkMessage::Tx(tx.clone());
271+
let message = Message::new(test_socket_address(1), NetworkMessage::Tx(tx.clone()));
271272

272273
// Handle the message
273274
let result = handler.handle_network_message(&message).await;
@@ -300,7 +301,7 @@ mod tests {
300301
block_hash: BlockHash::from([0u8; 32]),
301302
signature: dashcore::bls_sig_utils::BLSSignature::from([0u8; 96]),
302303
};
303-
let message = NetworkMessage::CLSig(chainlock);
304+
let message = Message::new(test_socket_address(1), NetworkMessage::CLSig(chainlock));
304305

305306
// Handle the message
306307
let result = handler.handle_network_message(&message).await;
@@ -323,7 +324,7 @@ mod tests {
323324
);
324325

325326
// Create a Ping message
326-
let message = NetworkMessage::Ping(12345);
327+
let message = Message::new(test_socket_address(1), NetworkMessage::Ping(12345));
327328

328329
// Handle the message
329330
let result = handler.handle_network_message(&message).await;
@@ -352,7 +353,7 @@ mod tests {
352353
let headers2 = dashcore::network::message_headers2::Headers2Message {
353354
headers: vec![], // Empty headers might cause validation error
354355
};
355-
let message = NetworkMessage::Headers2(headers2);
356+
let message = Message::new(test_socket_address(1), NetworkMessage::Headers2(headers2));
356357

357358
// Handle the message - error should be propagated
358359
let result = handler.handle_network_message(&message).await;

dash-spv/src/client/sync_coordinator.rs

Lines changed: 31 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use super::{DashSpvClient, MessageHandler};
1414
use crate::client::interface::DashSpvClientCommand;
1515
use crate::error::{Result, SpvError};
1616
use crate::network::constants::MESSAGE_RECEIVE_TIMEOUT;
17-
use crate::network::NetworkManager;
17+
use crate::network::{Message, MessageType, NetworkManager};
1818
use crate::storage::StorageManager;
1919
use crate::types::{DetailedSyncProgress, SyncProgress};
2020
use key_wallet_manager::wallet_interface::WalletInterface;
@@ -78,6 +78,22 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
7878
let mut last_emitted_filters_downloaded: u64 = 0;
7979
let mut last_emitted_phase_name: Option<String> = None;
8080

81+
let mut message_receiver = self
82+
.network
83+
.message_receiver(&[
84+
MessageType::Headers,
85+
MessageType::Headers2,
86+
MessageType::CFHeaders,
87+
MessageType::CFilter,
88+
MessageType::Block,
89+
MessageType::MnListDiff,
90+
MessageType::QRInfo,
91+
MessageType::CLSig,
92+
MessageType::ISLock,
93+
MessageType::Inv,
94+
])
95+
.await;
96+
8197
loop {
8298
// Check if we should stop
8399
let running = self.running.read().await;
@@ -415,12 +431,13 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
415431
}
416432
}
417433
}
418-
received = self.network.receive_message() => {
434+
received = message_receiver.recv() => {
419435
match received {
420-
Ok(None) => {
421-
continue;
436+
None => {
437+
tracing::info!("Network message subscription channel closed.");
438+
break;
422439
}
423-
Ok(Some(message)) => {
440+
Some(message) => {
424441
// Wrap message handling in comprehensive error handling
425442
match self.handle_network_message(message).await {
426443
Ok(_) => {
@@ -452,36 +469,6 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
452469
}
453470
}
454471
},
455-
Err(err) => {
456-
// Handle specific network error types
457-
if let crate::error::NetworkError::ConnectionFailed(msg) = &err {
458-
if msg.contains("No connected peers") || self.network.peer_count() == 0 {
459-
tracing::warn!("All peers disconnected during monitoring, checking connection health");
460-
461-
// Wait for potential reconnection
462-
let mut wait_count = 0;
463-
while wait_count < 10 && self.network.peer_count() == 0 {
464-
tokio::time::sleep(Duration::from_millis(500)).await;
465-
wait_count += 1;
466-
}
467-
468-
if self.network.peer_count() > 0 {
469-
tracing::info!(
470-
"✅ Reconnected to {} peer(s), resuming monitoring",
471-
self.network.peer_count()
472-
);
473-
continue
474-
} else {
475-
tracing::warn!(
476-
"No peers available after waiting, will retry monitoring"
477-
);
478-
}
479-
}
480-
}
481-
482-
tracing::error!("Network error during monitoring: {}", err);
483-
tokio::time::sleep(Duration::from_secs(5)).await;
484-
}
485472
}
486473
}
487474
_ = tokio::time::sleep(MESSAGE_RECEIVE_TIMEOUT) => {}
@@ -546,13 +533,10 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
546533
}
547534

548535
/// Handle incoming network messages during monitoring.
549-
pub(super) async fn handle_network_message(
550-
&mut self,
551-
message: dashcore::network::message::NetworkMessage,
552-
) -> Result<()> {
536+
pub(super) async fn handle_network_message(&mut self, message: Message) -> Result<()> {
553537
// Check if this is a special message that needs client-level processing
554538
let needs_special_processing = matches!(
555-
&message,
539+
&message.inner(),
556540
dashcore::network::message::NetworkMessage::CLSig(_)
557541
| dashcore::network::message::NetworkMessage::ISLock(_)
558542
);
@@ -582,17 +566,21 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
582566
if needs_special_processing {
583567
// Special handling for messages that need client-level processing
584568
use dashcore::network::message::NetworkMessage;
585-
match &message {
569+
match message.inner() {
586570
NetworkMessage::CLSig(clsig) => {
587571
// Additional client-level ChainLock processing
588-
self.process_chainlock(clsig.clone()).await?;
572+
self.process_chainlock(message.peer_address(), clsig.clone()).await?;
589573
}
590574
NetworkMessage::ISLock(islock_msg) => {
591575
// Only process InstantLocks when fully synced and masternode engine is available
592576
if self.sync_manager.is_synced()
593577
&& self.sync_manager.get_masternode_engine().is_some()
594578
{
595-
self.process_instantsendlock(islock_msg.clone()).await?;
579+
self.process_instantsendlock(
580+
message.peer_address(),
581+
islock_msg.clone(),
582+
)
583+
.await?;
596584
} else {
597585
tracing::debug!(
598586
"Skipping InstantLock processing - not fully synced or masternode engine unavailable"

0 commit comments

Comments
 (0)