Skip to content

Commit 20275f6

Browse files
committed
fuzz: add deferred chanmon checkpoints
Enable deferred ChainMonitor writes in chanmon_consistency. Checkpoint the ChannelManager before flushing captured monitor writes. Treat checkpoint-only progress as progress during settle_all.
1 parent e0380b1 commit 20275f6

1 file changed

Lines changed: 116 additions & 38 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 116 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -710,12 +710,14 @@ struct HarnessNode<'a> {
710710
node: ChanMan<'a>,
711711
monitor: Arc<TestChainMonitor>,
712712
persister: Arc<HarnessPersister>,
713+
monitor_logger: Arc<dyn Logger>,
713714
keys_manager: Arc<KeyProvider>,
714715
logger: Arc<dyn Logger + MaybeSend + MaybeSync>,
715716
broadcaster: Arc<TestBroadcaster>,
716717
fee_estimator: Arc<FuzzEstimator>,
717718
wallet: TestWalletSource,
718719
persistence_style: ChannelMonitorUpdateStatus,
720+
deferred: bool,
719721
serialized_manager: Vec<u8>,
720722
height: u32,
721723
last_htlc_clear_fee: u32,
@@ -749,7 +751,7 @@ impl<'a> HarnessNode<'a> {
749751
fn build_chain_monitor(
750752
broadcaster: &Arc<TestBroadcaster>, fee_estimator: &Arc<FuzzEstimator>,
751753
keys_manager: &Arc<KeyProvider>, logger_for_monitor: Arc<dyn Logger>,
752-
persister: &Arc<HarnessPersister>,
754+
persister: &Arc<HarnessPersister>, deferred: bool,
753755
) -> Arc<TestChainMonitor> {
754756
Arc::new(chainmonitor::ChainMonitor::new(
755757
None,
@@ -759,14 +761,14 @@ impl<'a> HarnessNode<'a> {
759761
Arc::clone(persister),
760762
Arc::clone(keys_manager),
761763
keys_manager.get_peer_storage_key(),
762-
false,
764+
deferred,
763765
))
764766
}
765767

766768
fn new<Out: Output + MaybeSend + MaybeSync>(
767769
node_id: u8, wallet: TestWalletSource, fee_estimator: Arc<FuzzEstimator>,
768770
broadcaster: Arc<TestBroadcaster>, persistence_style: ChannelMonitorUpdateStatus,
769-
out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
771+
deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType,
770772
) -> Self {
771773
let (logger_for_monitor, logger) = Self::build_loggers(node_id, out);
772774
let node_secret = SecretKey::from_slice(&[
@@ -784,8 +786,9 @@ impl<'a> HarnessNode<'a> {
784786
&broadcaster,
785787
&fee_estimator,
786788
&keys_manager,
787-
logger_for_monitor,
789+
Arc::clone(&logger_for_monitor),
788790
&persister,
791+
deferred,
789792
);
790793
let network = Network::Bitcoin;
791794
let best_block_timestamp = genesis_block(network).header.time;
@@ -809,12 +812,14 @@ impl<'a> HarnessNode<'a> {
809812
node,
810813
monitor,
811814
persister,
815+
monitor_logger: logger_for_monitor,
812816
keys_manager,
813817
logger,
814818
broadcaster,
815819
fee_estimator,
816820
wallet,
817821
persistence_style,
822+
deferred,
818823
serialized_manager: Vec::new(),
819824
height: 0,
820825
last_htlc_clear_fee: 253,
@@ -830,19 +835,25 @@ impl<'a> HarnessNode<'a> {
830835
self.persister.mark_update_completed(chan_id, monitor_id, data);
831836
}
832837

833-
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) {
834-
for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) {
838+
fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool {
839+
assert_eq!(self.monitor.pending_operation_count(), 0);
840+
let completed_updates = self.persister.drain_pending_updates(chan_id);
841+
let completed_any = !completed_updates.is_empty();
842+
for (monitor_id, data) in completed_updates {
835843
self.finish_monitor_update(*chan_id, monitor_id, data);
836844
}
845+
completed_any
837846
}
838847

839848
fn complete_all_pending_monitor_updates(&self) {
849+
assert_eq!(self.monitor.pending_operation_count(), 0);
840850
for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() {
841851
self.finish_monitor_update(channel_id, monitor_id, data);
842852
}
843853
}
844854

845855
fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) {
856+
assert_eq!(self.monitor.pending_operation_count(), 0);
846857
if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) {
847858
self.finish_monitor_update(*chan_id, monitor_id, data);
848859
}
@@ -866,9 +877,30 @@ impl<'a> HarnessNode<'a> {
866877
}
867878
}
868879

869-
fn refresh_serialized_manager(&mut self) {
880+
fn checkpoint_manager_persistence(&mut self) -> bool {
870881
if self.node.get_and_clear_needs_persistence() {
882+
let pending_monitor_writes = self.monitor.pending_operation_count();
871883
self.serialized_manager = self.node.encode();
884+
if self.deferred {
885+
self.monitor.flush(pending_monitor_writes, &self.monitor_logger);
886+
} else {
887+
assert_eq!(pending_monitor_writes, 0);
888+
}
889+
true
890+
} else {
891+
assert_eq!(self.monitor.pending_operation_count(), 0);
892+
false
893+
}
894+
}
895+
896+
fn force_checkpoint_manager_persistence(&mut self) {
897+
let pending_monitor_writes = self.monitor.pending_operation_count();
898+
self.serialized_manager = self.node.encode();
899+
self.node.get_and_clear_needs_persistence();
900+
if self.deferred {
901+
self.monitor.flush(pending_monitor_writes, &self.monitor_logger);
902+
} else {
903+
assert_eq!(pending_monitor_writes, 0);
872904
}
873905
}
874906

@@ -978,8 +1010,9 @@ impl<'a> HarnessNode<'a> {
9781010
&self.broadcaster,
9791011
&self.fee_estimator,
9801012
&self.keys_manager,
981-
logger_for_monitor,
1013+
Arc::clone(&logger_for_monitor),
9821014
&persister,
1015+
self.deferred,
9831016
);
9841017

9851018
let mut monitors = new_hash_map();
@@ -1038,18 +1071,28 @@ impl<'a> HarnessNode<'a> {
10381071
channel_monitors: monitor_refs,
10391072
};
10401073

1041-
let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1042-
.expect("Failed to read manager");
1074+
let (_block_locator, manager) =
1075+
<(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
1076+
.expect("Failed to read manager");
1077+
let expected_status = if self.deferred {
1078+
ChannelMonitorUpdateStatus::InProgress
1079+
} else {
1080+
ChannelMonitorUpdateStatus::Completed
1081+
};
10431082
for (channel_id, mon) in monitors.drain() {
1044-
assert_eq!(
1045-
chain_monitor.watch_channel(channel_id, mon),
1046-
Ok(ChannelMonitorUpdateStatus::Completed)
1047-
);
1083+
assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status));
10481084
}
10491085
*persister.update_ret.lock().unwrap() = self.persistence_style;
1050-
self.node = manager.1;
1086+
if self.deferred {
1087+
let count = chain_monitor.pending_operation_count();
1088+
self.serialized_manager = manager.encode();
1089+
manager.get_and_clear_needs_persistence();
1090+
chain_monitor.flush(count, &logger_for_monitor);
1091+
}
1092+
self.node = manager;
10511093
self.monitor = chain_monitor;
10521094
self.persister = persister;
1095+
self.monitor_logger = logger_for_monitor;
10531096
self.logger = logger;
10541097
}
10551098
}
@@ -1265,11 +1308,13 @@ impl PeerLink {
12651308
|| (self.node_a == node_b && self.node_b == node_a)
12661309
}
12671310

1268-
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) {
1311+
fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool {
1312+
let mut completed_updates = false;
12691313
for id in &self.channel_ids {
1270-
nodes[self.node_a].complete_all_monitor_updates(id);
1271-
nodes[self.node_b].complete_all_monitor_updates(id);
1314+
completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id);
1315+
completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id);
12721316
}
1317+
completed_updates
12731318
}
12741319

12751320
fn complete_monitor_updates_for_node(
@@ -1839,10 +1884,24 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
18391884
dest.peer_connected(source.get_our_node_id(), &init_src, false).unwrap();
18401885
}
18411886

1887+
fn get_two_nodes_mut<'a, 'b>(
1888+
nodes: &'b mut [HarnessNode<'a>; 3], first_idx: usize, second_idx: usize,
1889+
) -> (&'b mut HarnessNode<'a>, &'b mut HarnessNode<'a>) {
1890+
assert_ne!(first_idx, second_idx);
1891+
if first_idx < second_idx {
1892+
let (left, right) = nodes.split_at_mut(second_idx);
1893+
(&mut left[first_idx], &mut right[0])
1894+
} else {
1895+
let (left, right) = nodes.split_at_mut(first_idx);
1896+
(&mut right[0], &mut left[second_idx])
1897+
}
1898+
}
1899+
18421900
fn make_channel(
1843-
source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool,
1844-
trusted_accept: bool, chain_state: &mut ChainState,
1901+
nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32,
1902+
trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState,
18451903
) {
1904+
let (source, dest) = get_two_nodes_mut(nodes, source_idx, dest_idx);
18461905
if trusted_open {
18471906
source
18481907
.create_channel_to_trusted_peer_0reserve(
@@ -1953,7 +2012,8 @@ fn make_channel(
19532012
}
19542013
};
19552014
dest.handle_funding_created(source.get_our_node_id(), &funding_created);
1956-
// Complete any pending monitor updates for dest after watch_channel.
2015+
dest.checkpoint_manager_persistence();
2016+
// Complete any flushed monitor updates for dest after watch_channel.
19572017
dest.complete_all_pending_monitor_updates();
19582018

19592019
let (funding_signed, channel_id) = {
@@ -1974,7 +2034,8 @@ fn make_channel(
19742034
}
19752035

19762036
source.handle_funding_signed(dest.get_our_node_id(), &funding_signed);
1977-
// Complete any pending monitor updates for source after watch_channel.
2037+
source.checkpoint_manager_persistence();
2038+
// Complete any flushed monitor updates for source after watch_channel.
19782039
source.complete_all_pending_monitor_updates();
19792040

19802041
let events = source.get_and_clear_pending_events();
@@ -2046,6 +2107,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
20462107
ChannelMonitorUpdateStatus::Completed
20472108
},
20482109
];
2110+
let deferred = [
2111+
config_byte & 0b0010_0000 != 0,
2112+
config_byte & 0b0100_0000 != 0,
2113+
config_byte & 0b1000_0000 != 0,
2114+
];
20492115

20502116
let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap());
20512117
let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap());
@@ -2083,6 +2149,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
20832149
Arc::clone(&fee_est_a),
20842150
Arc::clone(&broadcast_a),
20852151
persistence_styles[0],
2152+
deferred[0],
20862153
&out,
20872154
router,
20882155
chan_type,
@@ -2093,6 +2160,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
20932160
Arc::clone(&fee_est_b),
20942161
Arc::clone(&broadcast_b),
20952162
persistence_styles[1],
2163+
deferred[1],
20962164
&out,
20972165
router,
20982166
chan_type,
@@ -2103,6 +2171,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21032171
Arc::clone(&fee_est_c),
21042172
Arc::clone(&broadcast_c),
21052173
persistence_styles[2],
2174+
deferred[2],
21062175
&out,
21072176
router,
21082177
chan_type,
@@ -2120,14 +2189,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21202189
// channel gets its own txid and funding outpoint.
21212190
// A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept),
21222191
// channel 3 A has 0-reserve (trusted accept).
2123-
make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state);
2124-
make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state);
2125-
make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state);
2192+
make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state);
2193+
make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state);
2194+
make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state);
21262195
// B-C: channel 4 B has 0-reserve (via trusted accept),
21272196
// channel 5 C has 0-reserve (via trusted open).
2128-
make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state);
2129-
make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state);
2130-
make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state);
2197+
make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state);
2198+
make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state);
2199+
make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state);
21312200

21322201
// Wipe the transactions-broadcasted set to make sure we don't broadcast
21332202
// any transactions during normal operation after setup.
@@ -2154,7 +2223,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
21542223
};
21552224

21562225
for node in &mut nodes {
2157-
node.serialized_manager = node.encode();
2226+
node.force_checkpoint_manager_persistence();
21582227
}
21592228

21602229
Self {
@@ -2574,7 +2643,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
25742643
// claim/fail handling per event batch.
25752644
let mut claim_set = new_hash_map();
25762645
let mut events = nodes[node_idx].get_and_clear_pending_events();
2577-
let had_events = !events.is_empty();
2646+
let mut had_events = !events.is_empty();
25782647
for event in events.drain(..) {
25792648
match event {
25802649
events::Event::PaymentClaimable { payment_hash, .. } => {
@@ -2630,6 +2699,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26302699
}
26312700
while nodes[node_idx].needs_pending_htlc_processing() {
26322701
nodes[node_idx].process_pending_htlc_forwards();
2702+
had_events = true;
26332703
}
26342704
had_events
26352705
}
@@ -2649,12 +2719,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26492719
for i in 0..std::usize::MAX {
26502720
if i == 100 {
26512721
panic!(
2652-
"It may take may iterations to settle the state, but it should not take forever"
2653-
);
2722+
"It may take may iterations to settle the state, but it should not take forever"
2723+
);
26542724
}
2725+
let mut made_progress = self.checkpoint_manager_persistences();
26552726
// Next, make sure no monitor updates are pending.
2656-
self.ab_link.complete_all_monitor_updates(&self.nodes);
2657-
self.bc_link.complete_all_monitor_updates(&self.nodes);
2727+
made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes);
2728+
made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes);
26582729
// Then, make sure any current forwards make their way to their destination.
26592730
if self.process_msg_events(0, false, ProcessMessages::AllMessages) {
26602731
last_pass_no_updates = false;
@@ -2681,6 +2752,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
26812752
last_pass_no_updates = false;
26822753
continue;
26832754
}
2755+
if made_progress {
2756+
last_pass_no_updates = false;
2757+
continue;
2758+
}
26842759
if last_pass_no_updates {
26852760
// In some cases, we may generate a message to send in
26862761
// `process_msg_events`, but block sending until
@@ -2779,19 +2854,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
27792854
self.nodes[2].record_last_htlc_clear_fee();
27802855
}
27812856

2782-
fn refresh_serialized_managers(&mut self) {
2857+
fn checkpoint_manager_persistences(&mut self) -> bool {
2858+
let mut made_progress = false;
27832859
for node in &mut self.nodes {
2784-
node.refresh_serialized_manager();
2860+
made_progress |= node.checkpoint_manager_persistence();
27852861
}
2862+
made_progress
27862863
}
27872864
}
27882865

27892866
#[inline]
27902867
pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
27912868
let router = FuzzRouter {};
2792-
// Read initial monitor styles and channel type from fuzz input byte 0:
2869+
// Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0:
27932870
// bits 0-2: monitor styles (1 bit per node)
27942871
// bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments)
2872+
// bits 5-7: deferred monitor write mode (1 bit per node)
27952873
let config_byte = if !data.is_empty() { data[0] } else { 0 };
27962874
let mut harness = Harness::new(config_byte, out, &router);
27972875
let mut read_pos = 1; // First byte was consumed for initial config.
@@ -3203,7 +3281,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
32033281
_ => break 'fuzz_loop,
32043282
}
32053283

3206-
harness.refresh_serialized_managers();
3284+
harness.checkpoint_manager_persistences();
32073285
}
32083286
harness.finish();
32093287
}

0 commit comments

Comments
 (0)