@@ -33,7 +33,7 @@ use lightning::routing::scoring::{
3333} ;
3434use lightning:: sign:: { EntropySource , NodeSigner } ;
3535use lightning:: util:: persist:: {
36- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY , CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
36+ KVStore , CHANNEL_MANAGER_PERSISTENCE_KEY , CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
3737 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
3838} ;
3939use lightning:: util:: ser:: ReadableArgs ;
@@ -1052,10 +1052,20 @@ fn build_with_store_internal(
10521052 }
10531053 }
10541054
1055+ let tx_broadcaster = Arc :: new ( TransactionBroadcaster :: new ( Arc :: clone ( & logger) ) ) ;
1056+ let fee_estimator = Arc :: new ( OnchainFeeEstimator :: new ( ) ) ;
1057+
1058+ let kv_store_ref = Arc :: clone ( & kv_store) ;
1059+ let logger_ref = Arc :: clone ( & logger) ;
1060+ let ( payment_store_res, node_metris_res) = runtime. block_on ( async move {
1061+ tokio:: join!(
1062+ read_payments( & * kv_store_ref, Arc :: clone( & logger_ref) ) ,
1063+ read_node_metrics( & * kv_store_ref, Arc :: clone( & logger_ref) ) ,
1064+ )
1065+ } ) ;
1066+
10551067 // Initialize the status fields.
1056- let node_metrics = match runtime
1057- . block_on ( async { read_node_metrics ( & * kv_store, Arc :: clone ( & logger) ) . await } )
1058- {
1068+ let node_metrics = match node_metris_res {
10591069 Ok ( metrics) => Arc :: new ( RwLock :: new ( metrics) ) ,
10601070 Err ( e) => {
10611071 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -1066,23 +1076,20 @@ fn build_with_store_internal(
10661076 }
10671077 } ,
10681078 } ;
1069- let tx_broadcaster = Arc :: new ( TransactionBroadcaster :: new ( Arc :: clone ( & logger) ) ) ;
1070- let fee_estimator = Arc :: new ( OnchainFeeEstimator :: new ( ) ) ;
10711079
1072- let payment_store =
1073- match runtime. block_on ( async { read_payments ( & * kv_store, Arc :: clone ( & logger) ) . await } ) {
1074- Ok ( payments) => Arc :: new ( PaymentStore :: new (
1075- payments,
1076- PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE . to_string ( ) ,
1077- PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE . to_string ( ) ,
1078- Arc :: clone ( & kv_store) ,
1079- Arc :: clone ( & logger) ,
1080- ) ) ,
1081- Err ( e) => {
1082- log_error ! ( logger, "Failed to read payment data from store: {}" , e) ;
1083- return Err ( BuildError :: ReadFailed ) ;
1084- } ,
1085- } ;
1080+ let payment_store = match payment_store_res {
1081+ Ok ( payments) => Arc :: new ( PaymentStore :: new (
1082+ payments,
1083+ PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE . to_string ( ) ,
1084+ PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE . to_string ( ) ,
1085+ Arc :: clone ( & kv_store) ,
1086+ Arc :: clone ( & logger) ,
1087+ ) ) ,
1088+ Err ( e) => {
1089+ log_error ! ( logger, "Failed to read payment data from store: {}" , e) ;
1090+ return Err ( BuildError :: ReadFailed ) ;
1091+ } ,
1092+ } ;
10861093
10871094 let ( chain_source, chain_tip_opt) = match chain_data_source_config {
10881095 Some ( ChainDataSourceConfig :: Esplora { server_url, headers, sync_config } ) => {
@@ -1273,10 +1280,18 @@ fn build_with_store_internal(
12731280 Arc :: clone ( & fee_estimator) ,
12741281 ) ) ;
12751282
1283+ // Read ChannelMonitors and the NetworkGraph
1284+ let kv_store_ref = Arc :: clone ( & kv_store) ;
1285+ let logger_ref = Arc :: clone ( & logger) ;
1286+ let ( monitor_read_res, network_graph_res) = runtime. block_on ( async move {
1287+ tokio:: join!(
1288+ monitor_reader. read_all_channel_monitors_with_updates_parallel( ) ,
1289+ read_network_graph( & * kv_store_ref, logger_ref) ,
1290+ )
1291+ } ) ;
1292+
12761293 // Read ChannelMonitor state from store
1277- let monitor_read_result =
1278- runtime. block_on ( monitor_reader. read_all_channel_monitors_with_updates_parallel ( ) ) ;
1279- let channel_monitors = match monitor_read_result {
1294+ let channel_monitors = match monitor_read_res {
12801295 Ok ( monitors) => monitors,
12811296 Err ( e) => {
12821297 if e. kind ( ) == lightning:: io:: ErrorKind :: NotFound {
@@ -1310,9 +1325,7 @@ fn build_with_store_internal(
13101325 ) ) ;
13111326
13121327 // Initialize the network graph, scorer, and router
1313- let network_graph = match runtime
1314- . block_on ( async { read_network_graph ( & * kv_store, Arc :: clone ( & logger) ) . await } )
1315- {
1328+ let network_graph = match network_graph_res {
13161329 Ok ( graph) => Arc :: new ( graph) ,
13171330 Err ( e) => {
13181331 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -1324,9 +1337,42 @@ fn build_with_store_internal(
13241337 } ,
13251338 } ;
13261339
1327- let local_scorer = match runtime. block_on ( async {
1328- read_scorer ( & * kv_store, Arc :: clone ( & network_graph) , Arc :: clone ( & logger) ) . await
1329- } ) {
1340+ // Read various smaller LDK and ldk-node objects from the store
1341+ let kv_store_ref = Arc :: clone ( & kv_store) ;
1342+ let logger_ref = Arc :: clone ( & logger) ;
1343+ let network_graph_ref = Arc :: clone ( & network_graph) ;
1344+ let output_sweeper_future = read_output_sweeper (
1345+ Arc :: clone ( & tx_broadcaster) ,
1346+ Arc :: clone ( & fee_estimator) ,
1347+ Arc :: clone ( & chain_source) ,
1348+ Arc :: clone ( & keys_manager) ,
1349+ Arc :: clone ( & kv_store_ref) ,
1350+ Arc :: clone ( & logger_ref) ,
1351+ ) ;
1352+ let (
1353+ scorer_res,
1354+ external_scores_res,
1355+ channel_manager_bytes_res,
1356+ sweeper_bytes_res,
1357+ event_queue_res,
1358+ peer_info_res,
1359+ ) = runtime. block_on ( async move {
1360+ tokio:: join!(
1361+ read_scorer( & * kv_store_ref, network_graph_ref, Arc :: clone( & logger_ref) ) ,
1362+ read_external_pathfinding_scores_from_cache( & * kv_store_ref, Arc :: clone( & logger_ref) ) ,
1363+ KVStore :: read(
1364+ & * kv_store_ref,
1365+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
1366+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
1367+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
1368+ ) ,
1369+ output_sweeper_future,
1370+ read_event_queue( Arc :: clone( & kv_store_ref) , Arc :: clone( & logger_ref) ) ,
1371+ read_peer_info( Arc :: clone( & kv_store_ref) , Arc :: clone( & logger_ref) ) ,
1372+ )
1373+ } ) ;
1374+
1375+ let local_scorer = match scorer_res {
13301376 Ok ( scorer) => scorer,
13311377 Err ( e) => {
13321378 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -1342,9 +1388,7 @@ fn build_with_store_internal(
13421388 let scorer = Arc :: new ( Mutex :: new ( CombinedScorer :: new ( local_scorer) ) ) ;
13431389
13441390 // Restore external pathfinding scores from cache if possible.
1345- match runtime. block_on ( async {
1346- read_external_pathfinding_scores_from_cache ( & * kv_store, Arc :: clone ( & logger) ) . await
1347- } ) {
1391+ match external_scores_res {
13481392 Ok ( external_scores) => {
13491393 scorer. lock ( ) . unwrap ( ) . merge ( external_scores, cur_time) ;
13501394 log_trace ! ( logger, "External scores from cache merged successfully" ) ;
@@ -1397,12 +1441,7 @@ fn build_with_store_internal(
13971441
13981442 // Initialize the ChannelManager
13991443 let channel_manager = {
1400- if let Ok ( reader) = KVStoreSync :: read (
1401- & * kv_store,
1402- CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
1403- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
1404- CHANNEL_MANAGER_PERSISTENCE_KEY ,
1405- ) {
1444+ if let Ok ( reader) = channel_manager_bytes_res {
14061445 let channel_monitor_references =
14071446 channel_monitors. iter ( ) . map ( |( _, chanmon) | chanmon) . collect ( ) ;
14081447 let read_args = ChannelManagerReadArgs :: new (
@@ -1627,17 +1666,7 @@ fn build_with_store_internal(
16271666 let connection_manager =
16281667 Arc :: new ( ConnectionManager :: new ( Arc :: clone ( & peer_manager) , Arc :: clone ( & logger) ) ) ;
16291668
1630- let output_sweeper = match runtime. block_on ( async {
1631- read_output_sweeper (
1632- Arc :: clone ( & tx_broadcaster) ,
1633- Arc :: clone ( & fee_estimator) ,
1634- Arc :: clone ( & chain_source) ,
1635- Arc :: clone ( & keys_manager) ,
1636- Arc :: clone ( & kv_store) ,
1637- Arc :: clone ( & logger) ,
1638- )
1639- . await
1640- } ) {
1669+ let output_sweeper = match sweeper_bytes_res {
16411670 Ok ( output_sweeper) => Arc :: new ( output_sweeper) ,
16421671 Err ( e) => {
16431672 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -1658,9 +1687,7 @@ fn build_with_store_internal(
16581687 } ,
16591688 } ;
16601689
1661- let event_queue = match runtime
1662- . block_on ( async { read_event_queue ( Arc :: clone ( & kv_store) , Arc :: clone ( & logger) ) . await } )
1663- {
1690+ let event_queue = match event_queue_res {
16641691 Ok ( event_queue) => Arc :: new ( event_queue) ,
16651692 Err ( e) => {
16661693 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
@@ -1672,9 +1699,7 @@ fn build_with_store_internal(
16721699 } ,
16731700 } ;
16741701
1675- let peer_store = match runtime
1676- . block_on ( async { read_peer_info ( Arc :: clone ( & kv_store) , Arc :: clone ( & logger) ) . await } )
1677- {
1702+ let peer_store = match peer_info_res {
16781703 Ok ( peer_store) => Arc :: new ( peer_store) ,
16791704 Err ( e) => {
16801705 if e. kind ( ) == std:: io:: ErrorKind :: NotFound {
0 commit comments