Skip to content

Commit 796e2ea

Browse files
committed
Wire semisync backup retry into builder and node startup
Thread BackupMode through TierStoreConfig and update backup-store configuration to distinguish best-effort backup writes from semisync behavior. Build a local retry queue store for SemiSync during tier-store construction, retain the concrete TierStore on Node, and spawn the background backup retry task during Node::start() with shutdown integration via stop_sender. Also update TierStore backup result handling to enqueue concrete PendingBackupOp values for durable retry, and refresh the related backup and retry-task documentation.
1 parent d4fffcb commit 796e2ea

3 files changed

Lines changed: 157 additions & 71 deletions

File tree

src/builder.rs

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use crate::event::EventQueue;
5858
use crate::fee_estimator::OnchainFeeEstimator;
5959
use crate::gossip::GossipSource;
6060
use crate::io::sqlite_store::SqliteStore;
61-
use crate::io::tier_store::TierStore;
61+
use crate::io::tier_store::{BackupMode, BackupRetryQueue, TierStore};
6262
use crate::io::utils::{
6363
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
6464
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
@@ -157,16 +157,18 @@ impl std::fmt::Debug for LogWriterConfig {
157157
}
158158

159159
#[derive(Default)]
160-
struct TierStoreConfig {
161-
ephemeral: Option<Arc<DynStore>>,
162-
backup: Option<Arc<DynStore>>,
160+
pub(crate) struct TierStoreConfig {
161+
pub(crate) ephemeral: Option<Arc<DynStore>>,
162+
pub(crate) backup: Option<Arc<DynStore>>,
163+
pub(crate) backup_mode: BackupMode,
163164
}
164165

165166
impl std::fmt::Debug for TierStoreConfig {
166167
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167168
f.debug_struct("TierStoreConfig")
168169
.field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc<DynStore>"))
169170
.field("backup", &self.backup.as_ref().map(|_| "Arc<DynStore>"))
171+
.field("backup_mode", &self.backup_mode)
170172
.finish()
171173
}
172174
}
@@ -650,13 +652,26 @@ impl NodeBuilder {
650652
/// When building with tiered storage, this store receives a second durable
651653
/// copy of data written to the primary store.
652654
///
653-
/// Writes and removals for primary-backed data only succeed once both the
654-
/// primary and backup stores complete successfully.
655-
///
656-
/// If not set, durable data will be stored only in the primary store.
657-
pub fn set_backup_store(&mut self, backup_store: Arc<DynStore>) -> &mut Self {
658-
let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default());
659-
tier_store_config.backup = Some(backup_store);
655+
/// Backup failure handling depends on `backup_mode`:
656+
/// - [`BackupMode::BestEffortBackup`] logs backup failures and still returns
657+
/// success as long as the primary store succeeds.
658+
/// - [`BackupMode::SemiSync`] returns success only if a failed backup
659+
/// operation can be durably persisted locally and enqueued for
660+
/// asynchronous retry.
661+
///
662+
/// Note: in [`BackupMode::SemiSync`], writes and removals are still not atomic
663+
/// across the primary and backup stores. A call may return an error after the
664+
/// primary store has already been updated if immediate backup replication
665+
/// fails and the retry intent cannot be durably persisted locally.
666+
///
667+
/// If no backup store is configured, durable data will be stored only in the
668+
/// primary store.
669+
pub fn set_backup_store(
670+
&mut self, backup_store: Arc<DynStore>, backup_mode: BackupMode,
671+
) -> &mut Self {
672+
let cfg = self.tier_store_config.get_or_insert(TierStoreConfig::default());
673+
cfg.backup = Some(backup_store);
674+
cfg.backup_mode = backup_mode;
660675
self
661676
}
662677

@@ -852,14 +867,41 @@ impl NodeBuilder {
852867

853868
let ts_config = self.tier_store_config.as_ref();
854869
let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger));
870+
855871
if let Some(config) = ts_config {
856872
config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s)));
857-
config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s), None));
873+
874+
if let Some(backup_store) = config.backup.as_ref() {
875+
match config.backup_mode {
876+
BackupMode::BestEffortBackup => {
877+
tier_store.set_backup_store(Arc::clone(backup_store), None);
878+
},
879+
BackupMode::SemiSync => {
880+
let retry_store: Arc<DynStore> =
881+
Arc::new(DynStoreWrapper(FilesystemStore::new(
882+
PathBuf::from(&self.config.storage_dir_path)
883+
.join("backup_retry_queue"),
884+
)));
885+
886+
let retry_queue = Arc::new(BackupRetryQueue::new(
887+
Arc::clone(&retry_store),
888+
Arc::clone(&logger),
889+
));
890+
tier_store.set_backup_store(
891+
Arc::clone(backup_store),
892+
Some(Arc::clone(&retry_queue)),
893+
);
894+
},
895+
}
896+
}
858897
}
859898

860899
let seed_bytes = node_entropy.to_seed_bytes();
861900
let config = Arc::new(self.config.clone());
862901

902+
let kv_store: Arc<DynStore> = Arc::new(DynStoreWrapper(tier_store.clone()));
903+
let tier_store = Arc::new(tier_store);
904+
863905
build_with_store_internal(
864906
config,
865907
self.chain_data_source_config.as_ref(),
@@ -871,7 +913,8 @@ impl NodeBuilder {
871913
seed_bytes,
872914
runtime,
873915
logger,
874-
Arc::new(DynStoreWrapper(tier_store)),
916+
kv_store,
917+
tier_store,
875918
)
876919
}
877920
}
@@ -1311,6 +1354,7 @@ fn build_with_store_internal(
13111354
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
13121355
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
13131356
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
1357+
tier_store: Arc<TierStore>,
13141358
) -> Result<Node, BuildError> {
13151359
optionally_install_rustls_cryptoprovider();
13161360

@@ -2131,6 +2175,7 @@ fn build_with_store_internal(
21312175
om_mailbox,
21322176
async_payments_role,
21332177
hrn_resolver: Arc::new(hrn_resolver),
2178+
tier_store,
21342179
#[cfg(cycle_tests)]
21352180
_leak_checker,
21362181
})

src/io/tier_store.rs

Lines changed: 77 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@ use std::time::Duration;
4444
/// Note that dual-store writes and removals are not atomic across the primary
4545
/// and backup stores. One store may already reflect the change even if the
4646
/// overall operation returns an error.
47+
#[derive(Clone)]
4748
pub(crate) struct TierStore {
4849
inner: Arc<TierStoreInner>,
49-
logger: Arc<Logger>,
5050
}
5151

5252
impl TierStore {
5353
pub fn new(primary_store: Arc<DynStore>, logger: Arc<Logger>) -> Self {
5454
let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger)));
5555

56-
Self { inner, logger }
56+
Self { inner }
5757
}
5858

5959
/// Configures a backup store for primary-backed data.
@@ -98,6 +98,10 @@ impl TierStore {
9898

9999
inner.ephemeral_store = Some(ephemeral);
100100
}
101+
102+
pub(crate) fn backup_store(&self) -> Option<&BackupStore> {
103+
self.inner.backup_store.as_ref()
104+
}
101105
}
102106

103107
impl KVStore for TierStore {
@@ -188,11 +192,11 @@ impl KVStoreSync for TierStore {
188192
}
189193
}
190194

191-
struct BackupStore {
195+
pub(crate) struct BackupStore {
192196
/// Store may be remote.
193-
store: Arc<DynStore>,
197+
pub(crate) store: Arc<DynStore>,
194198
/// Present only when backup failures should be durably queued for retry.
195-
retry_queue: Option<Arc<BackupRetryQueue<Arc<Logger>>>>,
199+
pub(crate) retry_queue: Option<Arc<BackupRetryQueue<Arc<Logger>>>>,
196200
}
197201

198202
struct TierStoreInner {
@@ -763,6 +767,7 @@ const RETRY_MAX_BACKOFF_MS: u64 = 60_000;
763767
const RETRY_BACKOFF_MULTIPLIER: u64 = 2;
764768

765769
/// Controls how TierStore treats backup-store failures for primary-backed writes and removals.
770+
#[derive(Debug)]
766771
pub enum BackupMode {
767772
/// Writes must succeed on the primary store.
768773
///
@@ -780,6 +785,12 @@ pub enum BackupMode {
780785
SemiSync,
781786
}
782787

788+
impl Default for BackupMode {
789+
fn default() -> Self {
790+
Self::BestEffortBackup
791+
}
792+
}
793+
783794
/// A pending operation against the backup store that failed on the write path
784795
/// and needs to be retried asynchronously.
785796
///
@@ -1070,49 +1081,61 @@ impl Future for RetryQueueFuture {
10701081
/// Runs the background retry loop for the given `BackupRetryQueue` against
10711082
/// `backup_store`.
10721083
///
1073-
/// Wakes when the queue has entries, iterates all pending ops, applies each
1074-
/// against the backup store, and removes successfully retried entries.
1075-
/// Uses exponential backoff with jitter on persistent failures.
1076-
/// Throttles error logging to avoid log spam.
1084+
/// The task waits for pending queue entries, a periodic fallback wake, or a
1085+
/// shutdown signal. When woken, it snapshots the current queue and retries
1086+
/// each pending operation against the backup store.
10771087
///
1078-
/// This function runs for node lifetime and should be spawned via
1079-
/// `tokio::spawn` during `build()` when `BackupMode::SemiSync` is configured.
1080-
pub(crate) async fn run_backup_retry_task<L: Deref + Send + Sync + 'static>(
1081-
queue: Arc<BackupRetryQueue<L>>, backup_store: Arc<DynStore>, logger: L,
1082-
) where
1083-
L::Target: LdkLogger,
1084-
{
1088+
/// `Write` retries write the originally queued buffer directly to the backup
1089+
/// store and never re-read from primary. `Remove` retries issue a remove
1090+
/// against the backup store and treat `io::ErrorKind::NotFound` as success.
1091+
///
1092+
/// Successfully retried entries are removed from the queue and the updated
1093+
/// queue state is persisted locally. Failed entries remain queued for later
1094+
/// retry. If any retries fail in a round, the task waits using exponential
1095+
/// backoff with jitter before retrying again.
1096+
///
1097+
/// This function runs for node lifetime and should be spawned as a background
1098+
/// task during [`Node::start`] when semisynchronous backup retry is configured.
1099+
///
1100+
/// [`Node::start`]: crate::Node::start
1101+
pub(crate) async fn run_backup_retry_task(
1102+
queue: Arc<BackupRetryQueue<Arc<Logger>>>, backup_store: Arc<DynStore>, logger: Arc<Logger>,
1103+
mut stop_receiver: tokio::sync::watch::Receiver<()>,
1104+
) {
10851105
let mut backoff_ms = RETRY_INITIAL_BACKOFF_MS;
1086-
let mut consecutive_failures: usize = 0;
10871106

10881107
loop {
1089-
// Wait until there is something to do.
1090-
queue.wait_for_entries().await;
1108+
let fallback_sleep = tokio::time::sleep(Duration::from_secs(60));
1109+
tokio::pin!(fallback_sleep);
1110+
1111+
tokio::select! {
1112+
_ = stop_receiver.changed() => break,
1113+
_ = queue.wait_for_entries() => {},
1114+
_ = &mut fallback_sleep => {},
1115+
}
1116+
1117+
if queue.is_empty() {
1118+
continue;
1119+
}
10911120

10921121
let entries = queue.snapshot();
1093-
let mut all_succeeded = true;
1122+
let mut any_failed = false;
10941123

10951124
for (key, op) in &entries {
1125+
if stop_receiver.has_changed().unwrap_or(false) {
1126+
return;
1127+
}
1128+
10961129
let (pn, sn, k) = key;
10971130

1098-
// Before applying, check whether this key is still in the queue
1099-
// with the same op. A newer op may have replaced it since we
1100-
// took the snapshot.
1131+
// Skip stale snapshot entries that were removed or replaced
1132+
// after we took the snapshot.
11011133
{
11021134
let locked = queue.pending.lock().expect("lock");
11031135
match locked.get(key) {
1104-
None => {
1105-
// Entry was removed (completed by a concurrent retry).
1106-
continue;
1107-
},
1108-
Some(current_op) => {
1109-
// If the op in the queue differs from our snapshot,
1110-
// a newer op replaced it. Skip — next iteration will
1111-
// pick up the newer op.
1112-
if !ops_match(current_op, op) {
1113-
continue;
1114-
}
1115-
},
1136+
None => continue,
1137+
Some(current_op) if !ops_match(current_op, op) => continue,
1138+
Some(_) => {},
11161139
}
11171140
}
11181141

@@ -1121,43 +1144,39 @@ pub(crate) async fn run_backup_retry_task<L: Deref + Send + Sync + 'static>(
11211144
KVStoreSync::write(backup_store.as_ref(), pn, sn, k, buf.clone())
11221145
},
11231146
PendingBackupOp::Remove { lazy } => {
1124-
KVStoreSync::remove(backup_store.as_ref(), pn, sn, k, *lazy)
1147+
match KVStoreSync::remove(backup_store.as_ref(), pn, sn, k, *lazy) {
1148+
Ok(()) => Ok(()),
1149+
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
1150+
Err(e) => Err(e),
1151+
}
11251152
},
11261153
};
11271154

11281155
match result {
11291156
Ok(()) => {
11301157
queue.remove_completed(key);
1131-
consecutive_failures = 0;
1132-
backoff_ms = RETRY_INITIAL_BACKOFF_MS;
11331158
},
11341159
Err(e) => {
1135-
all_succeeded = false;
1136-
consecutive_failures += 1;
1137-
// Throttle logging — only log every 8 failures to avoid
1138-
// spam on persistent remote store outages.
1139-
if consecutive_failures == 1 || consecutive_failures.is_power_of_two() {
1140-
log_error!(
1141-
logger,
1142-
"Backup retry failed for key {}/{}/{} \
1143-
(attempt {}): {}",
1144-
pn,
1145-
sn,
1146-
k,
1147-
consecutive_failures,
1148-
e
1149-
);
1150-
}
1160+
any_failed = true;
1161+
log_error!(logger, "Backup retry failed for key {}/{}/{}: {}", pn, sn, k, e);
11511162
},
11521163
}
11531164
}
11541165

1155-
if !all_succeeded {
1156-
// Exponential backoff with a small jitter.
1166+
if any_failed {
11571167
let jitter_ms = (backoff_ms / 10).max(1);
1158-
let sleep_ms = backoff_ms + (jitter_ms % 17); // deterministic but offset
1159-
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
1168+
let sleep_ms = backoff_ms + (jitter_ms % 17);
1169+
let sleep = tokio::time::sleep(Duration::from_millis(sleep_ms));
1170+
tokio::pin!(sleep);
1171+
1172+
tokio::select! {
1173+
_ = stop_receiver.changed() => break,
1174+
_ = &mut sleep => {},
1175+
}
1176+
11601177
backoff_ms = (backoff_ms * RETRY_BACKOFF_MULTIPLIER).min(RETRY_MAX_BACKOFF_MS);
1178+
} else {
1179+
backoff_ms = RETRY_INITIAL_BACKOFF_MS;
11611180
}
11621181
}
11631182
}

src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ use types::{
180180
pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId};
181181
pub use vss_client;
182182

183+
use crate::io::tier_store::{run_backup_retry_task, TierStore};
183184
use crate::scoring::setup_background_pathfinding_scores_sync;
184185
use crate::wallet::FundingAmount;
185186

@@ -239,6 +240,7 @@ pub struct Node {
239240
om_mailbox: Option<Arc<OnionMessageMailbox>>,
240241
async_payments_role: Option<AsyncPaymentsRole>,
241242
hrn_resolver: Arc<HRNResolver>,
243+
tier_store: Arc<TierStore>,
242244
#[cfg(cycle_tests)]
243245
_leak_checker: LeakChecker,
244246
}
@@ -346,6 +348,26 @@ impl Node {
346348
);
347349
}
348350

351+
// Spawn background task to asynchronously retry failed backup operations.
352+
if let Some(backup_store) = self.tier_store.backup_store() {
353+
if let Some(retry_queue) = backup_store.retry_queue.as_ref() {
354+
let retry_queue = Arc::clone(retry_queue);
355+
let retry_backup_store = Arc::clone(&backup_store.store);
356+
let retry_logger = Arc::clone(&self.logger);
357+
let stop_retry = self.stop_sender.subscribe();
358+
359+
self.runtime.spawn_background_task(async move {
360+
run_backup_retry_task(
361+
retry_queue,
362+
retry_backup_store,
363+
retry_logger,
364+
stop_retry,
365+
)
366+
.await;
367+
});
368+
}
369+
}
370+
349371
if let Some(listening_addresses) = &self.config.listening_addresses {
350372
// Setup networking
351373
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);

0 commit comments

Comments
 (0)