Skip to content

Commit af7e8ee

Browse files
committed
fuzz: add deferred chanmon checkpoints
Enable deferred ChainMonitor writes in chanmon_consistency. Checkpoint the ChannelManager before flushing captured monitor writes. Treat checkpoint-only progress as progress during settle_all.
1 parent 9a3cefc commit af7e8ee

1 file changed

Lines changed: 116 additions & 38 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 116 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -741,12 +741,14 @@ struct HarnessNode<'a> {
741741
node: ChanMan<'a>,
742742
monitor: Arc<TestChainMonitor>,
743743
persister: Arc<HarnessPersister>,
744+
monitor_logger: Arc<dyn Logger>,
744745
keys_manager: Arc<KeyProvider>,
745746
logger: Arc<dyn Logger + MaybeSend + MaybeSync>,
746747
broadcaster: Arc<TestBroadcaster>,
747748
fee_estimator: Arc<FuzzEstimator>,
748749
wallet: TestWalletSource,
749750
persistence_style: ChannelMonitorUpdateStatus,
751+
deferred: bool,
750752
serialized_manager: Vec<u8>,
751753
height: u32,
752754
last_htlc_clear_fee: u32,
@@ -780,7 +782,7 @@ impl<'a> HarnessNode<'a> {
780782
fn build_chain_monitor(
781783
broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
782784
keys_manager: &Arc<KeyProvider>, logger_for_monitor: Arc<dyn Logger>,
783-
persister: &Arc<HarnessPersister>,
785+
persister: &Arc<HarnessPersister>, deferred: bool,
784786
) -> Arc<TestChainMonitor> {
785787
Arc::new(chainmonitor::ChainMonitor::new(
786788
None,
@@ -790,14 +792,14 @@ impl<'a> HarnessNode<'a> {
790792
Arc::clone(persister),
791793
Arc::clone(keys_manager),
792794
keys_manager.get_peer_storage_key(),
793-
false,
795+
deferred,
794796
))
795797
}
796798

797799
fn new<Out: Output + MaybeSend + MaybeSync>(
798800
node_id: u8, wallet: TestWalletSource, fee_estimator: Arc<FuzzEstimator>,
799801
broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
800-
out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
802+
deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
801803
) -> Self {
802804
let (logger_for_monitor, logger) = Self::build_loggers(node_id, out);
803805
let node_secret = SecretKey::from_slice(&[
@@ -815,8 +817,9 @@ impl<'a> HarnessNode<'a> {
815817
&broadcaster,
816818
&fee_estimator,
817819
&keys_manager,
818-
logger_for_monitor,
820+
Arc::clone(&logger_for_monitor),
819821
&persister,
822+
deferred,
820823
);
821824
let network = Network::Bitcoin;
822825
let best_block_timestamp = genesis_block(network).header.time;
@@ -840,12 +843,14 @@ impl<'a> HarnessNode<'a> {
840843
node,
841844
monitor,
842845
persister,
846+
monitor_logger: logger_for_monitor,
843847
keys_manager,
844848
logger,
845849
broadcaster,
846850
fee_estimator,
847851
wallet,
848852
persistence_style,
853+
deferred,
849854
serialized_manager: Vec::new(),
850855
height: 0,
851856
last_htlc_clear_fee: 253,
@@ -861,19 +866,25 @@ impl<'a> HarnessNode<'a> {
861866
self.persister.mark_update_completed(chan_id, monitor_id, data);
862867
}
863868

864-
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
865-
for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
869+
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool {
870+
assert_eq!(self.monitor.pending_operation_count(), 0);
871+
let completed_updates = self.persister.drain_pending_updates(chan_id);
872+
let completed_any = !completed_updates.is_empty();
873+
for (monitor_id, data) in completed_updates {
866874
self.finish_monitor_update(*chan_id, monitor_id, data);
867875
}
876+
completed_any
868877
}
869878

870879
fn complete_all_pending_monitor_updates(&self) {
880+
assert_eq!(self.monitor.pending_operation_count(), 0);
871881
for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() {
872882
self.finish_monitor_update(channel_id, monitor_id, data);
873883
}
874884
}
875885

876886
fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) {
887+
assert_eq!(self.monitor.pending_operation_count(), 0);
877888
if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) {
878889
self.finish_monitor_update(*chan_id, monitor_id, data);
879890
}
@@ -897,9 +908,30 @@ impl<'a> HarnessNode<'a> {
897908
}
898909
}
899910

900-
fn refresh_serialized_manager(&mut self) {
911+
fn checkpoint_manager_persistence(&mut self) -> bool {
901912
if self.node.get_and_clear_needs_persistence() {
913+
let pending_monitor_writes = self.monitor.pending_operation_count();
902914
self.serialized_manager = self.node.encode();
915+
if self.deferred {
916+
self.monitor.flush(pending_monitor_writes, &self.monitor_logger);
917+
} else {
918+
assert_eq!(pending_monitor_writes, 0);
919+
}
920+
true
921+
} else {
922+
assert_eq!(self.monitor.pending_operation_count(), 0);
923+
false
924+
}
925+
}
926+
927+
fn force_checkpoint_manager_persistence(&mut self) {
928+
let pending_monitor_writes = self.monitor.pending_operation_count();
929+
self.serialized_manager = self.node.encode();
930+
self.node.get_and_clear_needs_persistence();
931+
if self.deferred {
932+
self.monitor.flush(pending_monitor_writes, &self.monitor_logger);
933+
} else {
934+
assert_eq!(pending_monitor_writes, 0);
903935
}
904936
}
905937

@@ -1009,8 +1041,9 @@ impl<'a> HarnessNode<'a> {
10091041
&self.broadcaster,
10101042
&self.fee_estimator,
10111043
&self.keys_manager,
1012-
logger_for_monitor,
1044+
Arc::clone(&logger_for_monitor),
10131045
&persister,
1046+
self.deferred,
10141047
);
10151048

10161049
let mut monitors = new_hash_map();
@@ -1070,18 +1103,28 @@ impl<'a> HarnessNode<'a> {
10701103
channel_monitors: monitor_refs,
10711104
};
10721105

1073-
let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1074-
.expect("Failed to read manager");
1106+
let (_block_locator, manager) =
1107+
<(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1108+
.expect("Failed to read manager");
1109+
let expected_status = if self.deferred {
1110+
ChannelMonitorUpdateStatus::InProgress
1111+
} else {
1112+
ChannelMonitorUpdateStatus::Completed
1113+
};
10751114
for (channel_id, mon) in monitors.drain() {
1076-
assert_eq!(
1077-
chain_monitor.watch_channel(channel_id, mon),
1078-
Ok(ChannelMonitorUpdateStatus::Completed)
1079-
);
1115+
assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status));
10801116
}
10811117
*persister.update_ret.lock().unwrap() = self.persistence_style;
1082-
self.node = manager.1;
1118+
if self.deferred {
1119+
let count = chain_monitor.pending_operation_count();
1120+
self.serialized_manager = manager.encode();
1121+
manager.get_and_clear_needs_persistence();
1122+
chain_monitor.flush(count, &logger_for_monitor);
1123+
}
1124+
self.node = manager;
10831125
self.monitor = chain_monitor;
10841126
self.persister = persister;
1127+
self.monitor_logger = logger_for_monitor;
10851128
self.logger = logger;
10861129
}
10871130
}
@@ -1297,11 +1340,13 @@ impl PeerLink {
12971340
|| (self.node_a == node_b && self.node_b == node_a)
12981341
}
12991342

1300-
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) {
1343+
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool {
1344+
let mut completed_updates = false;
13011345
for id in &self.channel_ids {
1302-
nodes[self.node_a].complete_all_monitor_updates(id);
1303-
nodes[self.node_b].complete_all_monitor_updates(id);
1346+
completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id);
1347+
completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id);
13041348
}
1349+
completed_updates
13051350
}
13061351

13071352
fn complete_monitor_updates_for_node(
@@ -1871,10 +1916,24 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
18711916
dest.peer_connected(source.get_our_node_id(), &init_src, false).unwrap();
18721917
}
18731918

1919+
fn get_two_nodes_mut<'a, 'b>(
1920+
nodes: &'b mut [HarnessNode<'a>; 3], first_idx: usize, second_idx: usize,
1921+
) -> (&'b mut HarnessNode<'a>, &'b mut HarnessNode<'a>) {
1922+
assert_ne!(first_idx, second_idx);
1923+
if first_idx < second_idx {
1924+
let (left, right) = nodes.split_at_mut(second_idx);
1925+
(&mut left[first_idx], &mut right[0])
1926+
} else {
1927+
let (left, right) = nodes.split_at_mut(first_idx);
1928+
(&mut right[0], &mut left[second_idx])
1929+
}
1930+
}
1931+
18741932
fn make_channel(
1875-
source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool,
1876-
trusted_accept: bool, chain_state: &mut ChainState,
1933+
nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32,
1934+
trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState,
18771935
) {
1936+
let (source, dest) = get_two_nodes_mut(nodes, source_idx, dest_idx);
18781937
if trusted_open {
18791938
source
18801939
.create_channel_to_trusted_peer_0reserve(
@@ -1985,7 +2044,8 @@ fn make_channel(
19852044
}
19862045
};
19872046
dest.handle_funding_created(source.get_our_node_id(), &funding_created);
1988-
// Complete any pending monitor updates for dest after watch_channel.
2047+
dest.checkpoint_manager_persistence();
2048+
// Complete any flushed monitor updates for dest after watch_channel.
19892049
dest.complete_all_pending_monitor_updates();
19902050

19912051
let (funding_signed, channel_id) = {
@@ -2006,7 +2066,8 @@ fn make_channel(
20062066
}
20072067

20082068
source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
2009-
// Complete any pending monitor updates for source after watch_channel.
2069+
source.checkpoint_manager_persistence();
2070+
// Complete any flushed monitor updates for source after watch_channel.
20102071
source.complete_all_pending_monitor_updates();
20112072

20122073
let events = source.get_and_clear_pending_events();
@@ -2078,6 +2139,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
20782139
ChannelMonitorUpdateStatus::Completed
20792140
},
20802141
];
2142+
let deferred = [
2143+
config_byte & 0b0010_0000 != 0,
2144+
config_byte & 0b0100_0000 != 0,
2145+
config_byte & 0b1000_0000 != 0,
2146+
];
20812147

20822148
let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
20832149
let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
@@ -2115,6 +2181,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21152181
Arc::clone(&fee_est_a),
21162182
Arc::clone(&broadcast_a),
21172183
persistence_styles[0],
2184+
deferred[0],
21182185
&out,
21192186
router,
21202187
chan_type,
@@ -2125,6 +2192,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21252192
Arc::clone(&fee_est_b),
21262193
Arc::clone(&broadcast_b),
21272194
persistence_styles[1],
2195+
deferred[1],
21282196
&out,
21292197
router,
21302198
chan_type,
@@ -2135,6 +2203,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21352203
Arc::clone(&fee_est_c),
21362204
Arc::clone(&broadcast_c),
21372205
persistence_styles[2],
2206+
deferred[2],
21382207
&out,
21392208
router,
21402209
chan_type,
@@ -2152,14 +2221,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21522221
// channel gets its own txid and funding outpoint.
21532222
// A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept),
21542223
// channel 3 A has 0-reserve (trusted accept).
2155-
make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state);
2156-
make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state);
2157-
make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state);
2224+
make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state);
2225+
make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state);
2226+
make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state);
21582227
// B-C: channel 4 B has 0-reserve (via trusted accept),
21592228
// channel 5 C has 0-reserve (via trusted open).
2160-
make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state);
2161-
make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state);
2162-
make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state);
2229+
make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state);
2230+
make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state);
2231+
make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state);
21632232

21642233
// Wipe the transactions-broadcasted set to make sure we don't broadcast
21652234
// any transactions during normal operation after setup.
@@ -2186,7 +2255,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21862255
};
21872256

21882257
for node in &mut nodes {
2189-
node.serialized_manager = node.encode();
2258+
node.force_checkpoint_manager_persistence();
21902259
}
21912260

21922261
Self {
@@ -2606,7 +2675,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26062675
// claim/fail handling per event batch.
26072676
let mut claim_set = new_hash_map();
26082677
let mut events = nodes[node_idx].get_and_clear_pending_events();
2609-
let had_events = !events.is_empty();
2678+
let mut had_events = !events.is_empty();
26102679
for event in events.drain(..) {
26112680
match event {
26122681
events::Event::PaymentClaimable { payment_hash, .. } => {
@@ -2662,6 +2731,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26622731
}
26632732
while nodes[node_idx].needs_pending_htlc_processing() {
26642733
nodes[node_idx].process_pending_htlc_forwards();
2734+
had_events = true;
26652735
}
26662736
had_events
26672737
}
@@ -2681,12 +2751,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26812751
for i in 0..std::usize::MAX {
26822752
if i == 100 {
26832753
panic!(
2684-
"It may take may iterations to settle the state, but it should not take forever"
2685-
);
2754+
"It may take may iterations to settle the state, but it should not take forever"
2755+
);
26862756
}
2757+
let mut made_progress = self.checkpoint_manager_persistences();
26872758
// Next, make sure no monitor updates are pending.
2688-
self.ab_link.complete_all_monitor_updates(&self.nodes);
2689-
self.bc_link.complete_all_monitor_updates(&self.nodes);
2759+
made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
2760+
made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
26902761
// Then, make sure any current forwards make their way to their destination.
26912762
if self.process_msg_events(0, false, ProcessMessages::AllMessages) {
26922763
last_pass_no_updates = false;
@@ -2713,6 +2784,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27132784
last_pass_no_updates = false;
27142785
continue;
27152786
}
2787+
if made_progress {
2788+
last_pass_no_updates = false;
2789+
continue;
2790+
}
27162791
if last_pass_no_updates {
27172792
// In some cases, we may generate a message to send in
27182793
// `process_msg_events`, but block sending until
@@ -2811,19 +2886,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
28112886
self.nodes[2].record_last_htlc_clear_fee();
28122887
}
28132888

2814-
fn refresh_serialized_managers(&mut self) {
2889+
fn checkpoint_manager_persistences(&mut self) -> bool {
2890+
let mut made_progress = false;
28152891
for node in &mut self.nodes {
2816-
node.refresh_serialized_manager();
2892+
made_progress |= node.checkpoint_manager_persistence();
28172893
}
2894+
made_progress
28182895
}
28192896
}
28202897

28212898
#[inline]
28222899
pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
28232900
let router = FuzzRouter {};
2824-
// Read initial monitor styles and channel type from fuzz input byte 0:
2901+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
28252902
// bits 0-2: monitor styles (1 bit per node)
28262903
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
2904+
// bits 5-7: deferred monitor write mode (1 bit per node)
28272905
let config_byte = if !data.is_empty() { data[0] } else { 0 };
28282906
let mut harness = Harness::new(config_byte, out, &router);
28292907
let mut read_pos = 1; // First byte was consumed for initial config.
@@ -3235,7 +3313,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
32353313
_ => break 'fuzz_loop,
32363314
}
32373315

3238-
harness.refresh_serialized_managers();
3316+
harness.checkpoint_manager_persistences();
32393317
}
32403318
harness.finish();
32413319
}

0 commit comments

Comments
 (0)