Skip to content

Commit 5eb96ab

Browse files
committed
fuzz: add deferred chanmon checkpoints
Enable deferred ChainMonitor writes in chanmon_consistency. Checkpoint the ChannelManager before flushing captured monitor writes. Treat checkpoint-only progress as progress during settle_all.
1 parent 0269d52 commit 5eb96ab

1 file changed

Lines changed: 99 additions & 37 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 99 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ struct HarnessNode<'a> {
817817
fee_estimator: Arc<FuzzEstimator>,
818818
wallet: TestWalletSource,
819819
persistence_style: ChannelMonitorUpdateStatus,
820+
deferred: bool,
820821
serialized_manager: Vec<u8>,
821822
height: u32,
822823
last_htlc_clear_fee: u32,
@@ -847,7 +848,7 @@ impl<'a> HarnessNode<'a> {
847848
fn build_chain_monitor(
848849
broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
849850
keys_manager: &Arc<KeyProvider>, logger: Arc<dyn Logger + MaybeSend + MaybeSync>,
850-
persister: &Arc<HarnessPersister>,
851+
persister: &Arc<HarnessPersister>, deferred: bool,
851852
) -> Arc<TestChainMonitor> {
852853
Arc::new(chainmonitor::ChainMonitor::new(
853854
None,
@@ -857,14 +858,14 @@ impl<'a> HarnessNode<'a> {
857858
Arc::clone(persister),
858859
Arc::clone(keys_manager),
859860
keys_manager.get_peer_storage_key(),
860-
false,
861+
deferred,
861862
))
862863
}
863864

864865
fn new<Out: Output + MaybeSend + MaybeSync>(
865866
node_id: u8, wallet: TestWalletSource, fee_estimator: Arc<FuzzEstimator>,
866867
broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
867-
out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
868+
deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
868869
) -> Self {
869870
let logger = Self::build_logger(node_id, out);
870871
let node_secret = SecretKey::from_slice(&[
@@ -884,6 +885,7 @@ impl<'a> HarnessNode<'a> {
884885
&keys_manager,
885886
Arc::clone(&logger),
886887
&persister,
888+
deferred,
887889
);
888890
let network = Network::Bitcoin;
889891
let best_block_timestamp = genesis_block(network).header.time;
@@ -913,6 +915,7 @@ impl<'a> HarnessNode<'a> {
913915
fee_estimator,
914916
wallet,
915917
persistence_style,
918+
deferred,
916919
serialized_manager: Vec::new(),
917920
height: 0,
918921
last_htlc_clear_fee: 253,
@@ -930,19 +933,25 @@ impl<'a> HarnessNode<'a> {
930933
self.persister.mark_update_completed(chan_id, monitor_id, data);
931934
}
932935

933-
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
934-
for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
936+
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool {
937+
assert_eq!(self.monitor.pending_operation_count(), 0);
938+
let completed_updates = self.persister.drain_pending_updates(chan_id);
939+
let completed_any = !completed_updates.is_empty();
940+
for (monitor_id, data) in completed_updates {
935941
self.finish_monitor_update(*chan_id, monitor_id, data);
936942
}
943+
completed_any
937944
}
938945

939946
fn complete_all_pending_monitor_updates(&self) {
947+
assert_eq!(self.monitor.pending_operation_count(), 0);
940948
for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() {
941949
self.finish_monitor_update(channel_id, monitor_id, data);
942950
}
943951
}
944952

945953
fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) {
954+
assert_eq!(self.monitor.pending_operation_count(), 0);
946955
if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) {
947956
self.finish_monitor_update(*chan_id, monitor_id, data);
948957
}
@@ -966,9 +975,30 @@ impl<'a> HarnessNode<'a> {
966975
}
967976
}
968977

969-
fn refresh_serialized_manager(&mut self) {
978+
fn checkpoint_manager_persistence(&mut self) -> bool {
970979
if self.node.get_and_clear_needs_persistence() {
980+
let pending_monitor_writes = self.monitor.pending_operation_count();
971981
self.serialized_manager = self.node.encode();
982+
if self.deferred {
983+
self.monitor.flush(pending_monitor_writes, &self.logger);
984+
} else {
985+
assert_eq!(pending_monitor_writes, 0);
986+
}
987+
true
988+
} else {
989+
assert_eq!(self.monitor.pending_operation_count(), 0);
990+
false
991+
}
992+
}
993+
994+
fn force_checkpoint_manager_persistence(&mut self) {
995+
let pending_monitor_writes = self.monitor.pending_operation_count();
996+
self.serialized_manager = self.node.encode();
997+
self.node.get_and_clear_needs_persistence();
998+
if self.deferred {
999+
self.monitor.flush(pending_monitor_writes, &self.logger);
1000+
} else {
1001+
assert_eq!(pending_monitor_writes, 0);
9721002
}
9731003
}
9741004

@@ -1082,6 +1112,7 @@ impl<'a> HarnessNode<'a> {
10821112
&self.keys_manager,
10831113
Arc::clone(&logger),
10841114
&persister,
1115+
self.deferred,
10851116
);
10861117

10871118
let mut monitors = new_hash_map();
@@ -1126,21 +1157,28 @@ impl<'a> HarnessNode<'a> {
11261157
channel_monitors: monitor_refs,
11271158
};
11281159

1129-
let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1130-
.expect("Failed to read manager");
1160+
let (_block_locator, manager) =
1161+
<(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1162+
.expect("Failed to read manager");
1163+
let expected_status = if self.deferred {
1164+
ChannelMonitorUpdateStatus::InProgress
1165+
} else {
1166+
ChannelMonitorUpdateStatus::Completed
1167+
};
11311168
for (channel_id, mon) in monitors.drain() {
1132-
assert_eq!(
1133-
chain_monitor.watch_channel(channel_id, mon),
1134-
Ok(ChannelMonitorUpdateStatus::Completed)
1135-
);
1169+
assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status));
11361170
}
1137-
// Future monitor writes should follow the node's configured persistence style; only the
1138-
// startup watch_channel registration above is forced to Completed.
1139-
*persister.update_ret.lock().unwrap() = self.persistence_style;
1140-
self.node = manager.1;
1171+
self.node = manager;
11411172
self.monitor = chain_monitor;
11421173
self.persister = persister;
11431174
self.logger = logger;
1175+
if self.deferred {
1176+
self.force_checkpoint_manager_persistence();
1177+
}
1178+
// Future monitor writes should follow the node's configured persistence style. Keep the
1179+
// startup watch_channel persistence forced to Completed until any deferred startup writes
1180+
// have been flushed above.
1181+
*self.persister.update_ret.lock().unwrap() = self.persistence_style;
11441182
}
11451183
}
11461184

@@ -1362,11 +1400,13 @@ impl PeerLink {
13621400
|| (self.node_a == node_b && self.node_b == node_a)
13631401
}
13641402

1365-
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) {
1403+
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool {
1404+
let mut completed_updates = false;
13661405
for id in &self.channel_ids {
1367-
nodes[self.node_a].complete_all_monitor_updates(id);
1368-
nodes[self.node_b].complete_all_monitor_updates(id);
1406+
completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id);
1407+
completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id);
13691408
}
1409+
completed_updates
13701410
}
13711411

13721412
fn complete_monitor_updates_for_node(
@@ -1937,9 +1977,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
19371977
}
19381978

19391979
fn make_channel(
1940-
source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool,
1941-
trusted_accept: bool, chain_state: &mut ChainState,
1980+
nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32,
1981+
trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState,
19421982
) {
1983+
assert!(source_idx < dest_idx);
1984+
let (left, right) = nodes.split_at_mut(dest_idx);
1985+
let (source, dest) = (&mut left[source_idx], &mut right[0]);
19431986
if trusted_open {
19441987
source
19451988
.create_channel_to_trusted_peer_0reserve(
@@ -2050,7 +2093,8 @@ fn make_channel(
20502093
}
20512094
};
20522095
dest.handle_funding_created(source.get_our_node_id(), &funding_created);
2053-
// Complete any pending monitor persistence callbacks for dest after watch_channel.
2096+
dest.checkpoint_manager_persistence();
2097+
// Complete any monitor persistence callbacks made available for dest after watch_channel.
20542098
dest.complete_all_pending_monitor_updates();
20552099

20562100
let (funding_signed, channel_id) = {
@@ -2071,7 +2115,8 @@ fn make_channel(
20712115
}
20722116

20732117
source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
2074-
// Complete any pending monitor persistence callbacks for source after watch_channel.
2118+
source.checkpoint_manager_persistence();
2119+
// Complete any monitor persistence callbacks made available for source after watch_channel.
20752120
source.complete_all_pending_monitor_updates();
20762121

20772122
let events = source.get_and_clear_pending_events();
@@ -2143,6 +2188,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21432188
ChannelMonitorUpdateStatus::Completed
21442189
},
21452190
];
2191+
let deferred = [
2192+
config_byte & 0b0010_0000 != 0,
2193+
config_byte & 0b0100_0000 != 0,
2194+
config_byte & 0b1000_0000 != 0,
2195+
];
21462196

21472197
let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
21482198
let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
@@ -2180,6 +2230,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21802230
Arc::clone(&fee_est_a),
21812231
Arc::clone(&broadcast_a),
21822232
persistence_styles[0],
2233+
deferred[0],
21832234
&out,
21842235
router,
21852236
chan_type,
@@ -2190,6 +2241,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21902241
Arc::clone(&fee_est_b),
21912242
Arc::clone(&broadcast_b),
21922243
persistence_styles[1],
2244+
deferred[1],
21932245
&out,
21942246
router,
21952247
chan_type,
@@ -2200,6 +2252,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22002252
Arc::clone(&fee_est_c),
22012253
Arc::clone(&broadcast_c),
22022254
persistence_styles[2],
2255+
deferred[2],
22032256
&out,
22042257
router,
22052258
chan_type,
@@ -2217,14 +2270,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22172270
// channel gets its own txid and funding outpoint.
22182271
// A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept),
22192272
// channel 3 A has 0-reserve (trusted accept).
2220-
make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state);
2221-
make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state);
2222-
make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state);
2273+
make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state);
2274+
make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state);
2275+
make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state);
22232276
// B-C: channel 4 B has 0-reserve (via trusted accept),
22242277
// channel 5 C has 0-reserve (via trusted open).
2225-
make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state);
2226-
make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state);
2227-
make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state);
2278+
make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state);
2279+
make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state);
2280+
make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state);
22282281

22292282
// Wipe the transactions-broadcasted set to make sure we don't broadcast
22302283
// any transactions during normal operation after setup.
@@ -2251,7 +2304,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
22512304
};
22522305

22532306
for node in &mut nodes {
2254-
node.serialized_manager = node.encode();
2307+
node.force_checkpoint_manager_persistence();
22552308
}
22562309

22572310
Self {
@@ -2671,7 +2724,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26712724
// claim/fail handling per event batch.
26722725
let mut claim_set = new_hash_map();
26732726
let mut events = nodes[node_idx].get_and_clear_pending_events();
2674-
let had_events = !events.is_empty();
2727+
let mut had_events = !events.is_empty();
26752728
for event in events.drain(..) {
26762729
match event {
26772730
events::Event::PaymentClaimable { payment_hash, .. } => {
@@ -2727,6 +2780,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27272780
}
27282781
while nodes[node_idx].needs_pending_htlc_processing() {
27292782
nodes[node_idx].process_pending_htlc_forwards();
2783+
had_events = true;
27302784
}
27312785
had_events
27322786
}
@@ -2749,9 +2803,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27492803
"It may take may iterations to settle the state, but it should not take forever"
27502804
);
27512805
}
2806+
let mut made_progress = self.checkpoint_manager_persistences();
27522807
// Next, make sure no monitor completion callbacks are pending.
2753-
self.ab_link.complete_all_monitor_updates(&self.nodes);
2754-
self.bc_link.complete_all_monitor_updates(&self.nodes);
2808+
made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
2809+
made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
27552810
// Then, make sure any current forwards make their way to their destination.
27562811
if self.process_msg_events(0, false, ProcessMessages::AllMessages) {
27572812
last_pass_no_updates = false;
@@ -2778,6 +2833,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27782833
last_pass_no_updates = false;
27792834
continue;
27802835
}
2836+
if made_progress {
2837+
last_pass_no_updates = false;
2838+
continue;
2839+
}
27812840
if last_pass_no_updates {
27822841
// In some cases, we may generate a message to send in
27832842
// `process_msg_events`, but block sending until
@@ -2876,19 +2935,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
28762935
self.nodes[2].record_last_htlc_clear_fee();
28772936
}
28782937

2879-
fn refresh_serialized_managers(&mut self) {
2938+
fn checkpoint_manager_persistences(&mut self) -> bool {
2939+
let mut made_progress = false;
28802940
for node in &mut self.nodes {
2881-
node.refresh_serialized_manager();
2941+
made_progress |= node.checkpoint_manager_persistence();
28822942
}
2943+
made_progress
28832944
}
28842945
}
28852946

28862947
#[inline]
28872948
pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
28882949
let router = FuzzRouter {};
2889-
// Read initial monitor styles and channel type from fuzz input byte 0:
2950+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
28902951
// bits 0-2: monitor styles (1 bit per node)
28912952
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
2953+
// bits 5-7: deferred monitor write mode (1 bit per node)
28922954
let config_byte = if !data.is_empty() { data[0] } else { 0 };
28932955
let mut harness = Harness::new(config_byte, out, &router);
28942956
let mut read_pos = 1; // First byte was consumed for initial config.
@@ -3300,7 +3362,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
33003362
_ => break 'fuzz_loop,
33013363
}
33023364

3303-
harness.refresh_serialized_managers();
3365+
harness.checkpoint_manager_persistences();
33043366
}
33053367
harness.finish();
33063368
}

0 commit comments

Comments
 (0)