Skip to content

Commit ef0163d

Browse files
tnullamackillop
authored andcommitted
Use async KVStore for read_X util methods
Rather than using `KVStoreSync` we now use the async `KVStore` implementation for most `read_X` util methods used during node building. This is a first step towards making node building/startup entirely async eventually.
1 parent c315e8c commit ef0163d

2 files changed

Lines changed: 129 additions & 89 deletions

File tree

src/builder.rs

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
5555
use crate::gossip::GossipSource;
5656
use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
58-
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
58+
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
59+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
60+
write_node_metrics,
5961
};
6062
use crate::io::vss_store::VssStore;
6163
use crate::io::{
@@ -1233,7 +1235,9 @@ fn build_with_store_internal(
12331235
}
12341236

12351237
// Initialize the status fields.
1236-
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
1238+
let node_metrics = match runtime
1239+
.block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1240+
{
12371241
Ok(metrics) => Arc::new(RwLock::new(metrics)),
12381242
Err(e) => {
12391243
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1247,7 +1251,9 @@ fn build_with_store_internal(
12471251
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
12481252
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
12491253

1250-
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
1254+
let payment_store = match runtime
1255+
.block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1256+
{
12511257
Ok(payments) => Arc::new(PaymentStore::new(
12521258
payments,
12531259
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
@@ -1474,24 +1480,23 @@ fn build_with_store_internal(
14741480
));
14751481

14761482
// Initialize the network graph, scorer, and router
1477-
let network_graph =
1478-
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
1479-
Ok(graph) => Arc::new(graph),
1480-
Err(e) => {
1481-
if e.kind() == std::io::ErrorKind::NotFound {
1482-
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1483-
} else {
1484-
log_error!(logger, "Failed to read network graph from store: {}", e);
1485-
return Err(BuildError::ReadFailed);
1486-
}
1487-
},
1488-
};
1483+
let network_graph = match runtime
1484+
.block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1485+
{
1486+
Ok(graph) => Arc::new(graph),
1487+
Err(e) => {
1488+
if e.kind() == std::io::ErrorKind::NotFound {
1489+
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1490+
} else {
1491+
log_error!(logger, "Failed to read network graph from store: {}", e);
1492+
return Err(BuildError::ReadFailed);
1493+
}
1494+
},
1495+
};
14891496

1490-
let local_scorer = match io::utils::read_scorer(
1491-
Arc::clone(&kv_store),
1492-
Arc::clone(&network_graph),
1493-
Arc::clone(&logger),
1494-
) {
1497+
let local_scorer = match runtime.block_on(async {
1498+
read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await
1499+
}) {
14951500
Ok(scorer) => scorer,
14961501
Err(e) => {
14971502
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1507,7 +1512,10 @@ fn build_with_store_internal(
15071512
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
15081513

15091514
// Restore external pathfinding scores from cache if possible.
1510-
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1515+
match runtime.block_on(async {
1516+
read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger))
1517+
.await
1518+
}) {
15111519
Ok(external_scores) => {
15121520
scorer.lock().unwrap().merge(external_scores, cur_time);
15131521
log_trace!(logger, "External scores from cache merged successfully");
@@ -1709,7 +1717,8 @@ fn build_with_store_internal(
17091717
},
17101718
};
17111719

1712-
let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger))
1720+
let event_queue = match runtime
1721+
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
17131722
{
17141723
Ok(event_queue) => Arc::new(event_queue),
17151724
Err(e) => {
@@ -1831,14 +1840,17 @@ fn build_with_store_internal(
18311840
let connection_manager =
18321841
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
18331842

1834-
let output_sweeper = match io::utils::read_output_sweeper(
1835-
Arc::clone(&tx_broadcaster),
1836-
Arc::clone(&fee_estimator),
1837-
Arc::clone(&chain_source),
1838-
Arc::clone(&keys_manager),
1839-
Arc::clone(&kv_store),
1840-
Arc::clone(&logger),
1841-
) {
1843+
let output_sweeper = match runtime.block_on(async {
1844+
read_output_sweeper(
1845+
Arc::clone(&tx_broadcaster),
1846+
Arc::clone(&fee_estimator),
1847+
Arc::clone(&chain_source),
1848+
Arc::clone(&keys_manager),
1849+
Arc::clone(&kv_store),
1850+
Arc::clone(&logger),
1851+
)
1852+
.await
1853+
}) {
18421854
Ok(output_sweeper) => Arc::new(output_sweeper),
18431855
Err(e) => {
18441856
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1859,7 +1871,9 @@ fn build_with_store_internal(
18591871
},
18601872
};
18611873

1862-
let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) {
1874+
let peer_store = match runtime
1875+
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1876+
{
18631877
Ok(peer_store) => Arc::new(peer_store),
18641878
Err(e) => {
18651879
if e.kind() == std::io::ErrorKind::NotFound {

src/io/utils.rs

Lines changed: 84 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -133,38 +133,44 @@ where
133133
}
134134

135135
/// Read a previously persisted [`NetworkGraph`] from the store.
136-
pub(crate) fn read_network_graph<L: Deref + Clone>(
136+
pub(crate) async fn read_network_graph<L: Deref + Clone>(
137137
kv_store: Arc<DynStore>, logger: L,
138138
) -> Result<NetworkGraph<L>, std::io::Error>
139139
where
140140
L::Target: LdkLogger,
141141
{
142-
let mut reader = Cursor::new(KVStoreSync::read(
143-
&*kv_store,
144-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
145-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
146-
NETWORK_GRAPH_PERSISTENCE_KEY,
147-
)?);
142+
let mut reader = Cursor::new(
143+
KVStore::read(
144+
&*kv_store,
145+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
146+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
147+
NETWORK_GRAPH_PERSISTENCE_KEY,
148+
)
149+
.await?,
150+
);
148151
NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| {
149152
log_error!(logger, "Failed to deserialize NetworkGraph: {}", e);
150153
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph")
151154
})
152155
}
153156

154157
/// Read a previously persisted [`ProbabilisticScorer`] from the store.
155-
pub(crate) fn read_scorer<G: Deref<Target = NetworkGraph<L>>, L: Deref + Clone>(
158+
pub(crate) async fn read_scorer<G: Deref<Target = NetworkGraph<L>>, L: Deref + Clone>(
156159
kv_store: Arc<DynStore>, network_graph: G, logger: L,
157160
) -> Result<ProbabilisticScorer<G, L>, std::io::Error>
158161
where
159162
L::Target: LdkLogger,
160163
{
161164
let params = ProbabilisticScoringDecayParameters::default();
162-
let mut reader = Cursor::new(KVStoreSync::read(
163-
&*kv_store,
164-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
165-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
166-
SCORER_PERSISTENCE_KEY,
167-
)?);
165+
let mut reader = Cursor::new(
166+
KVStore::read(
167+
&*kv_store,
168+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
169+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
170+
SCORER_PERSISTENCE_KEY,
171+
)
172+
.await?,
173+
);
168174
let args = (params, network_graph, logger.clone());
169175
ProbabilisticScorer::read(&mut reader, args).map_err(|e| {
170176
log_error!(logger, "Failed to deserialize scorer: {}", e);
@@ -173,18 +179,21 @@ where
173179
}
174180

175181
/// Read previously persisted external pathfinding scores from the cache.
176-
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
182+
pub(crate) async fn read_external_pathfinding_scores_from_cache<L: Deref>(
177183
kv_store: Arc<DynStore>, logger: L,
178184
) -> Result<ChannelLiquidities, std::io::Error>
179185
where
180186
L::Target: LdkLogger,
181187
{
182-
let mut reader = Cursor::new(KVStoreSync::read(
183-
&*kv_store,
184-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
185-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
186-
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
187-
)?);
188+
let mut reader = Cursor::new(
189+
KVStore::read(
190+
&*kv_store,
191+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
192+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
193+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
194+
)
195+
.await?,
196+
);
188197
ChannelLiquidities::read(&mut reader).map_err(|e| {
189198
log_error!(logger, "Failed to deserialize scorer: {}", e);
190199
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
@@ -220,63 +229,74 @@ where
220229
}
221230

222231
/// Read previously persisted events from the store.
223-
pub(crate) fn read_event_queue<L: Deref + Clone>(
232+
pub(crate) async fn read_event_queue<L: Deref + Clone>(
224233
kv_store: Arc<DynStore>, logger: L,
225234
) -> Result<EventQueue<L>, std::io::Error>
226235
where
227236
L::Target: LdkLogger,
228237
{
229-
let mut reader = Cursor::new(KVStoreSync::read(
230-
&*kv_store,
231-
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
232-
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
233-
EVENT_QUEUE_PERSISTENCE_KEY,
234-
)?);
238+
let mut reader = Cursor::new(
239+
KVStore::read(
240+
&*kv_store,
241+
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
242+
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
243+
EVENT_QUEUE_PERSISTENCE_KEY,
244+
)
245+
.await?,
246+
);
235247
EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
236248
log_error!(logger, "Failed to deserialize event queue: {}", e);
237249
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue")
238250
})
239251
}
240252

241253
/// Read previously persisted peer info from the store.
242-
pub(crate) fn read_peer_info<L: Deref + Clone>(
254+
pub(crate) async fn read_peer_info<L: Deref + Clone>(
243255
kv_store: Arc<DynStore>, logger: L,
244256
) -> Result<PeerStore<L>, std::io::Error>
245257
where
246258
L::Target: LdkLogger,
247259
{
248-
let mut reader = Cursor::new(KVStoreSync::read(
249-
&*kv_store,
250-
PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
251-
PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
252-
PEER_INFO_PERSISTENCE_KEY,
253-
)?);
260+
let mut reader = Cursor::new(
261+
KVStore::read(
262+
&*kv_store,
263+
PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
264+
PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
265+
PEER_INFO_PERSISTENCE_KEY,
266+
)
267+
.await?,
268+
);
254269
PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
255270
log_error!(logger, "Failed to deserialize peer store: {}", e);
256271
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore")
257272
})
258273
}
259274

260275
/// Read previously persisted payments information from the store.
261-
pub(crate) fn read_payments<L: Deref>(
276+
pub(crate) async fn read_payments<L: Deref>(
262277
kv_store: Arc<DynStore>, logger: L,
263278
) -> Result<Vec<PaymentDetails>, std::io::Error>
264279
where
265280
L::Target: LdkLogger,
266281
{
267282
let mut res = Vec::new();
268283

269-
for stored_key in KVStoreSync::list(
284+
for stored_key in KVStore::list(
270285
&*kv_store,
271286
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
272287
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
273-
)? {
274-
let mut reader = Cursor::new(KVStoreSync::read(
275-
&*kv_store,
276-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
277-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
278-
&stored_key,
279-
)?);
288+
)
289+
.await?
290+
{
291+
let mut reader = Cursor::new(
292+
KVStore::read(
293+
&*kv_store,
294+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
295+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
296+
&stored_key,
297+
)
298+
.await?,
299+
);
280300
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
281301
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
282302
std::io::Error::new(
@@ -290,17 +310,20 @@ where
290310
}
291311

292312
/// Read `OutputSweeper` state from the store.
293-
pub(crate) fn read_output_sweeper(
313+
pub(crate) async fn read_output_sweeper(
294314
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<OnchainFeeEstimator>,
295315
chain_data_source: Arc<ChainSource>, keys_manager: Arc<KeysManager>, kv_store: Arc<DynStore>,
296316
logger: Arc<Logger>,
297317
) -> Result<Sweeper, std::io::Error> {
298-
let mut reader = Cursor::new(KVStoreSync::read(
299-
&*kv_store,
300-
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
301-
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
302-
OUTPUT_SWEEPER_PERSISTENCE_KEY,
303-
)?);
318+
let mut reader = Cursor::new(
319+
KVStore::read(
320+
&*kv_store,
321+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
322+
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
323+
OUTPUT_SWEEPER_PERSISTENCE_KEY,
324+
)
325+
.await?,
326+
);
304327
let args = (
305328
broadcaster,
306329
fee_estimator,
@@ -317,18 +340,21 @@ pub(crate) fn read_output_sweeper(
317340
Ok(sweeper)
318341
}
319342

320-
pub(crate) fn read_node_metrics<L: Deref>(
343+
pub(crate) async fn read_node_metrics<L: Deref>(
321344
kv_store: Arc<DynStore>, logger: L,
322345
) -> Result<NodeMetrics, std::io::Error>
323346
where
324347
L::Target: LdkLogger,
325348
{
326-
let mut reader = Cursor::new(KVStoreSync::read(
327-
&*kv_store,
328-
NODE_METRICS_PRIMARY_NAMESPACE,
329-
NODE_METRICS_SECONDARY_NAMESPACE,
330-
NODE_METRICS_KEY,
331-
)?);
349+
let mut reader = Cursor::new(
350+
KVStore::read(
351+
&*kv_store,
352+
NODE_METRICS_PRIMARY_NAMESPACE,
353+
NODE_METRICS_SECONDARY_NAMESPACE,
354+
NODE_METRICS_KEY,
355+
)
356+
.await?,
357+
);
332358
NodeMetrics::read(&mut reader).map_err(|e| {
333359
log_error!(logger, "Failed to deserialize NodeMetrics: {}", e);
334360
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics")

0 commit comments

Comments
 (0)