Skip to content

Commit febad01

Browse files
committed
Move node metrics persistence onto async KV storage
Persist node metric updates through async KVStore writes and await them from the chain, gossip, and scoring tasks. This removes the remaining blocking metrics writer while keeping the helper name stable. Co-Authored-By: HAL 9000
1 parent 71147d2 commit febad01

6 files changed

Lines changed: 62 additions & 46 deletions

File tree

src/chain/bitcoind.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ impl BitcoindChainSource {
204204
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
205205
},
206206
)
207+
.await
207208
.unwrap_or_else(|e| {
208209
log_error!(self.logger, "Failed to persist node metrics: {}", e);
209210
});
@@ -451,7 +452,8 @@ impl BitcoindChainSource {
451452
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
452453
m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
453454
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
454-
})?;
455+
})
456+
.await?;
455457

456458
Ok(())
457459
}
@@ -563,7 +565,8 @@ impl BitcoindChainSource {
563565
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
564566
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
565567
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
566-
})?;
568+
})
569+
.await?;
567570

568571
Ok(())
569572
}

src/chain/electrum.rs

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -129,30 +129,36 @@ impl ElectrumChainSource {
129129
let incremental_sync =
130130
self.node_metrics.read().expect("lock").latest_onchain_wallet_sync_timestamp.is_some();
131131

132-
let apply_wallet_update =
133-
|update_res: Result<BdkUpdate, Error>, now: Instant| match update_res {
134-
Ok(update) => match onchain_wallet.apply_update(update) {
135-
Ok(()) => {
136-
log_debug!(
137-
self.logger,
138-
"{} of on-chain wallet finished in {}ms.",
139-
if incremental_sync { "Incremental sync" } else { "Sync" },
140-
now.elapsed().as_millis()
141-
);
142-
let unix_time_secs_opt =
143-
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
144-
update_and_persist_node_metrics(
145-
&self.node_metrics,
146-
&*self.kv_store,
147-
&*self.logger,
148-
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
149-
)?;
150-
Ok(())
132+
macro_rules! apply_wallet_update {
133+
($update_res:expr, $now:expr) => {
134+
match $update_res {
135+
Ok(update) => match onchain_wallet.apply_update(update) {
136+
Ok(()) => {
137+
log_debug!(
138+
self.logger,
139+
"{} of on-chain wallet finished in {}ms.",
140+
if incremental_sync { "Incremental sync" } else { "Sync" },
141+
$now.elapsed().as_millis()
142+
);
143+
let unix_time_secs_opt = SystemTime::now()
144+
.duration_since(UNIX_EPOCH)
145+
.ok()
146+
.map(|d| d.as_secs());
147+
update_and_persist_node_metrics(
148+
&self.node_metrics,
149+
&*self.kv_store,
150+
&*self.logger,
151+
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
152+
)
153+
.await?;
154+
Ok(())
155+
},
156+
Err(e) => Err(e),
151157
},
152158
Err(e) => Err(e),
153-
},
154-
Err(e) => Err(e),
159+
}
155160
};
161+
}
156162

157163
let cached_txs = onchain_wallet.get_cached_txs();
158164

@@ -162,15 +168,15 @@ impl ElectrumChainSource {
162168
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs);
163169

164170
let now = Instant::now();
165-
let update_res = incremental_sync_fut.await.map(|u| u.into());
166-
apply_wallet_update(update_res, now)
171+
let update_res: Result<BdkUpdate, Error> = incremental_sync_fut.await.map(|u| u.into());
172+
apply_wallet_update!(update_res, now)
167173
} else {
168174
let full_scan_request = onchain_wallet.get_full_scan_request();
169175
let full_scan_fut =
170176
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs);
171177
let now = Instant::now();
172-
let update_res = full_scan_fut.await.map(|u| u.into());
173-
apply_wallet_update(update_res, now)
178+
let update_res: Result<BdkUpdate, Error> = full_scan_fut.await.map(|u| u.into());
179+
apply_wallet_update!(update_res, now)
174180
};
175181

176182
res
@@ -239,7 +245,8 @@ impl ElectrumChainSource {
239245
&*self.kv_store,
240246
&*self.logger,
241247
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
242-
)?;
248+
)
249+
.await?;
243250
}
244251

245252
res
@@ -270,7 +277,8 @@ impl ElectrumChainSource {
270277
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
271278
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
272279
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
273-
})?;
280+
})
281+
.await?;
274282

275283
Ok(())
276284
}

src/chain/esplora.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ impl EsploraChainSource {
127127
&*self.kv_store,
128128
&*self.logger,
129129
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
130-
)?;
130+
)
131+
.await?;
131132
Ok(())
132133
},
133134
Err(e) => Err(e),
@@ -265,7 +266,8 @@ impl EsploraChainSource {
265266
&*self.kv_store,
266267
&*self.logger,
267268
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
268-
)?;
269+
)
270+
.await?;
269271
Ok(())
270272
},
271273
Err(e) => {
@@ -347,7 +349,8 @@ impl EsploraChainSource {
347349
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
348350
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
349351
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
350-
})?;
352+
})
353+
.await?;
351354

352355
Ok(())
353356
}

src/io/utils.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ use lightning::routing::scoring::{
2626
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
2727
};
2828
use lightning::util::persist::{
29-
migrate_kv_store_data, KVStore, KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET,
30-
KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
31-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
32-
OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
33-
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
34-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
29+
migrate_kv_store_data, KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
30+
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
31+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY,
32+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
33+
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
34+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
3535
};
3636
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
3737
use lightning_persister::fs_store::v1::FilesystemStore;
@@ -336,26 +336,26 @@ where
336336
}
337337

338338
/// Take a write lock on `node_metrics`, apply `update`, and persist the result to `kv_store`.
339-
///
340-
/// The write lock is held across the KV-store write, preserving the invariant that readers only
341-
/// observe the mutation once it has been durably persisted (or the persist has failed).
342-
pub(crate) fn update_and_persist_node_metrics<L: Deref>(
339+
pub(crate) async fn update_and_persist_node_metrics<L: Deref>(
343340
node_metrics: &RwLock<NodeMetrics>, kv_store: &DynStore, logger: L,
344341
update: impl FnOnce(&mut NodeMetrics),
345342
) -> Result<(), Error>
346343
where
347344
L::Target: LdkLogger,
348345
{
349-
let mut locked_node_metrics = node_metrics.write().expect("lock");
350-
update(&mut *locked_node_metrics);
351-
let data = locked_node_metrics.encode();
352-
KVStoreSync::write(
346+
let data = {
347+
let mut locked_node_metrics = node_metrics.write().expect("lock");
348+
update(&mut *locked_node_metrics);
349+
locked_node_metrics.encode()
350+
};
351+
KVStore::write(
353352
&*kv_store,
354353
NODE_METRICS_PRIMARY_NAMESPACE,
355354
NODE_METRICS_SECONDARY_NAMESPACE,
356355
NODE_METRICS_KEY,
357356
data,
358357
)
358+
.await
359359
.map_err(|e| {
360360
log_error!(
361361
logger,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ impl Node {
556556
Arc::clone(&bcast_logger),
557557
|m| m.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt,
558558
)
559+
.await
559560
.unwrap_or_else(|e| {
560561
log_error!(bcast_logger, "Persistence failed: {}", e);
561562
});

src/scoring.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ async fn sync_external_scores(
8989
update_and_persist_node_metrics(&node_metrics, &*kv_store, logger, |m| {
9090
m.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs());
9191
})
92+
.await
9293
.unwrap_or_else(|e| {
9394
log_error!(logger, "Persisting node metrics failed: {}", e);
9495
});

0 commit comments

Comments
 (0)