Skip to content

Commit eb061da

Browse files
committed
fuzz: add deferred chanmon checkpoints
Enable deferred ChainMonitor writes in chanmon_consistency. Checkpoint the ChannelManager before flushing captured monitor writes. Treat checkpoint-only progress as progress during settle_all.
1 parent 0269d52 commit eb061da

1 file changed

Lines changed: 90 additions & 37 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 90 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ struct HarnessNode<'a> {
817817
fee_estimator: Arc<FuzzEstimator>,
818818
wallet: TestWalletSource,
819819
persistence_style: ChannelMonitorUpdateStatus,
820+
deferred: bool,
820821
serialized_manager: Vec<u8>,
821822
height: u32,
822823
last_htlc_clear_fee: u32,
@@ -847,7 +848,7 @@ impl<'a> HarnessNode<'a> {
847848
fn build_chain_monitor(
848849
broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
849850
keys_manager: &Arc<KeyProvider>, logger: Arc<dyn Logger + MaybeSend + MaybeSync>,
850-
persister: &Arc<HarnessPersister>,
851+
persister: &Arc<HarnessPersister>, deferred: bool,
851852
) -> Arc<TestChainMonitor> {
852853
Arc::new(chainmonitor::ChainMonitor::new(
853854
None,
@@ -857,14 +858,14 @@ impl<'a> HarnessNode<'a> {
857858
Arc::clone(persister),
858859
Arc::clone(keys_manager),
859860
keys_manager.get_peer_storage_key(),
860-
false,
861+
deferred,
861862
))
862863
}
863864

864865
fn new<Out: Output + MaybeSend + MaybeSync>(
865866
node_id: u8, wallet: TestWalletSource, fee_estimator: Arc<FuzzEstimator>,
866867
broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
867-
out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
868+
deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
868869
) -> Self {
869870
let logger = Self::build_logger(node_id, out);
870871
let node_secret = SecretKey::from_slice(&[
@@ -884,6 +885,7 @@ impl<'a> HarnessNode<'a> {
884885
&keys_manager,
885886
Arc::clone(&logger),
886887
&persister,
888+
deferred,
887889
);
888890
let network = Network::Bitcoin;
889891
let best_block_timestamp = genesis_block(network).header.time;
@@ -913,6 +915,7 @@ impl<'a> HarnessNode<'a> {
913915
fee_estimator,
914916
wallet,
915917
persistence_style,
918+
deferred,
916919
serialized_manager: Vec::new(),
917920
height: 0,
918921
last_htlc_clear_fee: 253,
@@ -930,10 +933,13 @@ impl<'a> HarnessNode<'a> {
930933
self.persister.mark_update_completed(chan_id, monitor_id, data);
931934
}
932935

933-
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
934-
for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
936+
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool {
937+
let completed_updates = self.persister.drain_pending_updates(chan_id);
938+
let completed_any = !completed_updates.is_empty();
939+
for (monitor_id, data) in completed_updates {
935940
self.finish_monitor_update(*chan_id, monitor_id, data);
936941
}
942+
completed_any
937943
}
938944

939945
fn complete_all_pending_monitor_updates(&self) {
@@ -966,9 +972,30 @@ impl<'a> HarnessNode<'a> {
966972
}
967973
}
968974

969-
fn refresh_serialized_manager(&mut self) {
975+
fn checkpoint_manager_persistence(&mut self) -> bool {
970976
if self.node.get_and_clear_needs_persistence() {
977+
let pending_monitor_writes = self.monitor.pending_operation_count();
971978
self.serialized_manager = self.node.encode();
979+
if self.deferred {
980+
self.monitor.flush(pending_monitor_writes, &self.logger);
981+
} else {
982+
assert_eq!(pending_monitor_writes, 0);
983+
}
984+
true
985+
} else {
986+
assert_eq!(self.monitor.pending_operation_count(), 0);
987+
false
988+
}
989+
}
990+
991+
fn force_checkpoint_manager_persistence(&mut self) {
992+
let pending_monitor_writes = self.monitor.pending_operation_count();
993+
self.serialized_manager = self.node.encode();
994+
self.node.get_and_clear_needs_persistence();
995+
if self.deferred {
996+
self.monitor.flush(pending_monitor_writes, &self.logger);
997+
} else {
998+
assert_eq!(pending_monitor_writes, 0);
972999
}
9731000
}
9741001

@@ -1073,15 +1100,14 @@ impl<'a> HarnessNode<'a> {
10731100
&mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
10741101
) {
10751102
let logger = Self::build_logger(self.node_id, out);
1076-
// Re-registering monitors during reload reflects data that was already selected from
1077-
// simulated storage, so these startup watch_channel calls should complete immediately.
1078-
let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed);
1103+
let persister = Self::build_persister(self.persistence_style);
10791104
let chain_monitor = Self::build_chain_monitor(
10801105
&self.broadcaster,
10811106
&self.fee_estimator,
10821107
&self.keys_manager,
10831108
Arc::clone(&logger),
10841109
&persister,
1110+
self.deferred,
10851111
);
10861112

10871113
let mut monitors = new_hash_map();
@@ -1128,19 +1154,22 @@ impl<'a> HarnessNode<'a> {
11281154

11291155
let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
11301156
.expect("Failed to read manager");
1157+
let expected_status = if self.deferred {
1158+
ChannelMonitorUpdateStatus::InProgress
1159+
} else {
1160+
self.persistence_style
1161+
};
11311162
for (channel_id, mon) in monitors.drain() {
1132-
assert_eq!(
1133-
chain_monitor.watch_channel(channel_id, mon),
1134-
Ok(ChannelMonitorUpdateStatus::Completed)
1135-
);
1163+
assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status));
11361164
}
1137-
// Future monitor writes should follow the node's configured persistence style; only the
1138-
// startup watch_channel registration above is forced to Completed.
1139-
*persister.update_ret.lock().unwrap() = self.persistence_style;
11401165
self.node = manager.1;
11411166
self.monitor = chain_monitor;
11421167
self.persister = persister;
11431168
self.logger = logger;
1169+
// In deferred mode, the startup watch_channel registrations above queue monitor operations
1170+
// even if the reloaded ChannelManager does not need persistence. Always checkpoint here so
1171+
// those registrations can be flushed against the manager snapshot they belong to.
1172+
self.force_checkpoint_manager_persistence();
11441173
}
11451174
}
11461175

@@ -1362,11 +1391,13 @@ impl PeerLink {
13621391
|| (self.node_a == node_b && self.node_b == node_a)
13631392
}
13641393

1365-
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) {
1394+
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool {
1395+
let mut completed_updates = false;
13661396
for id in &self.channel_ids {
1367-
nodes[self.node_a].complete_all_monitor_updates(id);
1368-
nodes[self.node_b].complete_all_monitor_updates(id);
1397+
completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id);
1398+
completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id);
13691399
}
1400+
completed_updates
13701401
}
13711402

13721403
fn complete_monitor_updates_for_node(
@@ -1937,9 +1968,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
19371968
}
19381969

19391970
fn make_channel(
1940-
source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool,
1941-
trusted_accept: bool, chain_state: &mut ChainState,
1971+
nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32,
1972+
trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState,
19421973
) {
1974+
assert!(source_idx < dest_idx);
1975+
let (left, right) = nodes.split_at_mut(dest_idx);
1976+
let (source, dest) = (&mut left[source_idx], &mut right[0]);
19431977
if trusted_open {
19441978
source
19451979
.create_channel_to_trusted_peer_0reserve(
@@ -2050,7 +2084,8 @@ fn make_channel(
20502084
}
20512085
};
20522086
dest.handle_funding_created(source.get_our_node_id(), &funding_created);
2053-
// Complete any pending monitor persistence callbacks for dest after watch_channel.
2087+
dest.checkpoint_manager_persistence();
2088+
// Complete any monitor persistence callbacks made available for dest after watch_channel.
20542089
dest.complete_all_pending_monitor_updates();
20552090

20562091
let (funding_signed, channel_id) = {
@@ -2071,7 +2106,8 @@ fn make_channel(
20712106
}
20722107

20732108
source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
2074-
// Complete any pending monitor persistence callbacks for source after watch_channel.
2109+
source.checkpoint_manager_persistence();
2110+
// Complete any monitor persistence callbacks made available for source after watch_channel.
20752111
source.complete_all_pending_monitor_updates();
20762112

20772113
let events = source.get_and_clear_pending_events();
@@ -2143,6 +2179,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21432179
ChannelMonitorUpdateStatus::Completed
21442180
},
21452181
];
2182+
let deferred = [
2183+
config_byte & 0b0010_0000 != 0,
2184+
config_byte & 0b0100_0000 != 0,
2185+
config_byte & 0b1000_0000 != 0,
2186+
];
21462187

21472188
let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
21482189
let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
@@ -2180,6 +2221,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21802221
Arc::clone(&fee_est_a),
21812222
Arc::clone(&broadcast_a),
21822223
persistence_styles[0],
2224+
deferred[0],
21832225
&out,
21842226
router,
21852227
chan_type,
@@ -2190,6 +2232,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21902232
Arc::clone(&fee_est_b),
21912233
Arc::clone(&broadcast_b),
21922234
persistence_styles[1],
2235+
deferred[1],
21932236
&out,
21942237
router,
21952238
chan_type,
@@ -2200,6 +2243,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22002243
Arc::clone(&fee_est_c),
22012244
Arc::clone(&broadcast_c),
22022245
persistence_styles[2],
2246+
deferred[2],
22032247
&out,
22042248
router,
22052249
chan_type,
@@ -2217,14 +2261,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22172261
// channel gets its own txid and funding outpoint.
22182262
// A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept),
22192263
// channel 3 A has 0-reserve (trusted accept).
2220-
make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state);
2221-
make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state);
2222-
make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state);
2264+
make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state);
2265+
make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state);
2266+
make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state);
22232267
// B-C: channel 4 B has 0-reserve (via trusted accept),
22242268
// channel 5 C has 0-reserve (via trusted open).
2225-
make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state);
2226-
make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state);
2227-
make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state);
2269+
make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state);
2270+
make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state);
2271+
make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state);
22282272

22292273
// Wipe the transactions-broadcasted set to make sure we don't broadcast
22302274
// any transactions during normal operation after setup.
@@ -2251,7 +2295,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22512295
};
22522296

22532297
for node in &mut nodes {
2254-
node.serialized_manager = node.encode();
2298+
node.force_checkpoint_manager_persistence();
22552299
}
22562300

22572301
Self {
@@ -2671,7 +2715,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26712715
// claim/fail handling per event batch.
26722716
let mut claim_set = new_hash_map();
26732717
let mut events = nodes[node_idx].get_and_clear_pending_events();
2674-
let had_events = !events.is_empty();
2718+
let mut had_events = !events.is_empty();
26752719
for event in events.drain(..) {
26762720
match event {
26772721
events::Event::PaymentClaimable { payment_hash, .. } => {
@@ -2727,6 +2771,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27272771
}
27282772
while nodes[node_idx].needs_pending_htlc_processing() {
27292773
nodes[node_idx].process_pending_htlc_forwards();
2774+
had_events = true;
27302775
}
27312776
had_events
27322777
}
@@ -2749,9 +2794,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27492794
"It may take may iterations to settle the state, but it should not take forever"
27502795
);
27512796
}
2797+
let mut made_progress = self.checkpoint_manager_persistences();
27522798
// Next, make sure no monitor completion callbacks are pending.
2753-
self.ab_link.complete_all_monitor_updates(&self.nodes);
2754-
self.bc_link.complete_all_monitor_updates(&self.nodes);
2799+
made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
2800+
made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
27552801
// Then, make sure any current forwards make their way to their destination.
27562802
if self.process_msg_events(0, false, ProcessMessages::AllMessages) {
27572803
last_pass_no_updates = false;
@@ -2778,6 +2824,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27782824
last_pass_no_updates = false;
27792825
continue;
27802826
}
2827+
if made_progress {
2828+
last_pass_no_updates = false;
2829+
continue;
2830+
}
27812831
if last_pass_no_updates {
27822832
// In some cases, we may generate a message to send in
27832833
// `process_msg_events`, but block sending until
@@ -2876,19 +2926,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
28762926
self.nodes[2].record_last_htlc_clear_fee();
28772927
}
28782928

2879-
fn refresh_serialized_managers(&mut self) {
2929+
fn checkpoint_manager_persistences(&mut self) -> bool {
2930+
let mut made_progress = false;
28802931
for node in &mut self.nodes {
2881-
node.refresh_serialized_manager();
2932+
made_progress |= node.checkpoint_manager_persistence();
28822933
}
2934+
made_progress
28832935
}
28842936
}
28852937

28862938
#[inline]
28872939
pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
28882940
let router = FuzzRouter {};
2889-
// Read initial monitor styles and channel type from fuzz input byte 0:
2941+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
28902942
// bits 0-2: monitor styles (1 bit per node)
28912943
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
2944+
// bits 5-7: deferred monitor write mode (1 bit per node)
28922945
let config_byte = if !data.is_empty() { data[0] } else { 0 };
28932946
let mut harness = Harness::new(config_byte, out, &router);
28942947
let mut read_pos = 1; // First byte was consumed for initial config.
@@ -3300,7 +3353,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
33003353
_ => break 'fuzz_loop,
33013354
}
33023355

3303-
harness.refresh_serialized_managers();
3356+
harness.checkpoint_manager_persistences();
33043357
}
33053358
harness.finish();
33063359
}

0 commit comments

Comments
 (0)