Skip to content

Commit d4bf3e0

Browse files
committed
Add deferred monitor write support to chanmon_consistency fuzz target
Add a deferred flag to TestChainMonitor that controls whether the underlying ChainMonitor queues operations instead of executing them immediately. The flag is derived from the first fuzz input byte so each of the three nodes can independently run in deferred or immediate mode, while the same byte also selects the channel type. In deferred mode, watch_channel and update_channel always return InProgress. A new flush_and_update_latest_monitors method drains the queued operations and, when the persister reports Completed, promotes the pending shadow monitor snapshots to persisted state. This method is called before release_pending_monitor_events, at each point where the fuzzer completes pending monitor updates, and after watch_channel during node reload so the node starts from a consistent state. The first-byte config layout now uses bits 0 to 2 for initial monitor status, bits 3 to 4 for channel type, and bits 5 to 7 for deferred monitor writes. This fixes the rebase conflict where deferred mode still referenced the pre-channel-type bit layout. AI tools were used in preparing this commit.
1 parent 423c1dc commit d4bf3e0

1 file changed

Lines changed: 64 additions & 4 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,13 @@ struct TestChainMonitor {
267267
Arc<KeyProvider>,
268268
>,
269269
>,
270+
pub deferred: bool,
270271
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
271272
}
272273
impl TestChainMonitor {
273274
pub fn new(
274275
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
275-
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
276+
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
276277
) -> Self {
277278
Self {
278279
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -283,14 +284,44 @@ impl TestChainMonitor {
283284
Arc::clone(&persister),
284285
Arc::clone(&keys),
285286
keys.get_peer_storage_key(),
286-
false,
287+
deferred,
287288
)),
288289
logger,
289290
keys,
290291
persister,
292+
deferred,
291293
latest_monitors: Mutex::new(new_hash_map()),
292294
}
293295
}
296+
297+
/// Flushes all deferred monitor operations and, if the persister reports success, promotes
298+
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
299+
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
300+
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
301+
/// snapshots rather than relying on the persister's storage.
302+
///
303+
/// This simulates the pattern of snapshotting the pending count, persisting the
304+
/// `ChannelManager`, then flushing the queued monitor writes.
305+
fn flush_and_update_latest_monitors(&self) {
306+
let count = self.chain_monitor.pending_operation_count();
307+
if count == 0 {
308+
return;
309+
}
310+
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
311+
self.chain_monitor.flush(count, &self.logger);
312+
let persister_res = *self.persister.update_ret.lock().unwrap();
313+
// Only update our local tracking state when the persister signals completion. When
314+
// persistence is still in-progress, the monitors stay in the pending set so that a
315+
// simulated restart can still reload from the last fully-persisted snapshot.
316+
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
317+
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
318+
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
319+
state.persisted_monitor_id = id;
320+
state.persisted_monitor = data;
321+
}
322+
}
323+
}
324+
}
294325
}
295326
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
296327
fn watch_channel(
@@ -300,6 +331,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
300331
monitor.write(&mut ser).unwrap();
301332
let monitor_id = monitor.get_latest_update_id();
302333
let res = self.chain_monitor.watch_channel(channel_id, monitor);
334+
if self.deferred {
335+
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
336+
}
303337
let state = match res {
304338
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
305339
persisted_monitor_id: monitor_id,
@@ -349,6 +383,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
349383
let mut ser = VecWriter(Vec::new());
350384
deserialized_monitor.write(&mut ser).unwrap();
351385
let res = self.chain_monitor.update_channel(channel_id, update);
386+
if self.deferred {
387+
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
388+
}
352389
match res {
353390
chain::ChannelMonitorUpdateStatus::Completed => {
354391
map_entry.persisted_monitor_id = update.update_id;
@@ -365,6 +402,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
365402
fn release_pending_monitor_events(
366403
&self,
367404
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
405+
if self.deferred {
406+
self.flush_and_update_latest_monitors();
407+
}
368408
return self.chain_monitor.release_pending_monitor_events();
369409
}
370410
}
@@ -876,9 +916,10 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
876916
let broadcast_c = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
877917
let router = FuzzRouter {};
878918

879-
// Read initial monitor styles and channel type from fuzz input byte 0:
919+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
880920
// bits 0-2: monitor styles (1 bit per node)
881921
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
922+
// bits 5-7: deferred monitor write mode (1 bit per node)
882923
let config_byte = if !data.is_empty() { data[0] } else { 0 };
883924
let chan_type = match (config_byte >> 3) & 0b11 {
884925
0 => ChanType::Legacy,
@@ -902,6 +943,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
902943
ChannelMonitorUpdateStatus::Completed
903944
}),
904945
];
946+
let deferred = [
947+
config_byte & 0b0010_0000 != 0,
948+
config_byte & 0b0100_0000 != 0,
949+
config_byte & 0b1000_0000 != 0,
950+
];
905951

906952
let mut chain_state = ChainState::new();
907953
let mut node_height_a: u32 = 0;
@@ -930,6 +976,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
930976
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
931977
}),
932978
Arc::clone(&keys_manager),
979+
deferred[$node_id as usize],
933980
));
934981

935982
let mut config = UserConfig::default();
@@ -993,6 +1040,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
9931040
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
9941041
}),
9951042
Arc::clone(keys),
1043+
deferred[node_id as usize],
9961044
));
9971045

9981046
let mut config = UserConfig::default();
@@ -1070,18 +1118,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
10701118
let manager =
10711119
<(BestBlock, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
10721120
let res = (manager.1, chain_monitor.clone());
1121+
let expected_status = if deferred[node_id as usize] {
1122+
ChannelMonitorUpdateStatus::InProgress
1123+
} else {
1124+
ChannelMonitorUpdateStatus::Completed
1125+
};
10731126
for (channel_id, mon) in monitors.drain() {
10741127
assert_eq!(
10751128
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
1076-
Ok(ChannelMonitorUpdateStatus::Completed)
1129+
Ok(expected_status)
10771130
);
10781131
}
1132+
if deferred[node_id as usize] {
1133+
let count = chain_monitor.chain_monitor.pending_operation_count();
1134+
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
1135+
}
10791136
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
10801137
res
10811138
};
10821139

10831140
macro_rules! complete_all_pending_monitor_updates {
10841141
($monitor: expr) => {{
1142+
$monitor.flush_and_update_latest_monitors();
10851143
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
10861144
for (id, data) in state.pending_monitors.drain(..) {
10871145
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
@@ -2060,6 +2118,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
20602118
|monitor: &Arc<TestChainMonitor>,
20612119
chan_funding,
20622120
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
2121+
monitor.flush_and_update_latest_monitors();
20632122
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
20642123
assert!(
20652124
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -2076,6 +2135,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
20762135
};
20772136

20782137
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
2138+
monitor.flush_and_update_latest_monitors();
20792139
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
20802140
assert!(
20812141
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

0 commit comments

Comments
 (0)