Skip to content

Commit a9dced6

Browse files
committed
feat(cbf): auto-restart bip157 node with exponential backoff
When node.run() exits (e.g. NoReachablePeers from kyoto lightningdevkit#558), the background task rebuilds the node, swaps the requester, and respawns the channel processing tasks. It restarts at most MAX_RESTART_RETRIES (5) times, with a backoff that starts at 500ms and doubles after each failure.

- Change cbf_runtime_status from Mutex<> to Arc<Mutex<>> so it can be shared with the async restart loop
- Extract build_cbf_node_static() that takes explicit params instead of &self, enabling calls from 'static async blocks
- Move all task spawning (info/warn/event + node.run) into a single restart loop inside spawn_background_task

AI: claude
1 parent 0151c4a commit a9dced6

File tree

1 file changed

+142
-50
lines changed

1 file changed

+142
-50
lines changed

src/chain/cbf.rs

Lines changed: 142 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,7 @@ pub(super) struct CbfChainSource {
7979
/// Fee estimation back-end.
8080
fee_source: FeeSource,
8181
/// Tracks whether the bip157 node is running and holds the command handle.
82-
cbf_runtime_status: Mutex<CbfRuntimeStatus>,
82+
cbf_runtime_status: Arc<Mutex<CbfRuntimeStatus>>,
8383
/// Latest chain tip hash, updated by the background event processing task.
8484
latest_tip: Arc<Mutex<Option<BlockHash>>>,
8585
/// Scripts to match against compact block filters during a scan.
@@ -147,7 +147,7 @@ impl CbfChainSource {
147147
None => FeeSource::Cbf,
148148
};
149149

150-
let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped);
150+
let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped));
151151
let latest_tip = Arc::new(Mutex::new(None));
152152
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
153153
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
@@ -183,46 +183,57 @@ impl CbfChainSource {
183183

184184
/// Build a new bip157 node and client from the current configuration.
185185
///
186-
/// If an on-chain wallet reference is available, a `ChainState::Checkpoint`
187-
/// is derived from the wallet's persisted checkpoint (walked back by
188-
/// `REORG_SAFETY_BLOCKS`) so the node resumes near its last known height
189-
/// instead of re-syncing from genesis.
186+
/// Delegates to [`Self::build_cbf_node_static`], passing all needed fields.
190187
fn build_cbf_node(&self) -> (CbfNode, Client) {
191-
let network = self.config.network;
188+
let wallet = self.onchain_wallet.lock().unwrap().clone();
189+
Self::build_cbf_node_static(
190+
&self.peers,
191+
&self.sync_config,
192+
&self.config,
193+
wallet.as_ref(),
194+
&self.logger,
195+
)
196+
}
197+
198+
/// Static version of the builder: takes all required parameters explicitly
199+
/// so it can be called from an `async move` block without borrowing `self`.
200+
fn build_cbf_node_static(
201+
peers: &[String], sync_config: &CbfSyncConfig, config: &Config,
202+
wallet: Option<&Arc<Wallet>>, logger: &Logger,
203+
) -> (CbfNode, Client) {
204+
let network = config.network;
192205

193206
let mut builder = Builder::new(network);
194207

195208
// Configure data directory under the node's storage path.
196-
let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data");
209+
let data_dir = std::path::PathBuf::from(&config.storage_dir_path).join("bip157_data");
197210
builder = builder.data_dir(data_dir);
198211

199212
// Add configured peers.
200-
let peers: Vec<TrustedPeer> = self
201-
.peers
213+
let trusted_peers: Vec<TrustedPeer> = peers
202214
.iter()
203215
.filter_map(|peer_str| {
204216
peer_str.parse::<SocketAddr>().ok().map(TrustedPeer::from_socket_addr)
205217
})
206218
.collect();
207-
if !peers.is_empty() {
208-
builder = builder.add_peers(peers);
219+
if !trusted_peers.is_empty() {
220+
builder = builder.add_peers(trusted_peers);
209221
}
210222

211223
// Require multiple peers to agree on filter headers before accepting them,
212224
// as recommended by BIP 157 to mitigate malicious peer attacks.
213-
builder = builder.required_peers(self.sync_config.required_peers);
225+
builder = builder.required_peers(sync_config.required_peers);
214226

215227
// Request witness data so segwit transactions include full witnesses,
216228
// required for Lightning channel operations.
217229
builder = builder.fetch_witness_data();
218230

219231
// Set peer response timeout from user configuration (default: 30s).
220-
builder =
221-
builder.response_timeout(Duration::from_secs(self.sync_config.response_timeout_secs));
232+
builder = builder.response_timeout(Duration::from_secs(sync_config.response_timeout_secs));
222233

223234
// If we have a wallet reference, derive a chain_state checkpoint so the
224235
// bip157 node can skip already-synced headers on restart.
225-
if let Some(wallet) = self.onchain_wallet.lock().unwrap().as_ref() {
236+
if let Some(wallet) = wallet {
226237
let cp = wallet.latest_checkpoint();
227238
let target_height = cp.height().saturating_sub(REORG_SAFETY_BLOCKS);
228239
// Walk the checkpoint chain back to the target height.
@@ -237,7 +248,7 @@ impl CbfChainSource {
237248
let header_cp = HeaderCheckpoint::new(cursor.height(), cursor.hash());
238249
builder = builder.chain_state(ChainState::Checkpoint(header_cp));
239250
log_debug!(
240-
self.logger,
251+
logger,
241252
"CBF builder: resuming from checkpoint height={}, hash={}",
242253
cursor.height(),
243254
cursor.hash(),
@@ -249,6 +260,11 @@ impl CbfChainSource {
249260
}
250261

251262
/// Start the bip157 node and spawn background tasks for event processing.
263+
///
264+
/// The node runs inside a restart loop: if `node.run()` returns an error,
265+
/// the loop rebuilds the node, swaps the requester, and respawns channel
266+
/// processing tasks — at most [`MAX_RESTART_RETRIES`] restarts in total (the counter is never reset),
267+
/// with exponential backoff starting at [`INITIAL_BACKOFF_MS`].
252268
pub(crate) fn start(&self, runtime: Arc<Runtime>, onchain_wallet: Arc<Wallet>) {
253269
let mut status = self.cbf_runtime_status.lock().unwrap();
254270
if matches!(*status, CbfRuntimeStatus::Started { .. }) {
@@ -263,42 +279,118 @@ impl CbfChainSource {
263279

264280
let Client { requester, info_rx, warn_rx, event_rx } = client;
265281

266-
// Spawn the bip157 node in the background.
267-
let node_logger = Arc::clone(&self.logger);
268-
runtime.spawn_background_task(async move {
269-
if let Err(e) = node.run().await {
270-
log_error!(node_logger, "CBF node exited with error: {:?}", e);
271-
}
272-
});
273-
274-
// Spawn a task to log info messages.
275-
let info_logger = Arc::clone(&self.logger);
276-
runtime
277-
.spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger));
278-
279-
// Spawn a task to log warning messages.
280-
let warn_logger = Arc::clone(&self.logger);
281-
runtime
282-
.spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger));
283-
284-
// Spawn a task to process events.
285-
let event_state = CbfEventState {
286-
latest_tip: Arc::clone(&self.latest_tip),
287-
watched_scripts: Arc::clone(&self.watched_scripts),
288-
matched_block_hashes: Arc::clone(&self.matched_block_hashes),
289-
sync_completion_tx: Arc::clone(&self.sync_completion_tx),
290-
filter_skip_height: Arc::clone(&self.filter_skip_height),
291-
};
292-
let event_logger = Arc::clone(&self.logger);
293-
runtime.spawn_cancellable_background_task(Self::process_events(
294-
event_rx,
295-
event_state,
296-
event_logger,
297-
));
282+
*status = CbfRuntimeStatus::Started { requester };
283+
drop(status);
298284

299285
log_info!(self.logger, "CBF chain source started.");
300286

301-
*status = CbfRuntimeStatus::Started { requester };
287+
// Clone all Arc references needed by the restart loop so the async
288+
// block is 'static (no borrows of `self`).
289+
let restart_status = Arc::clone(&self.cbf_runtime_status);
290+
let restart_logger = Arc::clone(&self.logger);
291+
let restart_latest_tip = Arc::clone(&self.latest_tip);
292+
let restart_watched_scripts = Arc::clone(&self.watched_scripts);
293+
let restart_matched_block_hashes = Arc::clone(&self.matched_block_hashes);
294+
let restart_sync_completion_tx = Arc::clone(&self.sync_completion_tx);
295+
let restart_filter_skip_height = Arc::clone(&self.filter_skip_height);
296+
let restart_peers = self.peers.clone();
297+
let restart_sync_config = self.sync_config.clone();
298+
let restart_config = Arc::clone(&self.config);
299+
let restart_wallet = Arc::clone(&onchain_wallet);
300+
301+
runtime.spawn_background_task(async move {
302+
let mut current_node = node;
303+
let mut current_info_rx = info_rx;
304+
let mut current_warn_rx = warn_rx;
305+
let mut current_event_rx = event_rx;
306+
let mut retries = 0u32;
307+
let mut backoff_ms = INITIAL_BACKOFF_MS;
308+
309+
loop {
310+
// Spawn channel processing tasks for this iteration.
311+
let info_handle = tokio::spawn(Self::process_info_messages(
312+
current_info_rx,
313+
Arc::clone(&restart_logger),
314+
));
315+
let warn_handle = tokio::spawn(Self::process_warn_messages(
316+
current_warn_rx,
317+
Arc::clone(&restart_logger),
318+
));
319+
let event_state = CbfEventState {
320+
latest_tip: Arc::clone(&restart_latest_tip),
321+
watched_scripts: Arc::clone(&restart_watched_scripts),
322+
matched_block_hashes: Arc::clone(&restart_matched_block_hashes),
323+
sync_completion_tx: Arc::clone(&restart_sync_completion_tx),
324+
filter_skip_height: Arc::clone(&restart_filter_skip_height),
325+
};
326+
let event_handle = tokio::spawn(Self::process_events(
327+
current_event_rx,
328+
event_state,
329+
Arc::clone(&restart_logger),
330+
));
331+
332+
// Run the node until it exits.
333+
match current_node.run().await {
334+
Ok(()) => {
335+
log_info!(restart_logger, "CBF node shut down cleanly.");
336+
break;
337+
},
338+
Err(e) => {
339+
retries += 1;
340+
if retries > MAX_RESTART_RETRIES {
341+
log_error!(
342+
restart_logger,
343+
"CBF node failed {} times, giving up: {:?}",
344+
retries,
345+
e,
346+
);
347+
break;
348+
}
349+
log_error!(
350+
restart_logger,
351+
"CBF node exited with error (attempt {}/{}): {:?}. \
352+
Restarting in {}ms.",
353+
retries,
354+
MAX_RESTART_RETRIES,
355+
e,
356+
backoff_ms,
357+
);
358+
359+
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
360+
backoff_ms = backoff_ms.saturating_mul(2);
361+
362+
// Abort old channel processing tasks.
363+
info_handle.abort();
364+
warn_handle.abort();
365+
event_handle.abort();
366+
367+
// Rebuild the node from scratch.
368+
let (new_node, new_client) = Self::build_cbf_node_static(
369+
&restart_peers,
370+
&restart_sync_config,
371+
&restart_config,
372+
Some(&restart_wallet),
373+
&restart_logger,
374+
);
375+
let Client {
376+
requester: new_requester,
377+
info_rx: new_info_rx,
378+
warn_rx: new_warn_rx,
379+
event_rx: new_event_rx,
380+
} = new_client;
381+
382+
// Swap the requester so callers pick up the new handle.
383+
*restart_status.lock().unwrap() =
384+
CbfRuntimeStatus::Started { requester: new_requester };
385+
386+
current_node = new_node;
387+
current_info_rx = new_info_rx;
388+
current_warn_rx = new_warn_rx;
389+
current_event_rx = new_event_rx;
390+
},
391+
}
392+
}
393+
});
302394
}
303395

304396
/// Shut down the bip157 node and stop all background tasks.

0 commit comments

Comments
 (0)