Commit bd2d404

Persistent monitor events for HTLC forward claims
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of `Channel` objects (a part of `ChannelManager`) until the channel is closed, and of the `ChannelMonitor` thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure, and to lots of complexity reconstructing the state of all HTLCs in the `ChannelManager` deserialization/loading logic.

Instead, we want to do all resolution in `ChannelMonitor`s (in response to `ChannelMonitorUpdate`s) and pass the results back to the `ChannelManager` in the form of `MonitorEvent`s (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep `MonitorEvent`s around in the `ChannelMonitor` until the `ChannelManager` has finished processing them. This also simplifies startup: instead of examining the set of HTLCs in monitors, we can simply replay all the pending `MonitorEvent`s.

Building on the work of prior commits, here we stop immediately acking monitor events for HTLC claims on forwarded HTLCs, and instead ack the monitor event once the forward is fully resolved and we no longer want the event re-provided to us on startup. If the inbound edge channel is closed on-chain, we ack the event when the inbound edge preimage monitor update completes and the user has processed a `PaymentForwarded` event. If the inbound edge is still open, we ack the event when the inbound HTLC is fully removed via `revoke_and_ack` (this ensures we'll remember to resolve the HTLC off-chain even if we lose the holding cell).
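In effect this moves HTLC-claim delivery to an at-least-once contract with explicit acks. A minimal sketch of that contract, using hypothetical types rather than LDK's actual `MonitorEvent` plumbing:

use std::collections::BTreeMap;

/// Hypothetical stand-in for a monitor's pending-event store; not an LDK type.
struct PendingMonitorEvents<E: Clone> {
	/// Events are keyed by id and kept until explicitly acked, so they survive
	/// (and are re-provided after) a restart.
	pending: BTreeMap<u64, E>,
}

impl<E: Clone> PendingMonitorEvents<E> {
	/// Re-provides every un-acked event; after a restart this replays anything
	/// the manager never finished handling.
	fn release_pending(&self) -> Vec<(u64, E)> {
		self.pending.iter().map(|(id, ev)| (*id, ev.clone())).collect()
	}

	/// Called only once the event is fully resolved, e.g. the inbound preimage
	/// update completed and the user processed `PaymentForwarded`.
	fn ack(&mut self, event_id: u64) {
		self.pending.remove(&event_id);
	}
}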
1 parent c980152 commit bd2d404

7 files changed

Lines changed: 614 additions & 67 deletions

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 444 additions & 16 deletions
Large diffs are not rendered by default.

lightning/src/ln/channelmanager.rs

Lines changed: 103 additions & 45 deletions
@@ -3633,14 +3633,23 @@ impl TrustedChannelFeatures {
 struct ClaimCompletionActionParams {
 	definitely_duplicate: bool,
 	inbound_htlc_value_msat: Option<u64>,
+	inbound_edge_closed: bool,
 }

 impl ClaimCompletionActionParams {
 	fn new_claim(inbound_htlc_value_msat: u64) -> Self {
-		Self { definitely_duplicate: false, inbound_htlc_value_msat: Some(inbound_htlc_value_msat) }
+		Self {
+			definitely_duplicate: false,
+			inbound_htlc_value_msat: Some(inbound_htlc_value_msat),
+			inbound_edge_closed: false,
+		}
 	}
 	fn duplicate_claim() -> Self {
-		Self { definitely_duplicate: true, inbound_htlc_value_msat: None }
+		Self {
+			definitely_duplicate: true,
+			inbound_htlc_value_msat: None,
+			inbound_edge_closed: false,
+		}
 	}
 }

@@ -9649,16 +9658,56 @@ impl<
 			monitor_event_id
 				.map(|event_id| MonitorEventSource { event_id, channel_id: next_channel_id }),
 			|claim_completion_action_params| {
-				let ClaimCompletionActionParams { definitely_duplicate, inbound_htlc_value_msat } =
-					claim_completion_action_params;
+				let ClaimCompletionActionParams {
+					definitely_duplicate,
+					inbound_htlc_value_msat,
+					inbound_edge_closed,
+				} = claim_completion_action_params;
 				let chan_to_release = EventUnblockedChannel {
 					counterparty_node_id: next_channel_counterparty_node_id,
 					funding_txo: next_channel_outpoint,
 					channel_id: next_channel_id,
 					blocking_action: completed_blocker,
 				};

-				if definitely_duplicate && startup_replay {
+				if self.persistent_monitor_events {
+					let monitor_event_source = monitor_event_id.map(|event_id| {
+						MonitorEventSource { event_id, channel_id: next_channel_id }
+					});
+					// If persistent_monitor_events is enabled, then we'll get a MonitorEvent for this HTLC
+					// claim re-provided to us until we explicitly ack it.
+					// * If the inbound edge is closed, then we can ack it when we know the preimage is
+					//   durably persisted there + the user has processed a `PaymentForwarded` event
+					// * If the inbound edge is open, then we'll ack the monitor event when the HTLC has been
+					//   irrevocably removed via revoke_and_ack. This prevents forgetting to claim the HTLC
+					//   backwards if we lose the off-chain HTLC from the holding cell after a restart.
+					if definitely_duplicate {
+						if inbound_edge_closed {
+							if let Some(id) = monitor_event_source {
+								self.chain_monitor.ack_monitor_event(id);
+							}
+						}
+						(None, None)
+					} else if let Some(event) =
+						make_payment_forwarded_event(inbound_htlc_value_msat)
+					{
+						let preimage_update_action =
+							MonitorUpdateCompletionAction::EmitForwardEvent {
+								event,
+								post_event_ackable_monitor_event: inbound_edge_closed
+									.then_some(monitor_event_source)
+									.flatten(),
+							};
+						(Some(preimage_update_action), None)
+					} else if inbound_edge_closed {
+						let preimage_update_action = monitor_event_source.map(|src| {
+							MonitorUpdateCompletionAction::AckMonitorEvents { event_ids: vec![src] }
+						});
+						(preimage_update_action, None)
+					} else {
+						(None, None)
+					}
+				} else if definitely_duplicate && startup_replay {
 					// On startup we may get redundant claims which are related to
 					// monitor updates still in flight. In that case, we shouldn't
 					// immediately free, but instead let that monitor update complete
@@ -9991,6 +10040,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 			let (action_opt, raa_blocker_opt) = completion_action(ClaimCompletionActionParams {
 				definitely_duplicate: false,
 				inbound_htlc_value_msat: None,
+				inbound_edge_closed: true,
 			});

 			if let Some(raa_blocker) = raa_blocker_opt {
@@ -12712,23 +12762,32 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 					chan.update_fulfill_htlc(&msg),
 					chan_entry
 				);
-				let prev_hops = match &res.0 {
-					HTLCSource::PreviousHopData(prev_hop) => vec![prev_hop],
-					HTLCSource::TrampolineForward { previous_hop_data, .. } => {
-						previous_hop_data.iter().collect()
-					},
-					_ => vec![],
-				};
-				let logger = WithChannelContext::from(&self.logger, &chan.context, None);
-				for prev_hop in prev_hops {
-					log_trace!(logger,
-						"Holding the next revoke_and_ack until the preimage is durably persisted in the inbound edge's ChannelMonitor",
-					);
-					peer_state
-						.actions_blocking_raa_monitor_updates
-						.entry(msg.channel_id)
-						.or_insert_with(Vec::new)
-						.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(prev_hop));
+				// If persistent_monitor_events is enabled, we don't need to block preimage-removing
+				// monitor updates because we'll get the preimage from monitor events (that are
+				// guaranteed to be re-provided until they are explicitly acked) rather than from
+				// polling the monitor's internal state.
+				if !self.persistent_monitor_events {
+					let prev_hops = match &res.0 {
+						HTLCSource::PreviousHopData(prev_hop) => vec![prev_hop],
+						HTLCSource::TrampolineForward { previous_hop_data, .. } => {
+							previous_hop_data.iter().collect()
+						},
+						_ => vec![],
+					};
+					let logger =
+						WithChannelContext::from(&self.logger, &chan.context, None);
+					for prev_hop in prev_hops {
+						log_trace!(logger,
+							"Holding the next revoke_and_ack until the preimage is durably persisted in the inbound edge's ChannelMonitor",
+						);
+						peer_state
+							.actions_blocking_raa_monitor_updates
+							.entry(msg.channel_id)
+							.or_insert_with(Vec::new)
+							.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(
+								prev_hop,
+							));
+					}
 				}

 				// Note that we do not need to push an `actions_blocking_raa_monitor_updates`
@@ -13730,29 +13789,22 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 							.channel_by_id
 							.contains_key(&channel_id)
 					});
-					let we_are_sender =
-						matches!(htlc_update.source, HTLCSource::OutboundRoute { .. });
-					if from_onchain | we_are_sender {
-						// Claim the funds from the previous hop, if there is one. In the future we can
-						// store attribution data in the `ChannelMonitor` and provide it here.
-						self.claim_funds_internal(
-							htlc_update.source,
-							preimage,
-							htlc_update.htlc_value_msat,
-							None,
-							from_onchain,
-							counterparty_node_id,
-							funding_outpoint,
-							channel_id,
-							htlc_update.user_channel_id,
-							None,
-							None,
-							Some(event_id),
-						);
-					}
-					if !we_are_sender {
-						self.chain_monitor.ack_monitor_event(monitor_event_source);
-					}
+					// Claim the funds from the previous hop, if there is one. In the future we can
+					// store attribution data in the `ChannelMonitor` and provide it here.
+					self.claim_funds_internal(
+						htlc_update.source,
+						preimage,
+						htlc_update.htlc_value_msat,
+						None,
+						from_onchain,
+						counterparty_node_id,
+						funding_outpoint,
+						channel_id,
+						htlc_update.user_channel_id,
+						None,
+						None,
+						Some(event_id),
+					);
 				} else {
 					log_trace!(logger, "Failing HTLC from our monitor");
 					let failure_reason = LocalHTLCFailureReason::OnChainTimeout;
@@ -20679,6 +20731,12 @@ impl<
 			downstream_user_channel_id,
 		) in pending_claims_to_replay
 		{
+			// If persistent_monitor_events is enabled, we don't need to explicitly reclaim HTLCs on
+			// startup because we can just wait for the relevant MonitorEvents to be re-provided to us
+			// during runtime.
+			if channel_manager.persistent_monitor_events {
+				continue;
+			}
 			// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
 			// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
 			// channel is closed we just assume that it probably came from an on-chain claim.
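Taken together, the `persistent_monitor_events` branch above reduces to a small decision table over `definitely_duplicate`, `inbound_edge_closed`, and whether a `PaymentForwarded` event will be emitted. A simplified, free-standing restatement (the enum and function below are illustrative, not LDK API):

/// Illustrative only: when the HTLC-claim MonitorEvent may be acked.
enum AckAction {
	/// Duplicate claim with a closed inbound edge: ack immediately.
	AckNow,
	/// Ack once the user has processed the `PaymentForwarded` event.
	AckAfterForwardEvent,
	/// Ack once the inbound preimage monitor update completes.
	AckAfterPreimageUpdate,
	/// Inbound edge still open: keep the event until the inbound HTLC is
	/// irrevocably removed via revoke_and_ack.
	KeepUntilRevokeAndAck,
}

fn ack_decision(
	definitely_duplicate: bool, inbound_edge_closed: bool, emits_forward_event: bool,
) -> AckAction {
	if !inbound_edge_closed {
		// With an open inbound edge we never ack here; the revoke_and_ack
		// path acks once the HTLC is fully removed.
		AckAction::KeepUntilRevokeAndAck
	} else if definitely_duplicate {
		AckAction::AckNow
	} else if emits_forward_event {
		AckAction::AckAfterForwardEvent
	} else {
		AckAction::AckAfterPreimageUpdate
	}
}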

lightning/src/ln/functional_test_utils.rs

Lines changed: 8 additions & 2 deletions
@@ -3166,7 +3166,7 @@ pub fn expect_payment_forwarded<CM: AChannelManager, H: NodeHolder<CM = CM>>(
 macro_rules! expect_payment_forwarded {
 	($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
 		let mut events = $node.node.get_and_clear_pending_events();
-		assert_eq!(events.len(), 1);
+		assert_eq!(events.len(), 1, "{events:?}");
 		$crate::ln::functional_test_utils::expect_payment_forwarded(
 			events.pop().unwrap(),
 			&$node,
@@ -5673,7 +5673,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
 	node_a.node.handle_revoke_and_ack(node_b_id, &bs_revoke_and_ack);
 	check_added_monitors(
 		&node_a,
-		if pending_responding_commitment_signed_dup_monitor.1 { 0 } else { 1 },
+		if pending_responding_commitment_signed_dup_monitor.1
+			&& !node_a.node.test_persistent_monitor_events_enabled()
+		{
+			0
+		} else {
+			1
+		},
 	);
 	if !allow_post_commitment_dance_msgs.1 {
 		assert!(node_a.node.get_and_clear_pending_msg_events().is_empty());

lightning/src/ln/functional_tests.rs

Lines changed: 16 additions & 1 deletion
@@ -7916,7 +7916,15 @@ fn do_test_onchain_htlc_settlement_after_close(
 		_ => panic!("Unexpected event"),
 	};
 	nodes[1].node.handle_revoke_and_ack(node_c_id, &carol_revocation);
-	check_added_monitors(&nodes[1], 1);
+	if nodes[1].node.test_persistent_monitor_events_enabled() {
+		if broadcast_alice && !go_onchain_before_fulfill {
+			check_added_monitors(&nodes[1], 1);
+		} else {
+			check_added_monitors(&nodes[1], 2);
+		}
+	} else {
+		check_added_monitors(&nodes[1], 1);
+	}

 	// If this test requires the force-closed channel to not be on-chain until after the fulfill,
 	// here's where we put said channel's commitment tx on-chain.
@@ -7950,6 +7958,13 @@ fn do_test_onchain_htlc_settlement_after_close(
 			check_spends!(bob_txn[0], chan_ab.3);
 		}
 	}
+	if nodes[1].node.test_persistent_monitor_events_enabled() {
+		if !broadcast_alice || go_onchain_before_fulfill {
+			// In some cases we'll replay the claim via a MonitorEvent and be unable to detect that it's
+			// a duplicate since the inbound edge is on-chain.
+			expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], fee, went_onchain, false);
+		}
+	}

 	// Step (6):
 	// Finally, check that Bob broadcasted a preimage-claiming transaction for the HTLC output on the

lightning/src/ln/payment_tests.rs

Lines changed: 27 additions & 2 deletions
@@ -927,8 +927,33 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
 	let fulfill_msg = htlc_fulfill.update_fulfill_htlcs.remove(0);
 	nodes[1].node.handle_update_fulfill_htlc(node_c_id, fulfill_msg);
 	check_added_monitors(&nodes[1], 1);
-	do_commitment_signed_dance(&nodes[1], &nodes[2], &htlc_fulfill.commitment_signed, false, false);
-	expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
+	{
+		// Drive the commitment signed dance manually so we can account for the extra monitor
+		// update when persistent monitor events are enabled.
+		let persistent = nodes[1].node.test_persistent_monitor_events_enabled();
+		nodes[1].node.handle_commitment_signed_batch_test(node_c_id, &htlc_fulfill.commitment_signed);
+		check_added_monitors(&nodes[1], 1);
+		let (extra_msg, cs_raa, htlcs) =
+			do_main_commitment_signed_dance(&nodes[1], &nodes[2], false);
+		assert!(htlcs.is_empty());
+		assert!(extra_msg.is_none());
+		// nodes[1] handles nodes[2]'s RAA. When persistent monitor events are enabled, this
+		// triggers the re-provided outbound monitor event, generating an extra preimage update
+		// on the (closed) inbound channel.
+		nodes[1].node.handle_revoke_and_ack(node_c_id, &cs_raa);
+		check_added_monitors(&nodes[1], if persistent { 2 } else { 1 });
+	}
+	if nodes[1].node.test_persistent_monitor_events_enabled() {
+		// The re-provided monitor event generates a duplicate PaymentForwarded against the
+		// closed inbound channel.
+		let events = nodes[1].node.get_and_clear_pending_events();
+		assert_eq!(events.len(), 2);
+		for event in events {
+			assert!(matches!(event, Event::PaymentForwarded { .. }));
+		}
+	} else {
+		expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
+	}

 	if confirm_before_reload {
 		let best_block = nodes[0].blocks.lock().unwrap().last().unwrap().clone();

lightning/src/ln/reload_tests.rs

Lines changed: 7 additions & 0 deletions
@@ -1972,6 +1972,13 @@ fn test_reload_node_with_preimage_in_monitor_claims_htlc() {
 		Some(true)
 	);

+	if nodes[1].node.test_persistent_monitor_events_enabled() {
+		// Polling messages causes us to re-release the unacked HTLC claim monitor event, which
+		// regenerates a preimage monitor update and forward event below.
+		let msgs = nodes[1].node.get_and_clear_pending_msg_events();
+		assert!(msgs.is_empty());
+	}
+
 	// When the claim is reconstructed during reload, a PaymentForwarded event is generated.
 	// Fetching events triggers the pending monitor update (adding preimage) to be applied.
 	expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);

lightning/src/util/test_utils.rs

Lines changed: 9 additions & 1 deletion
@@ -523,6 +523,9 @@ pub struct TestChainMonitor<'a> {
 	pub pause_flush: AtomicBool,
 	/// Buffer of the last 20 monitor updates, most recent first.
 	pub recent_monitor_updates: Mutex<Vec<(ChannelId, ChannelMonitorUpdate)>>,
+	/// When set to `true`, `release_pending_monitor_events` sorts events by `ChannelId` to
+	/// ensure deterministic processing order regardless of HashMap iteration order.
+	pub deterministic_mon_events_order: AtomicBool,
 }
 impl<'a> TestChainMonitor<'a> {
 	pub fn new(
@@ -584,6 +587,7 @@ impl<'a> TestChainMonitor<'a> {
 			write_blocker: Mutex::new(None),
 			pause_flush: AtomicBool::new(false),
 			recent_monitor_updates: Mutex::new(Vec::new()),
+			deterministic_mon_events_order: AtomicBool::new(false),
 		}
 	}

@@ -745,7 +749,11 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
 			let count = self.chain_monitor.pending_operation_count();
 			self.chain_monitor.flush(count, &self.logger);
 		}
-		return self.chain_monitor.release_pending_monitor_events();
+		let mut events = self.chain_monitor.release_pending_monitor_events();
+		if self.deterministic_mon_events_order.load(Ordering::Acquire) {
+			events.sort_by_key(|(_, channel_id, _, _)| *channel_id);
+		}
+		events
 	}

 	fn ack_monitor_event(&self, source: MonitorEventSource) {
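A test wanting a stable order can opt in before events are released, e.g. (hypothetical usage, assuming a `TestChainMonitor` named `chain_mon` with the `chain::Watch` trait in scope):

// Sort released monitor events by ChannelId so assertions don't depend on
// HashMap iteration order; this store pairs with the Acquire load above.
chain_mon.deterministic_mon_events_order.store(true, Ordering::Release);
let events = chain_mon.release_pending_monitor_events();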
