@@ -22,7 +22,7 @@ use std::net::{SocketAddr, SocketAddrV6};
2222use std:: time:: Duration ;
2323use thiserror:: Error ;
2424use tokio:: net:: { TcpListener , TcpStream } ;
25- use tokio:: sync:: { mpsc, oneshot} ;
25+ use tokio:: sync:: { mpsc, oneshot, watch } ;
2626use tokio:: time:: { Instant , MissedTickBehavior , interval} ;
2727
2828#[ derive( Debug , Clone ) ]
@@ -106,15 +106,13 @@ pub enum NodeApiRequest {
106106 config : NetworkConfig ,
107107 responder : oneshot:: Sender < Result < ( ) , NodeRequestError > > ,
108108 } ,
109-
110- /// Retrieve the current network config
111- GetNetworkConfig { responder : oneshot:: Sender < Option < NetworkConfig > > } ,
112109}
113110
114111/// A handle for interacting with a `Node` task
115112#[ derive( Debug , Clone ) ]
116113pub struct NodeHandle {
117114 tx : mpsc:: Sender < NodeApiRequest > ,
115+ network_config_rx : watch:: Receiver < Option < NetworkConfig > > ,
118116}
119117
120118impl NodeHandle {
@@ -223,17 +221,11 @@ impl NodeHandle {
223221 rx. await ?
224222 }
225223
226- /// Retrieve the current network config
227- pub async fn get_network_config (
224+ /// Subscribe to the watch channel containing the network config
225+ pub fn network_config_subscribe (
228226 & self ,
229- ) -> Result < Option < NetworkConfig > , NodeRequestError > {
230- let ( tx, rx) = oneshot:: channel ( ) ;
231- self . tx
232- . send ( NodeApiRequest :: GetNetworkConfig { responder : tx } )
233- . await
234- . map_err ( |_| NodeRequestError :: Send ) ?;
235- let res = rx. await ?;
236- Ok ( res)
227+ ) -> watch:: Receiver < Option < NetworkConfig > > {
228+ self . network_config_rx . clone ( )
237229 }
238230}
239231
@@ -256,7 +248,7 @@ pub struct Status {
256248/// via control of an underlying `Fsm`.
257249pub struct Node {
258250 fsm_ledger_generation : u64 ,
259- network_config : Option < NetworkConfig > ,
251+ network_config : watch :: Sender < Option < NetworkConfig > > ,
260252 config : Config ,
261253 fsm : Fsm ,
262254 peers : BTreeSet < SocketAddrV6 > ,
@@ -339,11 +331,13 @@ impl Node {
339331 config. clone ( ) . into ( ) ,
340332 )
341333 . await ;
342- let network_config = NetworkConfig :: load (
343- & log,
344- config. network_config_ledger_paths . clone ( ) ,
345- )
346- . await ;
334+ let ( network_config, network_config_rx) = watch:: channel (
335+ NetworkConfig :: load (
336+ & log,
337+ config. network_config_ledger_paths . clone ( ) ,
338+ )
339+ . await ,
340+ ) ;
347341
348342 (
349343 Node {
@@ -364,14 +358,14 @@ impl Node {
364358 conn_rx,
365359 conn_tx,
366360 } ,
367- NodeHandle { tx } ,
361+ NodeHandle { tx, network_config_rx } ,
368362 )
369363 }
370364
371365 /// Run the main loop of the peer
372366 ///
373367 /// This should be spawned into its own tokio task
374- pub async fn run ( & mut self ) {
368+ pub async fn run ( mut self ) {
375369 // select among timer tick/received messages
376370 let mut interval = interval ( self . config . time_per_tick ) ;
377371 interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
@@ -515,6 +509,7 @@ impl Node {
515509 fsm_ledger_generation : self . fsm_ledger_generation ,
516510 network_config_ledger_generation : self
517511 . network_config
512+ . borrow ( )
518513 . as_ref ( )
519514 . map ( |c| c. generation ) ,
520515 fsm_state : self . fsm . state ( ) . name ( ) ,
@@ -552,8 +547,11 @@ impl Node {
552547 }
553548 }
554549 NodeApiRequest :: UpdateNetworkConfig { config, responder } => {
555- let current_gen =
556- self . network_config . as_ref ( ) . map_or ( 0 , |c| c. generation ) ;
550+ let current_gen = self
551+ . network_config
552+ . borrow ( )
553+ . as_ref ( )
554+ . map_or ( 0 , |c| c. generation ) ;
557555 info ! (
558556 self . log,
559557 concat!(
@@ -608,7 +606,9 @@ impl Node {
608606 } ,
609607 ) ) ;
610608 } else {
611- self . network_config = Some ( config. clone ( ) ) ;
609+ self . network_config . send_modify ( |c| {
610+ * c = Some ( config. clone ( ) ) ;
611+ } ) ;
612612 NetworkConfig :: save (
613613 & self . log ,
614614 self . config . network_config_ledger_paths . clone ( ) ,
@@ -622,9 +622,6 @@ impl Node {
622622 let _ = responder. send ( Ok ( ( ) ) ) ;
623623 }
624624 }
625- NodeApiRequest :: GetNetworkConfig { responder } => {
626- let _ = responder. send ( self . network_config . clone ( ) ) ;
627- }
628625 }
629626 }
630627
@@ -637,7 +634,7 @@ impl Node {
637634 ) {
638635 // We only call this method when there has been an update. Otherwise we
639636 // have an invariant violation due to programmer error and should panic.
640- let network_config = self . network_config . as_ref ( ) . unwrap ( ) ;
637+ let network_config = self . network_config . borrow ( ) . clone ( ) . unwrap ( ) ;
641638 info ! (
642639 self . log,
643640 "Broadcasting network config with generation {}" ,
@@ -835,13 +832,10 @@ impl Node {
835832 addr,
836833 unique_id : accepted_handle. unique_id ,
837834 } ;
838- if let Some ( network_config) = self . network_config . as_ref ( ) {
839- self . send_network_config (
840- network_config. clone ( ) ,
841- & peer_id,
842- & handle,
843- )
844- . await ;
835+ let maybe_network_config = self . network_config . borrow ( ) . clone ( ) ;
836+ if let Some ( network_config) = maybe_network_config {
837+ self . send_network_config ( network_config, & peer_id, & handle)
838+ . await ;
845839 }
846840
847841 self . established_connections . insert ( peer_id. clone ( ) , handle) ;
@@ -864,9 +858,11 @@ impl Node {
864858 return ;
865859 }
866860
867- if let Some ( network_config) = self . network_config . as_ref ( ) {
861+ let maybe_network_config =
862+ self . network_config . borrow ( ) . clone ( ) ;
863+ if let Some ( network_config) = maybe_network_config {
868864 self . send_network_config (
869- network_config. clone ( ) ,
865+ network_config,
870866 & peer_id,
871867 & handle,
872868 )
@@ -941,8 +937,11 @@ impl Node {
941937 self . accepted_connections . remove ( & addr) ;
942938 }
943939 ConnToMainMsgInner :: ReceivedNetworkConfig { from, config } => {
944- let current_gen =
945- self . network_config . as_ref ( ) . map_or ( 0 , |c| c. generation ) ;
940+ let current_gen = self
941+ . network_config
942+ . borrow ( )
943+ . as_ref ( )
944+ . map_or ( 0 , |c| c. generation ) ;
946945 let generation = config. generation ;
947946 info ! (
948947 self . log,
@@ -955,7 +954,9 @@ impl Node {
955954 current_gen
956955 ) ;
957956 if generation > current_gen {
958- self . network_config = Some ( config. clone ( ) ) ;
957+ self . network_config . send_modify ( |c| {
958+ * c = Some ( config. clone ( ) ) ;
959+ } ) ;
959960 NetworkConfig :: save (
960961 & self . log ,
961962 self . config . network_config_ledger_paths . clone ( ) ,
@@ -1082,7 +1083,9 @@ mod tests {
10821083 TestNode { config, log, node_handles : None }
10831084 }
10841085
1085- async fn start_node ( & mut self ) {
1086+ async fn start_node (
1087+ & mut self ,
1088+ ) -> watch:: Receiver < Option < NetworkConfig > > {
10861089 // Node must have previously been shutdown (or never started)
10871090 assert ! (
10881091 self . node_handles. is_none( ) ,
@@ -1094,7 +1097,7 @@ mod tests {
10941097 self . config . addr . set_port ( 0 ) ;
10951098
10961099 // (Re-)create node with existing config and its persistent state (if any)
1097- let ( mut node, handle) =
1100+ let ( node, handle) =
10981101 Node :: new ( self . config . clone ( ) , & self . log ) . await ;
10991102 let jh = tokio:: spawn ( async move {
11001103 node. run ( ) . await ;
@@ -1113,7 +1116,9 @@ mod tests {
11131116 . port ( ) ;
11141117 self . config . addr . set_port ( port) ;
11151118
1119+ let network_config_rx = handle. network_config_subscribe ( ) ;
11161120 self . node_handles = Some ( ( handle, jh) ) ;
1121+ network_config_rx
11171122 }
11181123
11191124 async fn shutdown_node ( & mut self ) {
@@ -1184,11 +1189,15 @@ mod tests {
11841189 }
11851190
11861191 /// (Re-)start the given node and update peer addresses for everyone
1187- async fn start_node ( & mut self , i : usize ) {
1192+ async fn start_node (
1193+ & mut self ,
1194+ i : usize ,
1195+ ) -> watch:: Receiver < Option < NetworkConfig > > {
11881196 let node = & mut self . nodes [ i] ;
1189- node. start_node ( ) . await ;
1197+ let network_config_rx = node. start_node ( ) . await ;
11901198 self . addrs . insert ( node. config . addr ) ;
11911199 self . load_all_peer_addresses ( ) . await ;
1200+ network_config_rx
11921201 }
11931202
11941203 // Stop the given node and update peer addresses for everyone
@@ -1472,13 +1481,13 @@ mod tests {
14721481 async fn network_config ( ) {
14731482 // Create and start test nodes
14741483 let mut nodes = TestNodes :: setup ( initial_members ( ) ) ;
1475- nodes. start_node ( 0 ) . await ;
1476- nodes. start_node ( 1 ) . await ;
1477- nodes. start_node ( 2 ) . await ;
1484+ let node0_rx = nodes. start_node ( 0 ) . await ;
1485+ let mut node1_rx = nodes. start_node ( 1 ) . await ;
1486+ let mut node2_rx = nodes. start_node ( 2 ) . await ;
14781487
14791488 // Ensure there is no network config at any of the nodes
1480- for node in nodes . iter ( ) {
1481- assert_eq ! ( None , node . get_network_config ( ) . await . unwrap ( ) ) ;
1489+ for rx in [ & node0_rx , & node1_rx , & node2_rx ] {
1490+ assert_eq ! ( None , rx . borrow ( ) . as_ref ( ) ) ;
14821491 }
14831492
14841493 // Update the network config at node0 and ensure it has taken effect
@@ -1487,10 +1496,7 @@ mod tests {
14871496 blob : b"Some network data" . to_vec ( ) ,
14881497 } ;
14891498 nodes[ 0 ] . update_network_config ( network_config. clone ( ) ) . await . unwrap ( ) ;
1490- assert_eq ! (
1491- Some ( & network_config) ,
1492- nodes[ 0 ] . get_network_config( ) . await . unwrap( ) . as_ref( )
1493- ) ;
1499+ assert_eq ! ( Some ( & network_config) , node0_rx. borrow( ) . as_ref( ) , ) ;
14941500
14951501 // Poll node1 and node2 until the network config update shows up
14961502 // Timeout after 5 seconds
@@ -1499,19 +1505,25 @@ mod tests {
14991505 let mut node1_done = false ;
15001506 let mut node2_done = false ;
15011507 while !( node1_done && node2_done) {
1502- let timeout = POLL_TIMEOUT . saturating_sub ( Instant :: now ( ) - start ) ;
1508+ let timeout = POLL_TIMEOUT . saturating_sub ( start . elapsed ( ) ) ;
15031509 tokio:: select! {
15041510 _ = sleep( timeout) => {
15051511 panic!( "Network config not replicated" ) ;
15061512 }
1507- res = nodes[ 1 ] . get_network_config( ) , if !node1_done => {
1508- if res. unwrap( ) . as_ref( ) == Some ( & network_config) {
1513+ _ = node1_rx. changed( ) , if !node1_done => {
1514+ if node1_rx
1515+ . borrow_and_update( )
1516+ . as_ref( ) == Some ( & network_config)
1517+ {
15091518 node1_done = true ;
15101519 continue ;
15111520 }
15121521 }
1513- res = nodes[ 2 ] . get_network_config( ) , if !node2_done => {
1514- if res. unwrap( ) . as_ref( ) == Some ( & network_config) {
1522+ _ = node2_rx. changed( ) , if !node2_done => {
1523+ if node2_rx
1524+ . borrow_and_update( )
1525+ . as_ref( ) == Some ( & network_config)
1526+ {
15151527 node2_done = true ;
15161528 continue ;
15171529 }
@@ -1526,30 +1538,29 @@ mod tests {
15261538 // Poll the learner to ensure it gets the network config
15271539 // Note that the learner doesn't even need to learn its share
15281540 // for network config replication to work.
1541+ let mut learner_rx = nodes[ LEARNER ] . network_config_subscribe ( ) ;
15291542 let start = Instant :: now ( ) ;
1530- let mut done = false ;
1531- while !done {
1532- let timeout = POLL_TIMEOUT . saturating_sub ( Instant :: now ( ) - start) ;
1543+ loop {
1544+ if learner_rx. borrow_and_update ( ) . as_ref ( ) == Some ( & network_config)
1545+ {
1546+ break ;
1547+ }
1548+ let timeout = POLL_TIMEOUT . saturating_sub ( start. elapsed ( ) ) ;
15331549 tokio:: select! {
15341550 _ = sleep( timeout) => {
15351551 panic!( "Network config not replicated" ) ;
15361552 }
1537- res = nodes[ LEARNER ] . get_network_config( ) => {
1538- if res. unwrap( ) . as_ref( ) == Some ( & network_config) {
1539- done = true ;
1540- }
1553+ _ = learner_rx. changed( ) => {
1554+ continue ;
15411555 }
15421556 }
15431557 }
15441558
15451559 // Stop node0, bring it back online and ensure it still sees the config
15461560 // at generation 1
15471561 nodes. shutdown_node ( 0 ) . await ;
1548- nodes. start_node ( 0 ) . await ;
1549- assert_eq ! (
1550- Some ( & network_config) ,
1551- nodes[ 0 ] . get_network_config( ) . await . unwrap( ) . as_ref( )
1552- ) ;
1562+ let node0_rx = nodes. start_node ( 0 ) . await ;
1563+ assert_eq ! ( Some ( & network_config) , node0_rx. borrow( ) . as_ref( ) , ) ;
15531564
15541565 // Stop node0 again, update network config via node1, bring node0 back online,
15551566 // and ensure all nodes see the latest configuration.
@@ -1559,25 +1570,22 @@ mod tests {
15591570 blob : b"Some more network data" . to_vec ( ) ,
15601571 } ;
15611572 nodes[ 1 ] . update_network_config ( new_config. clone ( ) ) . await . unwrap ( ) ;
1562- assert_eq ! (
1563- Some ( & new_config) ,
1564- nodes[ 1 ] . get_network_config( ) . await . unwrap( ) . as_ref( )
1565- ) ;
1566- nodes. start_node ( 0 ) . await ;
1573+ assert_eq ! ( Some ( & new_config) , node1_rx. borrow( ) . as_ref( ) , ) ;
1574+ let node0_rx = nodes. start_node ( 0 ) . await ;
15671575 let start = Instant :: now ( ) ;
15681576 // Reading a watch receiver is synchronous, so there is no need for a
15691577 // select here.
15701578 // We also want to repeatedly loop until all consistently have the same version
15711579 // to give some assurance that the old version from node0 doesn't replicate
15721580 ' outer: loop {
1573- if Instant :: now ( ) - start > POLL_TIMEOUT {
1581+ if start . elapsed ( ) > POLL_TIMEOUT {
15741582 panic ! ( "network config not replicated" ) ;
15751583 }
1576- for node in nodes . iter ( ) {
1577- if node . get_network_config ( ) . await . unwrap ( ) . as_ref ( )
1578- != Some ( & new_config )
1579- {
1580- // We need to try again
1584+ for rx in [ & node0_rx , & node1_rx , & node2_rx ] {
1585+ if rx . borrow ( ) . as_ref ( ) != Some ( & new_config ) {
1586+ // We need to try again; sleep to yield back to the runtime
1587+ // and give it a chance to propagate changes to us.
1588+ tokio :: time :: sleep ( Duration :: from_millis ( 10 ) ) . await ;
15811589 continue ' outer;
15821590 }
15831591 }
0 commit comments