Skip to content

Commit 79cfe6f

Browse files
committed
Add update_and_persist_node_metrics helper
Extract the repeated "acquire write lock on `node_metrics`, mutate a field or two, then write the encoded struct to the kv-store" idiom into a single helper in `io::utils`. As a side effect, `write_node_metrics` is inlined into the helper. Co-Authored-By: HAL 9000
1 parent c754e2f commit 79cfe6f

7 files changed

Lines changed: 97 additions & 100 deletions

File tree

src/builder.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
5858
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
5959
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
60-
read_scorer, write_node_metrics,
60+
read_scorer, update_and_persist_node_metrics,
6161
};
6262
use crate::io::vss_store::VssStoreBuilder;
6363
use crate::io::{
@@ -1771,15 +1771,13 @@ fn build_with_store_internal(
17711771
));
17721772

17731773
// Reset the RGS sync timestamp in case we somehow switch gossip sources
1774-
{
1775-
let mut locked_node_metrics = node_metrics.write().expect("lock");
1776-
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
1777-
write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger))
1778-
.map_err(|e| {
1779-
log_error!(logger, "Failed writing to store: {}", e);
1780-
BuildError::WriteFailed
1781-
})?;
1782-
}
1774+
update_and_persist_node_metrics(&node_metrics, &*kv_store, Arc::clone(&logger), |m| {
1775+
m.latest_rgs_snapshot_timestamp = None
1776+
})
1777+
.map_err(|e| {
1778+
log_error!(logger, "Failed writing to store: {}", e);
1779+
BuildError::WriteFailed
1780+
})?;
17831781
p2p_source
17841782
},
17851783
GossipSourceConfig::RapidGossipSync(rgs_server) => {

src/chain/bitcoind.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::fee_estimator::{
3939
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
4040
ConfirmationTarget, OnchainFeeEstimator,
4141
};
42-
use crate::io::utils::write_node_metrics;
42+
use crate::io::utils::update_and_persist_node_metrics;
4343
use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
4444
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
4545
use crate::{Error, NodeMetrics};
@@ -203,15 +203,18 @@ impl BitcoindChainSource {
203203
*self.latest_chain_tip.write().expect("lock") = Some(chain_tip);
204204
let unix_time_secs_opt =
205205
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
206-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
207-
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
208-
unix_time_secs_opt;
209-
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
210-
unix_time_secs_opt;
211-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)
212-
.unwrap_or_else(|e| {
213-
log_error!(self.logger, "Failed to persist node metrics: {}", e);
214-
});
206+
update_and_persist_node_metrics(
207+
&self.node_metrics,
208+
&*self.kv_store,
209+
&*self.logger,
210+
|m| {
211+
m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
212+
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
213+
},
214+
)
215+
.unwrap_or_else(|e| {
216+
log_error!(self.logger, "Failed to persist node metrics: {}", e);
217+
});
215218
}
216219
break;
217220
},
@@ -454,11 +457,10 @@ impl BitcoindChainSource {
454457

455458
let unix_time_secs_opt =
456459
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
457-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
458-
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
459-
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
460-
461-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
460+
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
461+
m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
462+
m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
463+
})?;
462464

463465
Ok(())
464466
}
@@ -568,11 +570,9 @@ impl BitcoindChainSource {
568570

569571
let unix_time_secs_opt =
570572
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
571-
{
572-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
573-
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
574-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
575-
}
573+
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
574+
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
575+
})?;
576576

577577
Ok(())
578578
}

src/chain/electrum.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::fee_estimator::{
3030
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
3131
ConfirmationTarget, OnchainFeeEstimator,
3232
};
33-
use crate::io::utils::write_node_metrics;
33+
use crate::io::utils::update_and_persist_node_metrics;
3434
use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger};
3535
use crate::runtime::Runtime;
3636
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
@@ -141,16 +141,12 @@ impl ElectrumChainSource {
141141
);
142142
let unix_time_secs_opt =
143143
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
144-
{
145-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
146-
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
147-
unix_time_secs_opt;
148-
write_node_metrics(
149-
&*locked_node_metrics,
150-
&*self.kv_store,
151-
&*self.logger,
152-
)?;
153-
}
144+
update_and_persist_node_metrics(
145+
&self.node_metrics,
146+
&*self.kv_store,
147+
&*self.logger,
148+
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
149+
)?;
154150
Ok(())
155151
},
156152
Err(e) => Err(e),
@@ -238,11 +234,12 @@ impl ElectrumChainSource {
238234
if let Ok(_) = res {
239235
let unix_time_secs_opt =
240236
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
241-
{
242-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
243-
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
244-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
245-
}
237+
update_and_persist_node_metrics(
238+
&self.node_metrics,
239+
&*self.kv_store,
240+
&*self.logger,
241+
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
242+
)?;
246243
}
247244

248245
res
@@ -271,11 +268,9 @@ impl ElectrumChainSource {
271268

272269
let unix_time_secs_opt =
273270
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
274-
{
275-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
276-
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
277-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
278-
}
271+
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
272+
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
273+
})?;
279274

280275
Ok(())
281276
}

src/chain/esplora.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::fee_estimator::{
2222
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
2323
OnchainFeeEstimator,
2424
};
25-
use crate::io::utils::write_node_metrics;
25+
use crate::io::utils::update_and_persist_node_metrics;
2626
use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger};
2727
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2828
use crate::{Error, NodeMetrics};
@@ -122,16 +122,13 @@ impl EsploraChainSource {
122122
.duration_since(UNIX_EPOCH)
123123
.ok()
124124
.map(|d| d.as_secs());
125-
{
126-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
127-
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
128-
write_node_metrics(
129-
&*locked_node_metrics,
130-
&*self.kv_store,
131-
&*self.logger
132-
)?;
133-
}
134-
Ok(())
125+
update_and_persist_node_metrics(
126+
&self.node_metrics,
127+
&*self.kv_store,
128+
&*self.logger,
129+
|m| m.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt,
130+
)?;
131+
Ok(())
135132
},
136133
Err(e) => Err(e),
137134
},
@@ -263,12 +260,12 @@ impl EsploraChainSource {
263260

264261
let unix_time_secs_opt =
265262
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
266-
{
267-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
268-
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
269-
unix_time_secs_opt;
270-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
271-
}
263+
update_and_persist_node_metrics(
264+
&self.node_metrics,
265+
&*self.kv_store,
266+
&*self.logger,
267+
|m| m.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt,
268+
)?;
272269
Ok(())
273270
},
274271
Err(e) => {
@@ -348,11 +345,9 @@ impl EsploraChainSource {
348345
);
349346
let unix_time_secs_opt =
350347
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
351-
{
352-
let mut locked_node_metrics = self.node_metrics.write().expect("lock");
353-
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
354-
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
355-
}
348+
update_and_persist_node_metrics(&self.node_metrics, &*self.kv_store, &*self.logger, |m| {
349+
m.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt
350+
})?;
356351

357352
Ok(())
358353
}

src/io/utils.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::ops::Deref;
1111
#[cfg(unix)]
1212
use std::os::unix::fs::OpenOptionsExt;
1313
use std::path::Path;
14-
use std::sync::Arc;
14+
use std::sync::{Arc, RwLock};
1515

1616
use bdk_chain::indexer::keychain_txout::ChangeSet as BdkIndexerChangeSet;
1717
use bdk_chain::local_chain::ChangeSet as BdkLocalChainChangeSet;
@@ -346,13 +346,20 @@ where
346346
})
347347
}
348348

349-
pub(crate) fn write_node_metrics<L: Deref>(
350-
node_metrics: &NodeMetrics, kv_store: &DynStore, logger: L,
349+
/// Take a write lock on `node_metrics`, apply `update`, and persist the result to `kv_store`.
350+
///
351+
/// The write lock is held across the KV-store write, preserving the invariant that readers only
352+
/// observe the mutation once it has been durably persisted (or the persist has failed).
353+
pub(crate) fn update_and_persist_node_metrics<L: Deref>(
354+
node_metrics: &RwLock<NodeMetrics>, kv_store: &DynStore, logger: L,
355+
update: impl FnOnce(&mut NodeMetrics),
351356
) -> Result<(), Error>
352357
where
353358
L::Target: LdkLogger,
354359
{
355-
let data = node_metrics.encode();
360+
let mut locked_node_metrics = node_metrics.write().expect("lock");
361+
update(&mut *locked_node_metrics);
362+
let data = locked_node_metrics.encode();
356363
KVStoreSync::write(
357364
&*kv_store,
358365
NODE_METRICS_PRIMARY_NAMESPACE,

src/lib.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator};
143143
use ffi::*;
144144
use gossip::GossipSource;
145145
use graph::NetworkGraph;
146-
use io::utils::write_node_metrics;
146+
use io::utils::update_and_persist_node_metrics;
147147
pub use lightning;
148148
use lightning::chain::BestBlock;
149149
use lightning::impl_writeable_tlv_based;
@@ -320,15 +320,16 @@ impl Node {
320320
gossip_sync_logger,
321321
"Background sync of RGS gossip data finished in {}ms.",
322322
now.elapsed().as_millis()
323-
);
324-
{
325-
let mut locked_node_metrics = gossip_node_metrics.write().expect("lock");
326-
locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp);
327-
write_node_metrics(&*locked_node_metrics, &*gossip_sync_store, Arc::clone(&gossip_sync_logger))
328-
.unwrap_or_else(|e| {
329-
log_error!(gossip_sync_logger, "Persistence failed: {}", e);
330-
});
331-
}
323+
);
324+
update_and_persist_node_metrics(
325+
&gossip_node_metrics,
326+
&*gossip_sync_store,
327+
Arc::clone(&gossip_sync_logger),
328+
|m| m.latest_rgs_snapshot_timestamp = Some(updated_timestamp),
329+
)
330+
.unwrap_or_else(|e| {
331+
log_error!(gossip_sync_logger, "Persistence failed: {}", e);
332+
});
332333
}
333334
Err(e) => {
334335
log_error!(
@@ -552,14 +553,15 @@ impl Node {
552553

553554
let unix_time_secs_opt =
554555
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
555-
{
556-
let mut locked_node_metrics = bcast_node_metrics.write().expect("lock");
557-
locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt;
558-
write_node_metrics(&*locked_node_metrics, &*bcast_store, Arc::clone(&bcast_logger))
559-
.unwrap_or_else(|e| {
560-
log_error!(bcast_logger, "Persistence failed: {}", e);
561-
});
562-
}
556+
update_and_persist_node_metrics(
557+
&bcast_node_metrics,
558+
&*bcast_store,
559+
Arc::clone(&bcast_logger),
560+
|m| m.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt,
561+
)
562+
.unwrap_or_else(|e| {
563+
log_error!(bcast_logger, "Persistence failed: {}", e);
564+
});
563565
} else {
564566
debug_assert!(false, "We checked whether the node may announce, so node alias should always be set");
565567
continue

src/scoring.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::io::utils::write_external_pathfinding_scores_to_cache;
1313
use crate::logger::LdkLogger;
1414
use crate::runtime::Runtime;
1515
use crate::types::DynStore;
16-
use crate::{write_node_metrics, Logger, NodeMetrics, Scorer};
16+
use crate::{update_and_persist_node_metrics, Logger, NodeMetrics, Scorer};
1717

1818
/// Start a background task that periodically downloads scores via an external url and merges them into the local
1919
/// pathfinding scores.
@@ -86,10 +86,10 @@ async fn sync_external_scores(
8686
.duration_since(SystemTime::UNIX_EPOCH)
8787
.expect("system time must be after Unix epoch");
8888
scorer.lock().expect("lock").merge(liquidities, duration_since_epoch);
89-
let mut locked_node_metrics = node_metrics.write().expect("lock");
90-
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
91-
Some(duration_since_epoch.as_secs());
92-
write_node_metrics(&*locked_node_metrics, &*kv_store, logger).unwrap_or_else(|e| {
89+
update_and_persist_node_metrics(&node_metrics, &*kv_store, logger, |m| {
90+
m.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs());
91+
})
92+
.unwrap_or_else(|e| {
9393
log_error!(logger, "Persisting node metrics failed: {}", e);
9494
});
9595
log_trace!(logger, "External scores merged successfully");

0 commit comments

Comments
 (0)