@@ -1579,9 +1579,15 @@ impl<
15791579 // future for the completion of the write. This ensures monitor persistence
15801580 // ordering is preserved.
15811581 let encoded = update. encode ( ) ;
1582- res_a = Some ( async move {
1583- self . kv_store . write ( primary, & monitor_key, update_name. as_str ( ) , encoded) . await
1584- } ) ;
1582+ let write_fut: Pin <
1583+ Box < dyn MaybeSendableFuture < Output = Result < ( ) , io:: Error > > + ' static > ,
1584+ > = Box :: pin ( self . kv_store . write (
1585+ primary,
1586+ & monitor_key,
1587+ update_name. as_str ( ) ,
1588+ encoded,
1589+ ) ) ;
1590+ res_a = Some ( write_fut) ;
15851591 } else {
15861592 // We could write this update, but it meets criteria of our design that calls for a full monitor write.
15871593 // Note that this is NOT an async function, but rather calls the *sync* KVStore
@@ -2107,6 +2113,58 @@ mod tests {
21072113 do_persister_with_real_monitors ( 4 , 2 ) ;
21082114 }
21092115
2116+ #[ test]
2117+ fn async_update_persist_queues_write_synchronously ( ) {
2118+ // Build, but do not poll, an incremental monitor-update persistence future. The
2119+ // underlying KVStore::write call must queue the write synchronously so that queue order
2120+ // is set by call order rather than executor poll order.
2121+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
2122+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
2123+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
2124+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
2125+ let _ = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
2126+
2127+ send_payment ( & nodes[ 0 ] , & vec ! [ & nodes[ 1 ] ] [ ..] , 8_000_000 ) ;
2128+ send_payment ( & nodes[ 1 ] , & vec ! [ & nodes[ 0 ] ] [ ..] , 4_000_000 ) ;
2129+
2130+ let cmu_map = nodes[ 0 ] . chain_monitor . monitor_updates . lock ( ) . unwrap ( ) ;
2131+ let ( channel_id, updates) =
2132+ cmu_map. iter ( ) . next ( ) . expect ( "expected a channel with monitor updates" ) ;
2133+ let monitor = nodes[ 0 ] . chain_monitor . chain_monitor . get_monitor ( * channel_id) . unwrap ( ) ;
2134+ let monitor_name = monitor. persistence_key ( ) ;
2135+ let cmu = updates
2136+ . iter ( )
2137+ . find ( |u| u. update_id != 0 && u. update_id != u64:: MAX )
2138+ . expect ( "expected a normal monitor update" ) ;
2139+ let update_id = cmu. update_id ;
2140+
2141+ let store = TestStore :: new ( false ) ;
2142+ let persister = MonitorUpdatingPersisterAsync :: new (
2143+ & store,
2144+ PanicingSpawner ,
2145+ node_cfgs[ 0 ] . logger ,
2146+ update_id + 1 ,
2147+ node_cfgs[ 0 ] . keys_manager ,
2148+ node_cfgs[ 0 ] . keys_manager ,
2149+ node_cfgs[ 0 ] . tx_broadcaster ,
2150+ node_cfgs[ 0 ] . fee_estimator ,
2151+ ) ;
2152+
2153+ let _future =
2154+ Arc :: clone ( & persister. 0 ) . update_persisted_channel ( monitor_name, Some ( cmu) , & monitor) ;
2155+
2156+ let update_name = UpdateName :: from ( update_id) ;
2157+ let pending = store. list_pending_async_writes (
2158+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
2159+ & monitor_name. to_string ( ) ,
2160+ update_name. as_str ( ) ,
2161+ ) ;
2162+ assert ! (
2163+ !pending. is_empty( ) ,
2164+ "monitor update write was not queued synchronously by update_persisted_channel"
2165+ ) ;
2166+ }
2167+
21102168 // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a
21112169 // monitor or update with it results in the persister returning an UnrecoverableError status.
21122170 #[ test]
0 commit comments