Skip to content

Commit a798d74

Browse files
committed
Add deferred monitor write support to chanmon_consistency fuzz target
Add a deferred flag to TestChainMonitor that controls whether the underlying ChainMonitor queues operations instead of executing them immediately. The flag is derived from the first fuzz input byte so each of the three nodes can independently run in deferred or immediate mode, while the same byte also selects the channel type. In deferred mode, watch_channel and update_channel always return InProgress. A new flush_and_update_latest_monitors method drains the queued operations and, when the persister reports Completed, promotes the pending shadow monitor snapshots to persisted state. This method is called before release_pending_monitor_events, at each point where the fuzzer completes pending monitor updates, and after watch_channel during node reload so the node starts from a consistent state. The first-byte config layout now uses bits 0 to 2 for initial monitor status, bits 3 to 4 for channel type, and bits 5 to 7 for deferred monitor writes. This fixes the rebase conflict where deferred mode still referenced the pre-channel-type bit layout. AI tools were used in preparing this commit.
1 parent 03ab73d commit a798d74

1 file changed

Lines changed: 64 additions & 4 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,12 +330,13 @@ struct TestChainMonitor {
330330
Arc<KeyProvider>,
331331
>,
332332
>,
333+
pub deferred: bool,
333334
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
334335
}
335336
impl TestChainMonitor {
336337
pub fn new(
337338
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
338-
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
339+
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
339340
) -> Self {
340341
Self {
341342
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -346,14 +347,44 @@ impl TestChainMonitor {
346347
Arc::clone(&persister),
347348
Arc::clone(&keys),
348349
keys.get_peer_storage_key(),
349-
false,
350+
deferred,
350351
)),
351352
logger,
352353
keys,
353354
persister,
355+
deferred,
354356
latest_monitors: Mutex::new(new_hash_map()),
355357
}
356358
}
359+
360+
/// Flushes all deferred monitor operations and, if the persister reports success, promotes
361+
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
362+
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
363+
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
364+
/// snapshots rather than relying on the persister's storage.
365+
///
366+
/// This simulates the pattern of snapshotting the pending count, persisting the
367+
/// `ChannelManager`, then flushing the queued monitor writes.
368+
fn flush_and_update_latest_monitors(&self) {
369+
let count = self.chain_monitor.pending_operation_count();
370+
if count == 0 {
371+
return;
372+
}
373+
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
374+
self.chain_monitor.flush(count, &self.logger);
375+
let persister_res = *self.persister.update_ret.lock().unwrap();
376+
// Only update our local tracking state when the persister signals completion. When
377+
// persistence is still in-progress, the monitors stay in the pending set so that a
378+
// simulated restart can still reload from the last fully-persisted snapshot.
379+
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
380+
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
381+
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
382+
state.persisted_monitor_id = id;
383+
state.persisted_monitor = data;
384+
}
385+
}
386+
}
387+
}
357388
}
358389
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
359390
fn watch_channel(
@@ -363,6 +394,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
363394
monitor.write(&mut ser).unwrap();
364395
let monitor_id = monitor.get_latest_update_id();
365396
let res = self.chain_monitor.watch_channel(channel_id, monitor);
397+
if self.deferred {
398+
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
399+
}
366400
let state = match res {
367401
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
368402
persisted_monitor_id: monitor_id,
@@ -412,6 +446,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
412446
let mut ser = VecWriter(Vec::new());
413447
deserialized_monitor.write(&mut ser).unwrap();
414448
let res = self.chain_monitor.update_channel(channel_id, update);
449+
if self.deferred {
450+
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
451+
}
415452
match res {
416453
chain::ChannelMonitorUpdateStatus::Completed => {
417454
map_entry.persisted_monitor_id = update.update_id;
@@ -428,6 +465,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
428465
fn release_pending_monitor_events(
429466
&self,
430467
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
468+
if self.deferred {
469+
self.flush_and_update_latest_monitors();
470+
}
431471
return self.chain_monitor.release_pending_monitor_events();
432472
}
433473
}
@@ -949,9 +989,10 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
949989
let broadcast_c = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
950990
let router = FuzzRouter {};
951991

952-
// Read initial monitor styles and channel type from fuzz input byte 0:
992+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
953993
// bits 0-2: monitor styles (1 bit per node)
954994
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
995+
// bits 5-7: deferred monitor write mode (1 bit per node)
955996
let config_byte = if !data.is_empty() { data[0] } else { 0 };
956997
let chan_type = match (config_byte >> 3) & 0b11 {
957998
0 => ChanType::Legacy,
@@ -975,6 +1016,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
9751016
ChannelMonitorUpdateStatus::Completed
9761017
}),
9771018
];
1019+
let deferred = [
1020+
config_byte & 0b0010_0000 != 0,
1021+
config_byte & 0b0100_0000 != 0,
1022+
config_byte & 0b1000_0000 != 0,
1023+
];
9781024

9791025
let mut chain_state = ChainState::new();
9801026
let mut node_height_a: u32 = 0;
@@ -1003,6 +1049,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
10031049
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
10041050
}),
10051051
Arc::clone(&keys_manager),
1052+
deferred[$node_id as usize],
10061053
));
10071054

10081055
let mut config = UserConfig::default();
@@ -1067,6 +1114,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
10671114
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
10681115
}),
10691116
Arc::clone(keys),
1117+
deferred[node_id as usize],
10701118
));
10711119

10721120
let mut config = UserConfig::default();
@@ -1144,18 +1192,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
11441192
let manager = <(BlockLocator, ChanMan)>::read(&mut &ser[..], read_args)
11451193
.expect("Failed to read manager");
11461194
let res = (manager.1, chain_monitor.clone());
1195+
let expected_status = if deferred[node_id as usize] {
1196+
ChannelMonitorUpdateStatus::InProgress
1197+
} else {
1198+
ChannelMonitorUpdateStatus::Completed
1199+
};
11471200
for (channel_id, mon) in monitors.drain() {
11481201
assert_eq!(
11491202
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
1150-
Ok(ChannelMonitorUpdateStatus::Completed)
1203+
Ok(expected_status)
11511204
);
11521205
}
1206+
if deferred[node_id as usize] {
1207+
let count = chain_monitor.chain_monitor.pending_operation_count();
1208+
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
1209+
}
11531210
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
11541211
res
11551212
};
11561213

11571214
macro_rules! complete_all_pending_monitor_updates {
11581215
($monitor: expr) => {{
1216+
$monitor.flush_and_update_latest_monitors();
11591217
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
11601218
for (id, data) in state.pending_monitors.drain(..) {
11611219
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
@@ -2133,6 +2191,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
21332191
|monitor: &Arc<TestChainMonitor>,
21342192
chan_funding,
21352193
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
2194+
monitor.flush_and_update_latest_monitors();
21362195
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
21372196
assert!(
21382197
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -2149,6 +2208,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
21492208
};
21502209

21512210
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
2211+
monitor.flush_and_update_latest_monitors();
21522212
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
21532213
assert!(
21542214
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

0 commit comments

Comments
 (0)