Skip to content

Commit ccf1751

Browse files
Add monitor event ids
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To allow the ChannelManager to ack specific monitor events once they are resolved in upcoming commits, here we give each MonitorEvent a corresponding unique id. It's implemented in such a way that we can delete legacy monitor event code in the future when the new persistent monitor events flag is enabled by default.
1 parent b962285 commit ccf1751

6 files changed

Lines changed: 160 additions & 64 deletions

File tree

lightning/src/chain/chainmonitor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1639,7 +1639,7 @@ where
16391639

16401640
fn release_pending_monitor_events(
16411641
&self,
1642-
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1642+
) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> {
16431643
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
16441644
let _ = self.channel_monitor_updated(channel_id, update_id);
16451645
}

lightning/src/chain/channelmonitor.rs

Lines changed: 144 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,13 @@ impl Readable for ChannelMonitorUpdate {
184184
}
185185
}
186186

187-
fn push_monitor_event(pending_monitor_events: &mut Vec<MonitorEvent>, event: MonitorEvent) {
188-
pending_monitor_events.push(event);
187+
fn push_monitor_event(
188+
pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent,
189+
next_monitor_event_id: &mut u64,
190+
) {
191+
let id = *next_monitor_event_id;
192+
*next_monitor_event_id += 1;
193+
pending_monitor_events.push((id, event));
189194
}
190195

191196
/// An event to be processed by the ChannelManager.
@@ -1297,13 +1302,14 @@ pub(crate) struct ChannelMonitorImpl<Signer: EcdsaChannelSigner> {
12971302
// Note that because the `event_lock` in `ChainMonitor` is only taken in
12981303
// block/transaction-connected events and *not* during block/transaction-disconnected events,
12991304
// we further MUST NOT generate events during block/transaction-disconnection.
1300-
pending_monitor_events: Vec<MonitorEvent>,
1305+
pending_monitor_events: Vec<(u64, MonitorEvent)>,
13011306
/// When set, monitor events are retained until explicitly acked rather than cleared on read.
13021307
///
13031308
/// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on
13041309
/// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will
13051310
/// be always set once support for this feature is complete.
13061311
persistent_events_enabled: bool,
1312+
next_monitor_event_id: u64,
13071313

13081314
pub(super) pending_events: Vec<Event>,
13091315
pub(super) is_processing_pending_events: bool,
@@ -1684,32 +1690,38 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
16841690
writer.write_all(&payment_preimage.0[..])?;
16851691
}
16861692

1687-
writer.write_all(
1688-
&(channel_monitor
1689-
.pending_monitor_events
1690-
.iter()
1691-
.filter(|ev| match ev {
1692-
MonitorEvent::HTLCEvent(_) => true,
1693-
MonitorEvent::HolderForceClosed(_) => true,
1694-
MonitorEvent::HolderForceClosedWithInfo { .. } => true,
1695-
_ => false,
1696-
})
1697-
.count() as u64)
1698-
.to_be_bytes(),
1699-
)?;
1700-
for event in channel_monitor.pending_monitor_events.iter() {
1701-
match event {
1702-
MonitorEvent::HTLCEvent(upd) => {
1703-
0u8.write(writer)?;
1704-
upd.write(writer)?;
1705-
},
1706-
MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?,
1707-
// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep
1708-
// backwards compatibility, we write a `HolderForceClosed` event along with the
1709-
// `HolderForceClosedWithInfo` event. This is deduplicated in the reader.
1710-
MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?,
1711-
_ => {}, // Covered in the TLV writes below
1693+
if !channel_monitor.persistent_events_enabled {
1694+
writer.write_all(
1695+
&(channel_monitor
1696+
.pending_monitor_events
1697+
.iter()
1698+
.filter(|(_, ev)| match ev {
1699+
MonitorEvent::HTLCEvent(_) => true,
1700+
MonitorEvent::HolderForceClosed(_) => true,
1701+
MonitorEvent::HolderForceClosedWithInfo { .. } => true,
1702+
_ => false,
1703+
})
1704+
.count() as u64)
1705+
.to_be_bytes(),
1706+
)?;
1707+
for (_, event) in channel_monitor.pending_monitor_events.iter() {
1708+
match event {
1709+
MonitorEvent::HTLCEvent(upd) => {
1710+
0u8.write(writer)?;
1711+
upd.write(writer)?;
1712+
},
1713+
MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?,
1714+
// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep
1715+
// backwards compatibility, we write a `HolderForceClosed` event along with the
1716+
// `HolderForceClosedWithInfo` event. This is deduplicated in the reader.
1717+
MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?,
1718+
_ => {}, // Covered in the TLV writes below
1719+
}
17121720
}
1721+
} else {
1722+
// If `persistent_events_enabled` is set, we'll write the events with their event ids in the
1723+
// TLV section below.
1724+
writer.write_all(&(0u64).to_be_bytes())?;
17131725
}
17141726

17151727
writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?;
@@ -1744,25 +1756,40 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17441756

17451757
// If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed`
17461758
// for backwards compatibility.
1747-
let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| {
1748-
if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev {
1749-
Some(MonitorEvent::HolderForceClosed(*outpoint))
1750-
} else {
1751-
None
1752-
}
1753-
});
1754-
let pending_monitor_events_legacy = Iterable(
1755-
channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()),
1756-
);
1759+
let holder_force_closed_compat =
1760+
channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| {
1761+
if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev {
1762+
Some(MonitorEvent::HolderForceClosed(*outpoint))
1763+
} else {
1764+
None
1765+
}
1766+
});
1767+
let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled {
1768+
Some(Iterable(
1769+
channel_monitor
1770+
.pending_monitor_events
1771+
.iter()
1772+
.map(|(_, ev)| ev)
1773+
.chain(holder_force_closed_compat.as_ref()),
1774+
))
1775+
} else {
1776+
None
1777+
};
17571778

17581779
// Only write `persistent_events_enabled` if it's set to true, as it's an even TLV.
17591780
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(());
1781+
let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() {
1782+
Some(Iterable(channel_monitor.pending_monitor_events.iter()))
1783+
} else {
1784+
None
1785+
};
17601786

17611787
write_tlv_fields!(writer, {
17621788
(1, channel_monitor.funding_spend_confirmed, option),
17631789
(2, persistent_events_enabled, option),
17641790
(3, channel_monitor.htlcs_resolved_on_chain, required_vec),
1765-
(5, pending_monitor_events_legacy, required), // Equivalent to required_vec because Iterable also writes as WithoutLength
1791+
(4, pending_mon_evs_with_ids, option),
1792+
(5, pending_monitor_events_legacy, option), // Equivalent to optional_vec because Iterable also writes as WithoutLength
17661793
(7, channel_monitor.funding_spend_seen, required),
17671794
(9, channel_monitor.counterparty_node_id, required),
17681795
(11, channel_monitor.confirmed_commitment_tx_counterparty_output, option),
@@ -1783,6 +1810,7 @@ pub(crate) fn write_chanmon_internal<Signer: EcdsaChannelSigner, W: Writer>(
17831810
(37, channel_monitor.funding_seen_onchain, required),
17841811
(39, channel_monitor.best_block.previous_blocks, required),
17851812
(41, channel_monitor.funding.contribution, option),
1813+
(43, channel_monitor.next_monitor_event_id, required),
17861814
});
17871815

17881816
Ok(())
@@ -1969,6 +1997,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
19691997
payment_preimages: new_hash_map(),
19701998
pending_monitor_events: Vec::new(),
19711999
persistent_events_enabled: false,
2000+
next_monitor_event_id: 0,
19722001
pending_events: Vec::new(),
19732002
is_processing_pending_events: false,
19742003

@@ -2182,7 +2211,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
21822211

21832212
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
21842213
/// ChannelManager via [`chain::Watch::release_pending_monitor_events`].
2185-
pub fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
2214+
pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> {
21862215
self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
21872216
}
21882217

@@ -2196,6 +2225,20 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
21962225
// TODO: once events have ids, remove the corresponding event here
21972226
}
21982227

2228+
/// Copies [`MonitorEvent`] state from `other` into `self`.
2229+
/// Used in tests to align transient runtime state before equality comparison after a
2230+
/// serialization round-trip.
2231+
#[cfg(any(test, feature = "_test_utils"))]
2232+
pub fn copy_monitor_event_state(&self, other: &ChannelMonitor<Signer>) {
2233+
let (pending, next_id) = {
2234+
let other_inner = other.inner.lock().unwrap();
2235+
(other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id)
2236+
};
2237+
let mut self_inner = self.inner.lock().unwrap();
2238+
self_inner.pending_monitor_events = pending;
2239+
self_inner.next_monitor_event_id = next_id;
2240+
}
2241+
21992242
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
22002243
///
22012244
/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
@@ -3921,7 +3964,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
39213964
outpoint: funding_outpoint,
39223965
channel_id: self.channel_id,
39233966
};
3924-
push_monitor_event(&mut self.pending_monitor_events, event);
3967+
push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id);
39253968
}
39263969

39273970
// Although we aren't signing the transaction directly here, the transaction will be signed
@@ -4541,12 +4584,16 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
45414584
"Failing HTLC from late counterparty commitment update immediately \
45424585
(funding spend already confirmed)"
45434586
);
4544-
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
4545-
payment_hash,
4546-
payment_preimage: None,
4547-
source: source.clone(),
4548-
htlc_value_satoshis,
4549-
}));
4587+
push_monitor_event(
4588+
&mut self.pending_monitor_events,
4589+
MonitorEvent::HTLCEvent(HTLCUpdate {
4590+
payment_hash,
4591+
payment_preimage: None,
4592+
source: source.clone(),
4593+
htlc_value_satoshis,
4594+
}),
4595+
&mut self.next_monitor_event_id,
4596+
);
45504597
self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC {
45514598
commitment_tx_output_idx: None,
45524599
resolving_txid: Some(confirmed_txid),
@@ -4612,10 +4659,14 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
46124659
}
46134660

46144661
fn push_monitor_event(&mut self, event: MonitorEvent) {
4615-
push_monitor_event(&mut self.pending_monitor_events, event);
4662+
push_monitor_event(
4663+
&mut self.pending_monitor_events,
4664+
event,
4665+
&mut self.next_monitor_event_id,
4666+
);
46164667
}
46174668

4618-
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<MonitorEvent> {
4669+
fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> {
46194670
let mut ret = Vec::new();
46204671
mem::swap(&mut ret, &mut self.pending_monitor_events);
46214672
ret
@@ -5945,7 +5996,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59455996
continue;
59465997
}
59475998
let duplicate_event = self.pending_monitor_events.iter().any(
5948-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
5999+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
59496000
upd.source == *source
59506001
} else { false });
59516002
if duplicate_event {
@@ -5963,7 +6014,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
59636014
payment_preimage: None,
59646015
payment_hash: htlc.payment_hash,
59656016
htlc_value_satoshis: Some(htlc.amount_msat / 1000),
5966-
}));
6017+
}), &mut self.next_monitor_event_id);
59676018
}
59686019
}
59696020
}
@@ -6362,7 +6413,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63626413
if let Some((source, payment_hash, amount_msat)) = payment_data {
63636414
if accepted_preimage_claim {
63646415
if !self.pending_monitor_events.iter().any(
6365-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
6416+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
63666417
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
63676418
txid: tx.compute_txid(),
63686419
height,
@@ -6380,11 +6431,11 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
63806431
payment_preimage: Some(payment_preimage),
63816432
payment_hash,
63826433
htlc_value_satoshis: Some(amount_msat / 1000),
6383-
}));
6434+
}), &mut self.next_monitor_event_id);
63846435
}
63856436
} else if offered_preimage_claim {
63866437
if !self.pending_monitor_events.iter().any(
6387-
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
6438+
|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update {
63886439
upd.source == source
63896440
} else { false }) {
63906441
self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
@@ -6404,7 +6455,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
64046455
payment_preimage: Some(payment_preimage),
64056456
payment_hash,
64066457
htlc_value_satoshis: Some(amount_msat / 1000),
6407-
}));
6458+
}), &mut self.next_monitor_event_id);
64086459
}
64096460
} else {
64106461
self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
@@ -6770,10 +6821,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
67706821
let mut best_block_previous_blocks = None;
67716822
let mut current_funding_contribution = None;
67726823
let mut persistent_events_enabled: Option<()> = None;
6824+
let mut next_monitor_event_id: Option<u64> = None;
6825+
let mut pending_mon_evs_with_ids: Option<Vec<ReadableIdMonitorEvent>> = None;
67736826
read_tlv_fields!(reader, {
67746827
(1, funding_spend_confirmed, option),
67756828
(2, persistent_events_enabled, option),
67766829
(3, htlcs_resolved_on_chain, optional_vec),
6830+
(4, pending_mon_evs_with_ids, optional_vec),
67776831
(5, pending_monitor_events_legacy, optional_vec),
67786832
(7, funding_spend_seen, option),
67796833
(9, counterparty_node_id, option),
@@ -6795,6 +6849,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
67956849
(37, funding_seen_onchain, (default_value, true)),
67966850
(39, best_block_previous_blocks, option), // Added and always set in 0.3
67976851
(41, current_funding_contribution, option),
6852+
(43, next_monitor_event_id, option),
67986853
});
67996854
if let Some(previous_blocks) = best_block_previous_blocks {
68006855
best_block.previous_blocks = previous_blocks;
@@ -6836,6 +6891,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
68366891
}
68376892
}
68386893

6894+
// If persistent events are enabled, use the events with their persisted IDs from TLV 4.
6895+
// Otherwise, use the legacy events from TLV 5 and assign sequential IDs.
6896+
let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) =
6897+
if persistent_events_enabled.is_some() {
6898+
let evs = pending_mon_evs_with_ids.unwrap_or_default()
6899+
.into_iter().map(|ev| (ev.0, ev.1)).collect();
6900+
(next_monitor_event_id.unwrap_or(0), evs)
6901+
} else if let Some(events) = pending_monitor_events_legacy {
6902+
let next_id = next_monitor_event_id.unwrap_or(events.len() as u64);
6903+
let evs = events.into_iter().enumerate()
6904+
.map(|(i, ev)| (i as u64, ev)).collect();
6905+
(next_id, evs)
6906+
} else {
6907+
(next_monitor_event_id.unwrap_or(0), Vec::new())
6908+
};
6909+
68396910
let channel_parameters = channel_parameters.unwrap_or_else(|| {
68406911
onchain_tx_handler.channel_parameters().clone()
68416912
});
@@ -6954,8 +7025,9 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
69547025
current_holder_commitment_number,
69557026

69567027
payment_preimages,
6957-
pending_monitor_events: pending_monitor_events_legacy.unwrap(),
7028+
pending_monitor_events,
69587029
persistent_events_enabled: persistent_events_enabled.is_some(),
7030+
next_monitor_event_id,
69597031
pending_events,
69607032
is_processing_pending_events: false,
69617033

@@ -7004,6 +7076,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
70047076
}
70057077
}
70067078

7079+
/// Deserialization wrapper for reading a `(u64, MonitorEvent)`.
7080+
/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait
7081+
/// conflicts.
7082+
struct ReadableIdMonitorEvent(u64, MonitorEvent);
7083+
7084+
impl MaybeReadable for ReadableIdMonitorEvent {
7085+
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
7086+
let id: u64 = Readable::read(reader)?;
7087+
let event_opt: Option<MonitorEvent> = MaybeReadable::read(reader)?;
7088+
match event_opt {
7089+
Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))),
7090+
None => Ok(None),
7091+
}
7092+
}
7093+
}
7094+
70077095
#[cfg(test)]
70087096
pub(super) fn dummy_monitor<S: EcdsaChannelSigner + 'static>(
70097097
channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S,

0 commit comments

Comments
 (0)