Skip to content

Commit fd90e0c

Browse files
Add chain::Watch ack_monitor_event API
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we add an as-yet-unused API to chain::Watch to allow the ChannelManager to tell the a ChannelMonitor that a MonitorEvent has been irrevocably processed and can be deleted.
1 parent 2e84b26 commit fd90e0c

5 files changed

Lines changed: 54 additions & 1 deletion

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use lightning::chain;
4141
use lightning::chain::chaininterface::{
4242
BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType,
4343
};
44+
use lightning::chain::chainmonitor::MonitorEventSource;
4445
use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
4546
use lightning::chain::transaction::OutPoint;
4647
use lightning::chain::{
@@ -367,6 +368,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
367368
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
368369
return self.chain_monitor.release_pending_monitor_events();
369370
}
371+
372+
fn ack_monitor_event(&self, source: MonitorEventSource) {
373+
self.chain_monitor.ack_monitor_event(source);
374+
}
370375
}
371376

372377
struct KeyProvider {

lightning/src/chain/chainmonitor.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,21 @@ use core::iter::Cycle;
6666
use core::ops::Deref;
6767
use core::sync::atomic::{AtomicUsize, Ordering};
6868

69+
/// Identifies the source of a [`MonitorEvent`] for acknowledgment via
70+
/// [`chain::Watch::ack_monitor_event`] once the event has been processed.
71+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
72+
pub struct MonitorEventSource {
73+
/// The event ID assigned by the [`ChannelMonitor`].
74+
pub event_id: u64,
75+
/// The channel from which the [`MonitorEvent`] originated.
76+
pub channel_id: ChannelId,
77+
}
78+
79+
impl_writeable_tlv_based!(MonitorEventSource, {
80+
(1, event_id, required),
81+
(3, channel_id, required),
82+
});
83+
6984
/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode.
7085
enum PendingMonitorOp<ChannelSigner: EcdsaChannelSigner> {
7186
/// A new monitor to insert and persist.
@@ -1646,6 +1661,15 @@ where
16461661
}
16471662
pending_monitor_events
16481663
}
1664+
1665+
fn ack_monitor_event(&self, source: MonitorEventSource) {
1666+
let monitors = self.monitors.read().unwrap();
1667+
if let Some(monitor_state) = monitors.get(&source.channel_id) {
1668+
monitor_state.monitor.ack_monitor_event(source.event_id);
1669+
} else {
1670+
debug_assert!(false, "Ack'd monitor events should always have a corresponding monitor");
1671+
}
1672+
}
16491673
}
16501674

16511675
impl<

lightning/src/chain/channelmonitor.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2172,6 +2172,12 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
21722172
self.inner.lock().unwrap().push_monitor_event(event);
21732173
}
21742174

2175+
/// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed.
2176+
/// Generally called by [`chain::Watch::ack_monitor_event`].
2177+
pub fn ack_monitor_event(&self, _event_id: u64) {
2178+
// TODO: once events have ids, remove the corresponding event here
2179+
}
2180+
21752181
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
21762182
///
21772183
/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]

lightning/src/chain/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use bitcoin::network::Network;
1818
use bitcoin::script::{Script, ScriptBuf};
1919
use bitcoin::secp256k1::PublicKey;
2020

21+
use crate::chain::chainmonitor::MonitorEventSource;
2122
use crate::chain::channelmonitor::{
2223
ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY,
2324
};
@@ -425,6 +426,15 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
425426
fn release_pending_monitor_events(
426427
&self,
427428
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>;
429+
430+
/// Acknowledges and removes a [`MonitorEvent`] previously returned by
431+
/// [`Watch::release_pending_monitor_events`] by its event ID.
432+
///
433+
/// Once acknowledged, the event will no longer be returned by future calls to
434+
/// [`Watch::release_pending_monitor_events`] and will not be replayed on restart.
435+
///
436+
/// Events may be acknowledged in any order.
437+
fn ack_monitor_event(&self, source: MonitorEventSource);
428438
}
429439

430440
impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Deref<Target = T>>
@@ -447,6 +457,10 @@ impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Der
447457
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
448458
self.deref().release_pending_monitor_events()
449459
}
460+
461+
fn ack_monitor_event(&self, source: MonitorEventSource) {
462+
self.deref().ack_monitor_event(source)
463+
}
450464
}
451465

452466
/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to

lightning/src/util/test_utils.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::chain::chaininterface;
1515
#[cfg(any(test, feature = "_externalize_tests"))]
1616
use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW;
1717
use crate::chain::chaininterface::{ConfirmationTarget, TransactionType};
18-
use crate::chain::chainmonitor::{ChainMonitor, Persist};
18+
use crate::chain::chainmonitor::{ChainMonitor, MonitorEventSource, Persist};
1919
use crate::chain::channelmonitor::{
2020
ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent,
2121
};
@@ -734,6 +734,10 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
734734
}
735735
return self.chain_monitor.release_pending_monitor_events();
736736
}
737+
738+
fn ack_monitor_event(&self, source: MonitorEventSource) {
739+
self.chain_monitor.ack_monitor_event(source);
740+
}
737741
}
738742

739743
#[cfg(any(test, feature = "_externalize_tests"))]

0 commit comments

Comments
 (0)