Skip to content

Commit de88309

Browse files
Support persistent monitor events
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and of the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is a lot of complexity in reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass the resolutions back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things: on restart, instead of examining the set of HTLCs in monitors, we can simply replay all the pending MonitorEvents. Here we complete work that was built on recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in production yet, so the new code path is enabled at random in tests, which ensures that the old paths remain covered as well.
1 parent f744ae2 commit de88309

4 files changed

Lines changed: 86 additions & 13 deletions

File tree

lightning/src/chain/channelmonitor.rs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,11 @@ pub(crate) struct ChannelMonitorImpl<Signer: EcdsaChannelSigner> {
13031303
// block/transaction-connected events and *not* during block/transaction-disconnected events,
13041304
// we further MUST NOT generate events during block/transaction-disconnection.
13051305
pending_monitor_events: Vec<(u64, MonitorEvent)>,
1306+
// `MonitorEvent`s that have been provided to the `ChannelManager` via
1307+
// [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting
1308+
// [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it
1309+
// will be re-provided to the `ChannelManager` on startup.
1310+
provided_monitor_events: Vec<(u64, MonitorEvent)>,
13061311
/// When set, monitor events are retained until explicitly acked rather than cleared on read.
13071312
///
13081313
/// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on
@@ -1779,7 +1784,12 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17791784
// Only write `persistent_events_enabled` if it's set to true, as it's an even TLV.
17801785
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(());
17811786
let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() {
1782-
Some(Iterable(channel_monitor.pending_monitor_events.iter()))
1787+
Some(Iterable(
1788+
channel_monitor
1789+
.provided_monitor_events
1790+
.iter()
1791+
.chain(channel_monitor.pending_monitor_events.iter()),
1792+
))
17831793
} else {
17841794
None
17851795
};
@@ -1996,6 +2006,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
19962006

19972007
payment_preimages: new_hash_map(),
19982008
pending_monitor_events: Vec::new(),
2009+
provided_monitor_events: Vec::new(),
19992010
persistent_events_enabled: false,
20002011
next_monitor_event_id: 0,
20012012
pending_events: Vec::new(),
@@ -2221,20 +2232,32 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
22212232

22222233
/// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed.
22232234
/// Generally called by [`chain::Watch::ack_monitor_event`].
2224-
pub fn ack_monitor_event(&self, _event_id: u64) {
2225-
// TODO: once events have ids, remove the corresponding event here
2235+
pub fn ack_monitor_event(&self, event_id: u64) {
2236+
let inner = &mut *self.inner.lock().unwrap();
2237+
inner.ack_monitor_event(event_id);
2238+
}
2239+
2240+
/// Enables persistent monitor events mode. When enabled, monitor events are retained until
2241+
/// explicitly acked rather than cleared on read.
2242+
pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) {
2243+
self.inner.lock().unwrap().persistent_events_enabled = enabled;
22262244
}
22272245

22282246
/// Copies [`MonitorEvent`] state from `other` into `self`.
22292247
/// Used in tests to align transient runtime state before equality comparison after a
22302248
/// serialization round-trip.
22312249
#[cfg(any(test, feature = "_test_utils"))]
22322250
pub fn copy_monitor_event_state(&self, other: &ChannelMonitor<Signer>) {
2233-
let (pending, next_id) = {
2251+
let (provided, pending, next_id) = {
22342252
let other_inner = other.inner.lock().unwrap();
2235-
(other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id)
2253+
(
2254+
other_inner.provided_monitor_events.clone(),
2255+
other_inner.pending_monitor_events.clone(),
2256+
other_inner.next_monitor_event_id,
2257+
)
22362258
};
22372259
let mut self_inner = self.inner.lock().unwrap();
2260+
self_inner.provided_monitor_events = provided;
22382261
self_inner.pending_monitor_events = pending;
22392262
self_inner.next_monitor_event_id = next_id;
22402263
}
@@ -4666,10 +4689,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
46664689
);
46674690
}
46684691

4692+
fn ack_monitor_event(&mut self, event_id: u64) {
4693+
self.provided_monitor_events.retain(|(id, _)| *id != event_id);
4694+
// If this event was generated prior to a restart, it may be in this queue instead
4695+
self.pending_monitor_events.retain(|(id, _)| *id != event_id);
4696+
}
4697+
46694698
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> {
4670-
let mut ret = Vec::new();
4671-
mem::swap(&mut ret, &mut self.pending_monitor_events);
4672-
ret
4699+
if self.persistent_events_enabled {
4700+
let mut ret = Vec::new();
4701+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4702+
self.provided_monitor_events.extend(ret.iter().cloned());
4703+
ret
4704+
} else {
4705+
let mut ret = Vec::new();
4706+
mem::swap(&mut ret, &mut self.pending_monitor_events);
4707+
ret
4708+
}
46734709
}
46744710

46754711
/// Gets the set of events that are repeated regularly (e.g. those which RBF bump
@@ -5995,8 +6031,8 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59956031
if inbound_htlc_expiry > max_expiry_height {
59966032
continue;
59976033
}
5998-
let duplicate_event = self.pending_monitor_events.iter().any(
5999-
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
6034+
let duplicate_event = self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter())
6035+
.any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
60006036
upd.source == *source
60016037
} else { false });
60026038
if duplicate_event {
@@ -6412,7 +6448,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
64126448
// HTLC resolution backwards to and figure out whether we learned a preimage from it.
64136449
if let Some((source, payment_hash, amount_msat)) = payment_data {
64146450
if accepted_preimage_claim {
6415-
if !self.pending_monitor_events.iter().any(
6451+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
64166452
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
64176453
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
64186454
txid: tx.compute_txid(),
@@ -6434,7 +6470,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
64346470
}), &mut self.next_monitor_event_id);
64356471
}
64366472
} else if offered_preimage_claim {
6437-
if !self.pending_monitor_events.iter().any(
6473+
if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any(
64386474
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
64396475
upd.source == source
64406476
} else { false }) {
@@ -7026,6 +7062,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
70267062

70277063
payment_preimages,
70287064
pending_monitor_events,
7065+
provided_monitor_events: Vec::new(),
70297066
persistent_events_enabled: persistent_events_enabled.is_some(),
70307067
next_monitor_event_id,
70317068
pending_events,

lightning/src/ln/channelmanager.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3702,6 +3702,8 @@ impl<
37023702
our_network_pubkey, current_timestamp, expanded_inbound_key,
37033703
node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(),
37043704
);
3705+
#[cfg(any(test, feature = "_test_utils"))]
3706+
let override_persistent_monitor_events = config.override_persistent_monitor_events;
37053707

37063708
ChannelManager {
37073709
config: RwLock::new(config),
@@ -3758,7 +3760,27 @@ impl<
37583760

37593761
logger,
37603762

3761-
persistent_monitor_events: false,
3763+
persistent_monitor_events: {
3764+
#[cfg(not(any(test, feature = "_test_utils")))]
3765+
{ false }
3766+
#[cfg(any(test, feature = "_test_utils"))]
3767+
{
3768+
override_persistent_monitor_events.unwrap_or_else(|| {
3769+
use core::hash::{BuildHasher, Hasher};
3770+
match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") {
3771+
Ok(val) => match val.as_str() {
3772+
"1" => true,
3773+
"0" => false,
3774+
_ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val),
3775+
},
3776+
Err(_) => {
3777+
let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish();
3778+
rand_val % 2 == 0
3779+
},
3780+
}
3781+
})
3782+
}
3783+
},
37623784
}
37633785
}
37643786

@@ -11778,6 +11800,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1177811800
fail_chan!("Already had channel with the new channel_id");
1177911801
},
1178011802
hash_map::Entry::Vacant(e) => {
11803+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1178111804
let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1178211805
if let Ok(persist_state) = monitor_res {
1178311806
// There's no problem signing a counterparty's funding transaction if our monitor
@@ -11948,6 +11971,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1194811971
match chan
1194911972
.funding_signed(&msg, best_block, &self.signer_provider, &self.logger)
1195011973
.and_then(|(funded_chan, monitor)| {
11974+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1195111975
self.chain_monitor
1195211976
.watch_channel(funded_chan.context.channel_id(), monitor)
1195311977
.map_err(|()| {
@@ -12840,6 +12864,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1284012864

1284112865
if let Some(chan) = chan.as_funded_mut() {
1284212866
if let Some(monitor) = monitor_opt {
12867+
monitor.set_persistent_events_enabled(self.persistent_monitor_events);
1284312868
let monitor_res =
1284412869
self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
1284512870
if let Ok(persist_state) = monitor_res {

lightning/src/ln/monitor_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3594,6 +3594,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b
35943594
let mut cfg = test_default_channel_config();
35953595
cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
35963596
cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor;
3597+
// This test specifically tests lost monitor events, which requires the legacy
3598+
// (non-persistent) monitor event behavior.
3599+
cfg.override_persistent_monitor_events = Some(false);
35973600
let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())];
35983601

35993602
let chanmon_cfgs = create_chanmon_cfgs(3);

lightning/src/util/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,10 @@ pub struct UserConfig {
11341134
///
11351135
/// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel
11361136
pub reject_inbound_splices: bool,
1137+
/// If set to `Some`, overrides the random selection of whether to use persistent monitor
1138+
/// events. Only available in tests.
1139+
#[cfg(any(test, feature = "_test_utils"))]
1140+
pub override_persistent_monitor_events: Option<bool>,
11371141
}
11381142

11391143
impl Default for UserConfig {
@@ -1150,6 +1154,8 @@ impl Default for UserConfig {
11501154
enable_htlc_hold: false,
11511155
hold_outbound_htlcs_at_next_hop: false,
11521156
reject_inbound_splices: true,
1157+
#[cfg(any(test, feature = "_test_utils"))]
1158+
override_persistent_monitor_events: None,
11531159
}
11541160
}
11551161
}
@@ -1172,6 +1178,8 @@ impl Readable for UserConfig {
11721178
hold_outbound_htlcs_at_next_hop: Readable::read(reader)?,
11731179
enable_htlc_hold: Readable::read(reader)?,
11741180
reject_inbound_splices: Readable::read(reader)?,
1181+
#[cfg(any(test, feature = "_test_utils"))]
1182+
override_persistent_monitor_events: None,
11751183
})
11761184
}
11771185
}

0 commit comments

Comments
 (0)