Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Commit 46afa7e

Browse files
authored
Use alternative retry method for async storage (#797)
1 parent 9421735 commit 46afa7e

4 files changed

Lines changed: 114 additions & 77 deletions

File tree

mutiny-core/src/ldkstorage.rs

Lines changed: 34 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::nodemanager::ChannelClosure;
1111
use crate::storage::{MutinyStorage, VersionedValue};
1212
use crate::utils;
1313
use crate::utils::{sleep, spawn};
14-
use crate::vss::VssKeyValueItem;
1514
use crate::{chain::MutinyChain, scorer::HubPreferentialScorer};
1615
use anyhow::anyhow;
1716
use bitcoin::hashes::hex::{FromHex, ToHex};
@@ -93,6 +92,15 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
9392
format!("{}_{}", key, self.node_id)
9493
}
9594

95+
pub(crate) fn get_monitor_key(&self, funding_txo: &OutPoint) -> String {
96+
let key = format!(
97+
"{MONITORS_PREFIX_KEY}{}_{}",
98+
funding_txo.txid.to_hex(),
99+
funding_txo.index
100+
);
101+
self.get_key(&key)
102+
}
103+
96104
fn init_persist_monitor<W: Writeable>(
97105
&self,
98106
key: String,
@@ -105,47 +113,30 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
105113
let logger = self.logger.clone();
106114
let object = object.encode();
107115

108-
// currently we only retry storage to VSS because we don't have a way to detect
109-
// for local storage if a higher version has been persisted. Without handling this
110-
// we could end up with a previous state being persisted over a newer one.
111-
// VSS does not have this problem because it verifies the version before storing
112-
// and will not overwrite a newer version, so it is safe to retry.
113116
spawn(async move {
114-
let mut is_retry = false;
115117
// Sleep before persisting to give chance for the manager to be persisted
116118
sleep(50).await;
117-
loop {
118-
match persist_monitor(&storage, &key, &object, Some(version), is_retry, &logger)
119-
.await
120-
{
121-
Ok(()) => {
122-
log_debug!(logger, "Persisted channel monitor: {update_id:?}");
123-
124-
// unwrap is safe, we set it up immediately
125-
let chain_monitor = chain_monitor.lock().await;
126-
let chain_monitor = chain_monitor.as_ref().unwrap();
127-
128-
// these errors are not fatal, so we don't return them just log
129-
if let Err(e) = chain_monitor.channel_monitor_updated(
130-
update_id.funding_txo,
131-
update_id.monitor_update_id,
132-
) {
133-
log_error!(
134-
logger,
135-
"Error notifying chain monitor of channel monitor update: {e:?}"
136-
);
137-
} else {
138-
break; // successful storage, no more attempts
139-
}
140-
}
141-
Err(e) => {
142-
log_error!(logger, "Error persisting channel monitor: {e}");
119+
match persist_monitor(&storage, &key, &object, Some(version), &logger).await {
120+
Ok(()) => {
121+
log_debug!(logger, "Persisted channel monitor: {update_id:?}");
122+
123+
// unwrap is safe, we set it up immediately
124+
let chain_monitor = chain_monitor.lock().await;
125+
let chain_monitor = chain_monitor.as_ref().unwrap();
126+
127+
// these errors are not fatal, so we don't return them just log
128+
if let Err(e) = chain_monitor
129+
.channel_monitor_updated(update_id.funding_txo, update_id.monitor_update_id)
130+
{
131+
log_error!(
132+
logger,
133+
"Error notifying chain monitor of channel monitor update: {e:?}"
134+
);
143135
}
144136
}
145-
146-
// if we get here, we failed to persist, so we retry
147-
is_retry = true;
148-
sleep(1_000).await;
137+
Err(e) => {
138+
log_error!(logger, "Error persisting channel monitor: {e}");
139+
}
149140
}
150141
});
151142

@@ -675,15 +666,10 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
675666
monitor: &ChannelMonitor<ChannelSigner>,
676667
monitor_update_id: MonitorUpdateId,
677668
) -> ChannelMonitorUpdateStatus {
678-
let key = format!(
679-
"{MONITORS_PREFIX_KEY}{}_{}",
680-
funding_txo.txid.to_hex(),
681-
funding_txo.index
682-
);
683-
let key = self.get_key(&key);
669+
let key = self.get_monitor_key(&funding_txo);
684670

685671
let update_id = monitor.get_latest_update_id();
686-
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
672+
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));
687673

688674
// safely convert u64 to u32
689675
let version = if update_id >= u32::MAX as u64 {
@@ -707,14 +693,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, S: MutinyStorage> Persist<Chann
707693
monitor: &ChannelMonitor<ChannelSigner>,
708694
monitor_update_id: MonitorUpdateId,
709695
) -> ChannelMonitorUpdateStatus {
710-
let key = format!(
711-
"{MONITORS_PREFIX_KEY}{}_{}",
712-
funding_txo.txid.to_hex(),
713-
funding_txo.index
714-
);
715-
let key = self.get_key(&key);
696+
let key = self.get_monitor_key(&funding_txo);
716697
let update_id = monitor.get_latest_update_id();
717-
debug_assert!(update_id == utils::get_monitor_version(monitor.encode()));
698+
debug_assert!(update_id == utils::get_monitor_version(&monitor.encode()));
718699

719700
// safely convert u64 to u32
720701
let version = if update_id >= u32::MAX as u64 {
@@ -738,33 +719,14 @@ pub struct MonitorUpdateIdentifier {
738719
pub monitor_update_id: MonitorUpdateId,
739720
}
740721

741-
async fn persist_monitor(
722+
pub(crate) async fn persist_monitor(
742723
storage: &impl MutinyStorage,
743724
key: &str,
744725
object: &Vec<u8>,
745726
version: Option<u32>,
746-
vss_only: bool,
747727
logger: &MutinyLogger,
748728
) -> Result<(), lightning::io::Error> {
749-
let res = if vss_only {
750-
// if we are only storing to VSS, we don't need to store to local storage
751-
// just need to call put_objects on VSS
752-
if let (Some(vss), Some(version)) = (storage.vss_client(), version) {
753-
let value =
754-
serde_json::to_value(object).map_err(|_| lightning::io::ErrorKind::Other)?;
755-
let item = VssKeyValueItem {
756-
key: key.to_string(),
757-
value,
758-
version,
759-
};
760-
761-
vss.put_objects(vec![item]).await
762-
} else {
763-
Ok(())
764-
}
765-
} else {
766-
storage.set_data_async(key, object, version).await
767-
};
729+
let res = storage.set_data_async(key, object, version).await;
768730

769731
res.map_err(|e| {
770732
match e {

mutiny-core/src/node.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::labels::LabelStorage;
2-
use crate::ldkstorage::ChannelOpenParams;
2+
use crate::ldkstorage::{persist_monitor, ChannelOpenParams};
33
use crate::nodemanager::ChannelClosure;
44
use crate::scb::StaticChannelBackup;
55
use crate::{
@@ -39,6 +39,7 @@ use lightning::{
3939
};
4040

4141
use crate::multiesplora::MultiEsploraClient;
42+
use crate::utils::get_monitor_version;
4243
use bitcoin::util::bip32::ExtendedPrivKey;
4344
use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
4445
use lightning::ln::PaymentSecret;
@@ -548,6 +549,80 @@ impl<S: MutinyStorage> Node<S> {
548549
keys_manager.get_node_id(Recipient::Node).unwrap()
549550
);
550551

552+
// Here we re-attempt to persist any monitors that failed to persist previously.
553+
let retry_logger = logger.clone();
554+
let retry_persister = persister.clone();
555+
let retry_stop = stop.clone();
556+
let retry_chain_monitor = chain_monitor.clone();
557+
utils::spawn(async move {
558+
loop {
559+
if retry_stop.load(Ordering::Relaxed) {
560+
break;
561+
}
562+
563+
let updates = retry_chain_monitor.list_pending_monitor_updates();
564+
for (funding_txo, update_ids) in updates {
565+
// if there are no updates, skip
566+
if update_ids.is_empty() {
567+
continue;
568+
}
569+
570+
log_debug!(
571+
retry_logger,
572+
"Retrying to persist monitor for outpoint: {funding_txo:?}"
573+
);
574+
575+
match retry_chain_monitor.get_monitor(funding_txo) {
576+
Ok(monitor) => {
577+
let key = retry_persister.get_monitor_key(&funding_txo);
578+
let object = monitor.encode();
579+
let update_id = monitor.get_latest_update_id();
580+
debug_assert_eq!(update_id, get_monitor_version(&object));
581+
582+
// safely convert u64 to u32
583+
let version = if update_id >= u32::MAX as u64 {
584+
u32::MAX
585+
} else {
586+
update_id as u32
587+
};
588+
589+
let res = persist_monitor(
590+
&retry_persister.storage,
591+
&key,
592+
&object,
593+
Some(version),
594+
&retry_logger,
595+
)
596+
.await;
597+
598+
match res {
599+
Ok(_) => {
600+
for id in update_ids {
601+
if let Err(e) = retry_chain_monitor
602+
.channel_monitor_updated(funding_txo, id)
603+
{
604+
log_error!(retry_logger, "Error notifying chain monitor of channel monitor update: {e:?}");
605+
}
606+
}
607+
}
608+
Err(e) => log_error!(
609+
retry_logger,
610+
"Failed to persist monitor for outpoint: {funding_txo:?}, error: {e:?}",
611+
),
612+
}
613+
}
614+
Err(_) => log_error!(
615+
retry_logger,
616+
"Failed to get monitor for outpoint: {funding_txo:?}"
617+
),
618+
}
619+
}
620+
621+
// sleep 3 seconds
622+
sleep(3_000).await;
623+
}
624+
});
625+
551626
Ok(Node {
552627
_uuid: uuid,
553628
stopped_components,

mutiny-core/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ where
178178

179179
/// Returns the version of a channel monitor from a serialized version
180180
/// of a channel monitor.
181-
pub fn get_monitor_version(bytes: Vec<u8>) -> u64 {
181+
pub fn get_monitor_version(bytes: &[u8]) -> u64 {
182182
// first two bytes are the version
183183
// next 8 bytes are the version number
184184
u64::from_be_bytes(bytes[2..10].try_into().unwrap())

mutiny-wasm/src/indexed_db.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,12 @@ impl IndexedDbStorage {
258258
match current.get::<Vec<u8>>(&key)? {
259259
Some(bytes) => {
260260
// check first byte is 1, then take u64 from next 8 bytes
261-
let current_version = utils::get_monitor_version(bytes);
261+
let current_version = utils::get_monitor_version(&bytes);
262262

263263
let obj: Value = LocalStorage::get(&key).unwrap();
264264
let value = decrypt_value(&key, obj, current.password())?;
265265
if let Ok(local_bytes) = serde_json::from_value::<Vec<u8>>(value.clone()) {
266-
let local_version = utils::get_monitor_version(local_bytes);
266+
let local_version = utils::get_monitor_version(&local_bytes);
267267

268268
// if the current version is less than the version from local storage
269269
// then we want to use the local storage version
@@ -372,7 +372,7 @@ impl IndexedDbStorage {
372372
// we can get versions from monitors, so we should compare
373373
match current.get::<Vec<u8>>(&kv.key)? {
374374
Some(bytes) => {
375-
let current_version = utils::get_monitor_version(bytes);
375+
let current_version = utils::get_monitor_version(&bytes);
376376

377377
// if the current version is less than the version from vss, then we want to use the vss version
378378
if current_version < kv.version as u64 {

0 commit comments

Comments
 (0)