Skip to content

Commit b56258d

Browse files
joostjagerclaude
andcommitted
Extract watch_channel_internal/update_channel_internal from Watch impl
Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into pub(crate) methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 57ff0c7 commit b56258d

1 file changed

Lines changed: 156 additions & 144 deletions

File tree

lightning/src/chain/chainmonitor.rs

Lines changed: 156 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,160 @@ where
10581058

10591059
Ok(ChannelMonitorUpdateStatus::Completed)
10601060
}
1061+
1062+
fn watch_channel_internal(
1063+
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1064+
) -> Result<ChannelMonitorUpdateStatus, ()> {
1065+
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1066+
let mut monitors = self.monitors.write().unwrap();
1067+
let entry = match monitors.entry(channel_id) {
1068+
hash_map::Entry::Occupied(_) => {
1069+
log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1070+
return Err(());
1071+
},
1072+
hash_map::Entry::Vacant(e) => e,
1073+
};
1074+
log_trace!(logger, "Got new ChannelMonitor");
1075+
let update_id = monitor.get_latest_update_id();
1076+
let mut pending_monitor_updates = Vec::new();
1077+
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1078+
match persist_res {
1079+
ChannelMonitorUpdateStatus::InProgress => {
1080+
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
1081+
pending_monitor_updates.push(update_id);
1082+
},
1083+
ChannelMonitorUpdateStatus::Completed => {
1084+
log_info!(logger, "Persistence of new ChannelMonitor completed",);
1085+
},
1086+
ChannelMonitorUpdateStatus::UnrecoverableError => {
1087+
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1088+
log_error!(logger, "{}", err_str);
1089+
panic!("{}", err_str);
1090+
},
1091+
}
1092+
if let Some(ref chain_source) = self.chain_source {
1093+
monitor.load_outputs_to_watch(chain_source, &self.logger);
1094+
}
1095+
entry.insert(MonitorHolder {
1096+
monitor,
1097+
pending_monitor_updates: Mutex::new(pending_monitor_updates),
1098+
});
1099+
Ok(persist_res)
1100+
}
1101+
1102+
fn update_channel_internal(
1103+
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
1104+
) -> ChannelMonitorUpdateStatus {
1105+
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
1106+
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
1107+
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
1108+
// Update the monitor that watches the channel referred to by the given outpoint.
1109+
let monitors = self.monitors.read().unwrap();
1110+
match monitors.get(&channel_id) {
1111+
None => {
1112+
let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
1113+
log_error!(logger, "Failed to update channel monitor: no such monitor registered");
1114+
1115+
// We should never ever trigger this from within ChannelManager. Technically a
1116+
// user could use this object with some proxying in between which makes this
1117+
// possible, but in tests and fuzzing, this should be a panic.
1118+
#[cfg(debug_assertions)]
1119+
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
1120+
#[cfg(not(debug_assertions))]
1121+
ChannelMonitorUpdateStatus::InProgress
1122+
},
1123+
Some(monitor_state) => {
1124+
let monitor = &monitor_state.monitor;
1125+
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1126+
log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,);
1127+
1128+
// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
1129+
// have well-ordered updates from the users' point of view. See the
1130+
// `pending_monitor_updates` docs for more.
1131+
let mut pending_monitor_updates =
1132+
monitor_state.pending_monitor_updates.lock().unwrap();
1133+
let update_res = monitor.update_monitor(
1134+
update,
1135+
&self.broadcaster,
1136+
&self.fee_estimator,
1137+
&self.logger,
1138+
);
1139+
1140+
let update_id = update.update_id;
1141+
let persist_res = if update_res.is_err() {
1142+
// Even if updating the monitor returns an error, the monitor's state will
1143+
// still be changed. Therefore, we should persist the updated monitor despite the error.
1144+
// We don't want to persist a `monitor_update` which results in a failure to apply later
1145+
// while reading `channel_monitor` with updates from storage. Instead, we should persist
1146+
// the entire `channel_monitor` here.
1147+
log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor");
1148+
self.persister.update_persisted_channel(
1149+
monitor.persistence_key(),
1150+
None,
1151+
monitor,
1152+
)
1153+
} else {
1154+
self.persister.update_persisted_channel(
1155+
monitor.persistence_key(),
1156+
Some(update),
1157+
monitor,
1158+
)
1159+
};
1160+
match persist_res {
1161+
ChannelMonitorUpdateStatus::InProgress => {
1162+
pending_monitor_updates.push(update_id);
1163+
log_debug!(
1164+
logger,
1165+
"Persistence of ChannelMonitorUpdate id {:?} in progress",
1166+
update_id,
1167+
);
1168+
},
1169+
ChannelMonitorUpdateStatus::Completed => {
1170+
log_debug!(
1171+
logger,
1172+
"Persistence of ChannelMonitorUpdate id {:?} completed",
1173+
update_id,
1174+
);
1175+
},
1176+
ChannelMonitorUpdateStatus::UnrecoverableError => {
1177+
// Take the monitors lock for writing so that we poison it and any future
1178+
// operations going forward fail immediately.
1179+
core::mem::drop(pending_monitor_updates);
1180+
core::mem::drop(monitors);
1181+
let _poison = self.monitors.write().unwrap();
1182+
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1183+
log_error!(logger, "{}", err_str);
1184+
panic!("{}", err_str);
1185+
},
1186+
}
1187+
1188+
// We may need to start monitoring for any alternative funding transactions.
1189+
if let Some(ref chain_source) = self.chain_source {
1190+
for (funding_outpoint, funding_script) in
1191+
update.internal_renegotiated_funding_data()
1192+
{
1193+
log_trace!(
1194+
logger,
1195+
"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
1196+
funding_outpoint
1197+
);
1198+
chain_source.register_tx(&funding_outpoint.txid, &funding_script);
1199+
chain_source.register_output(WatchedOutput {
1200+
block_hash: None,
1201+
outpoint: funding_outpoint,
1202+
script_pubkey: funding_script,
1203+
});
1204+
}
1205+
}
1206+
1207+
if update_res.is_err() {
1208+
ChannelMonitorUpdateStatus::InProgress
1209+
} else {
1210+
persist_res
1211+
}
1212+
},
1213+
}
1214+
}
10611215
}
10621216

10631217
impl<
@@ -1272,155 +1426,13 @@ where
12721426
fn watch_channel(
12731427
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
12741428
) -> Result<ChannelMonitorUpdateStatus, ()> {
1275-
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1276-
let mut monitors = self.monitors.write().unwrap();
1277-
let entry = match monitors.entry(channel_id) {
1278-
hash_map::Entry::Occupied(_) => {
1279-
log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1280-
return Err(());
1281-
},
1282-
hash_map::Entry::Vacant(e) => e,
1283-
};
1284-
log_trace!(logger, "Got new ChannelMonitor");
1285-
let update_id = monitor.get_latest_update_id();
1286-
let mut pending_monitor_updates = Vec::new();
1287-
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1288-
match persist_res {
1289-
ChannelMonitorUpdateStatus::InProgress => {
1290-
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
1291-
pending_monitor_updates.push(update_id);
1292-
},
1293-
ChannelMonitorUpdateStatus::Completed => {
1294-
log_info!(logger, "Persistence of new ChannelMonitor completed",);
1295-
},
1296-
ChannelMonitorUpdateStatus::UnrecoverableError => {
1297-
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1298-
log_error!(logger, "{}", err_str);
1299-
panic!("{}", err_str);
1300-
},
1301-
}
1302-
if let Some(ref chain_source) = self.chain_source {
1303-
monitor.load_outputs_to_watch(chain_source, &self.logger);
1304-
}
1305-
entry.insert(MonitorHolder {
1306-
monitor,
1307-
pending_monitor_updates: Mutex::new(pending_monitor_updates),
1308-
});
1309-
Ok(persist_res)
1429+
self.watch_channel_internal(channel_id, monitor)
13101430
}
13111431

13121432
fn update_channel(
13131433
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
13141434
) -> ChannelMonitorUpdateStatus {
1315-
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
1316-
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
1317-
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
1318-
// Update the monitor that watches the channel referred to by the given outpoint.
1319-
let monitors = self.monitors.read().unwrap();
1320-
match monitors.get(&channel_id) {
1321-
None => {
1322-
let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
1323-
log_error!(logger, "Failed to update channel monitor: no such monitor registered");
1324-
1325-
// We should never ever trigger this from within ChannelManager. Technically a
1326-
// user could use this object with some proxying in between which makes this
1327-
// possible, but in tests and fuzzing, this should be a panic.
1328-
#[cfg(debug_assertions)]
1329-
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
1330-
#[cfg(not(debug_assertions))]
1331-
ChannelMonitorUpdateStatus::InProgress
1332-
},
1333-
Some(monitor_state) => {
1334-
let monitor = &monitor_state.monitor;
1335-
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1336-
log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,);
1337-
1338-
// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
1339-
// have well-ordered updates from the users' point of view. See the
1340-
// `pending_monitor_updates` docs for more.
1341-
let mut pending_monitor_updates =
1342-
monitor_state.pending_monitor_updates.lock().unwrap();
1343-
let update_res = monitor.update_monitor(
1344-
update,
1345-
&self.broadcaster,
1346-
&self.fee_estimator,
1347-
&self.logger,
1348-
);
1349-
1350-
let update_id = update.update_id;
1351-
let persist_res = if update_res.is_err() {
1352-
// Even if updating the monitor returns an error, the monitor's state will
1353-
// still be changed. Therefore, we should persist the updated monitor despite the error.
1354-
// We don't want to persist a `monitor_update` which results in a failure to apply later
1355-
// while reading `channel_monitor` with updates from storage. Instead, we should persist
1356-
// the entire `channel_monitor` here.
1357-
log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor");
1358-
self.persister.update_persisted_channel(
1359-
monitor.persistence_key(),
1360-
None,
1361-
monitor,
1362-
)
1363-
} else {
1364-
self.persister.update_persisted_channel(
1365-
monitor.persistence_key(),
1366-
Some(update),
1367-
monitor,
1368-
)
1369-
};
1370-
match persist_res {
1371-
ChannelMonitorUpdateStatus::InProgress => {
1372-
pending_monitor_updates.push(update_id);
1373-
log_debug!(
1374-
logger,
1375-
"Persistence of ChannelMonitorUpdate id {:?} in progress",
1376-
update_id,
1377-
);
1378-
},
1379-
ChannelMonitorUpdateStatus::Completed => {
1380-
log_debug!(
1381-
logger,
1382-
"Persistence of ChannelMonitorUpdate id {:?} completed",
1383-
update_id,
1384-
);
1385-
},
1386-
ChannelMonitorUpdateStatus::UnrecoverableError => {
1387-
// Take the monitors lock for writing so that we poison it and any future
1388-
// operations going forward fail immediately.
1389-
core::mem::drop(pending_monitor_updates);
1390-
core::mem::drop(monitors);
1391-
let _poison = self.monitors.write().unwrap();
1392-
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1393-
log_error!(logger, "{}", err_str);
1394-
panic!("{}", err_str);
1395-
},
1396-
}
1397-
1398-
// We may need to start monitoring for any alternative funding transactions.
1399-
if let Some(ref chain_source) = self.chain_source {
1400-
for (funding_outpoint, funding_script) in
1401-
update.internal_renegotiated_funding_data()
1402-
{
1403-
log_trace!(
1404-
logger,
1405-
"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
1406-
funding_outpoint
1407-
);
1408-
chain_source.register_tx(&funding_outpoint.txid, &funding_script);
1409-
chain_source.register_output(WatchedOutput {
1410-
block_hash: None,
1411-
outpoint: funding_outpoint,
1412-
script_pubkey: funding_script,
1413-
});
1414-
}
1415-
}
1416-
1417-
if update_res.is_err() {
1418-
ChannelMonitorUpdateStatus::InProgress
1419-
} else {
1420-
persist_res
1421-
}
1422-
},
1423-
}
1435+
self.update_channel_internal(channel_id, update)
14241436
}
14251437

14261438
fn release_pending_monitor_events(

0 commit comments

Comments
 (0)