Skip to content

Commit 064f836

Browse files
Persistent monitor events for HTLC forward claims
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Building on the work of prior commits, here we stop immediately acking monitor events for HTLC claims for forwarded HTLCs, and instead ack the monitor event when the forward is fully resolved and we don't want the monitor event to be re-provided back to us on startup. If the inbound edge channel is on-chain, we'll ack the event when the inbound edge preimage monitor update completes and the user processes a PaymentForwarded event. If the inbound edge is open, we'll ack the event when the inbound HTLC is fully removed via revoke_and_ack (this ensures we'll remember to resolve the htlc off-chain even if we lose the holding cell).
1 parent 6c6ebc2 commit 064f836

7 files changed

Lines changed: 605 additions & 58 deletions

File tree

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 444 additions & 16 deletions
Large diffs are not rendered by default.

lightning/src/ln/channelmanager.rs

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3718,14 +3718,23 @@ impl TrustedChannelFeatures {
37183718
struct ClaimCompletionActionParams {
37193719
definitely_duplicate: bool,
37203720
inbound_htlc_value_msat: Option<u64>,
3721+
inbound_edge_closed: bool,
37213722
}
37223723

37233724
impl ClaimCompletionActionParams {
37243725
fn new_claim(inbound_htlc_value_msat: u64) -> Self {
3725-
Self { definitely_duplicate: false, inbound_htlc_value_msat: Some(inbound_htlc_value_msat) }
3726+
Self {
3727+
definitely_duplicate: false,
3728+
inbound_htlc_value_msat: Some(inbound_htlc_value_msat),
3729+
inbound_edge_closed: false,
3730+
}
37263731
}
37273732
fn duplicate_claim() -> Self {
3728-
Self { definitely_duplicate: true, inbound_htlc_value_msat: None }
3733+
Self {
3734+
definitely_duplicate: true,
3735+
inbound_htlc_value_msat: None,
3736+
inbound_edge_closed: false,
3737+
}
37293738
}
37303739
}
37313740

@@ -9821,16 +9830,56 @@ impl<
98219830
monitor_event_id
98229831
.map(|event_id| MonitorEventSource { event_id, channel_id: next_channel_id }),
98239832
|claim_completion_action_params| {
9824-
let ClaimCompletionActionParams { definitely_duplicate, inbound_htlc_value_msat } =
9825-
claim_completion_action_params;
9833+
let ClaimCompletionActionParams {
9834+
definitely_duplicate,
9835+
inbound_htlc_value_msat,
9836+
inbound_edge_closed,
9837+
} = claim_completion_action_params;
98269838
let chan_to_release = EventUnblockedChannel {
98279839
counterparty_node_id: next_channel_counterparty_node_id,
98289840
funding_txo: next_channel_outpoint,
98299841
channel_id: next_channel_id,
98309842
blocking_action: completed_blocker,
98319843
};
98329844

9833-
if definitely_duplicate && startup_replay {
9845+
if self.persistent_monitor_events {
9846+
let monitor_event_source = monitor_event_id.map(|event_id| {
9847+
MonitorEventSource { event_id, channel_id: next_channel_id }
9848+
});
9849+
// If persistent_monitor_events is enabled, then we'll get a MonitorEvent for this HTLC
9850+
// claim re-provided to us until we explicitly ack it.
9851+
// * If the inbound edge is closed, then we can ack it when we know the preimage is
9852+
// durably persisted there + the user has processed a `PaymentForwarded` event
9853+
// * If the inbound edge is open, then we'll ack the monitor event when HTLC has been
9854+
// irrevocably removed via revoke_and_ack. This prevents forgetting to claim the HTLC
9855+
// backwards if we lose the off-chain HTLC from the holding cell after a restart.
9856+
if definitely_duplicate {
9857+
if inbound_edge_closed {
9858+
if let Some(id) = monitor_event_source {
9859+
self.chain_monitor.ack_monitor_event(id);
9860+
}
9861+
}
9862+
(None, None)
9863+
} else if let Some(event) =
9864+
make_payment_forwarded_event(inbound_htlc_value_msat)
9865+
{
9866+
let preimage_update_action =
9867+
MonitorUpdateCompletionAction::EmitForwardEvent {
9868+
event,
9869+
post_event_ackable_monitor_event: inbound_edge_closed
9870+
.then_some(monitor_event_source)
9871+
.flatten(),
9872+
};
9873+
(Some(preimage_update_action), None)
9874+
} else if inbound_edge_closed {
9875+
let preimage_update_action = monitor_event_source.map(|src| {
9876+
MonitorUpdateCompletionAction::AckMonitorEvents { event_ids: vec![src] }
9877+
});
9878+
(preimage_update_action, None)
9879+
} else {
9880+
(None, None)
9881+
}
9882+
} else if definitely_duplicate && startup_replay {
98349883
// On startup we may get redundant claims which are related to
98359884
// monitor updates still in flight. In that case, we shouldn't
98369885
// immediately free, but instead let that monitor update complete
@@ -10163,6 +10212,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1016310212
let (action_opt, raa_blocker_opt) = completion_action(ClaimCompletionActionParams {
1016410213
definitely_duplicate: false,
1016510214
inbound_htlc_value_msat: None,
10215+
inbound_edge_closed: true,
1016610216
});
1016710217

1016810218
if let Some(raa_blocker) = raa_blocker_opt {
@@ -12866,16 +12916,25 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1286612916
chan.update_fulfill_htlc(&msg),
1286712917
chan_entry
1286812918
);
12869-
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
12870-
for prev_hop in res.0.previous_hop_data() {
12871-
log_trace!(logger,
12919+
// If persistent_monitor_events is enabled, we don't need to block preimage-removing
12920+
// monitor updates because we'll get the preimage from monitor events (that are
12921+
// guaranteed to be re-provided until they are explicitly acked) rather than from
12922+
// polling the monitor's internal state.
12923+
if !self.persistent_monitor_events {
12924+
let logger =
12925+
WithChannelContext::from(&self.logger, &chan.context, None);
12926+
for prev_hop in res.0.previous_hop_data() {
12927+
log_trace!(logger,
1287212928
"Holding the next revoke_and_ack until the preimage is durably persisted in the inbound edge's ChannelMonitor",
1287312929
);
12874-
peer_state
12875-
.actions_blocking_raa_monitor_updates
12876-
.entry(msg.channel_id)
12877-
.or_insert_with(Vec::new)
12878-
.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(prev_hop));
12930+
peer_state
12931+
.actions_blocking_raa_monitor_updates
12932+
.entry(msg.channel_id)
12933+
.or_insert_with(Vec::new)
12934+
.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(
12935+
prev_hop,
12936+
));
12937+
}
1287912938
}
1288012939

1288112940
// Note that we do not need to push an `actions_blocking_raa_monitor_updates`
@@ -13901,29 +13960,22 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1390113960
.channel_by_id
1390213961
.contains_key(&channel_id)
1390313962
});
13904-
let we_are_sender =
13905-
matches!(htlc_update.source, HTLCSource::OutboundRoute { .. });
13906-
if from_onchain | we_are_sender {
13907-
// Claim the funds from the previous hop, if there is one. In the future we can
13908-
// store attribution data in the `ChannelMonitor` and provide it here.
13909-
self.claim_funds_internal(
13910-
htlc_update.source,
13911-
preimage,
13912-
htlc_update.htlc_value_msat,
13913-
None,
13914-
from_onchain,
13915-
counterparty_node_id,
13916-
funding_outpoint,
13917-
channel_id,
13918-
htlc_update.user_channel_id,
13919-
None,
13920-
None,
13921-
Some(event_id),
13922-
);
13923-
}
13924-
if !we_are_sender {
13925-
self.chain_monitor.ack_monitor_event(monitor_event_source);
13926-
}
13963+
// Claim the funds from the previous hop, if there is one. In the future we can
13964+
// store attribution data in the `ChannelMonitor` and provide it here.
13965+
self.claim_funds_internal(
13966+
htlc_update.source,
13967+
preimage,
13968+
htlc_update.htlc_value_msat,
13969+
None,
13970+
from_onchain,
13971+
counterparty_node_id,
13972+
funding_outpoint,
13973+
channel_id,
13974+
htlc_update.user_channel_id,
13975+
None,
13976+
None,
13977+
Some(event_id),
13978+
);
1392713979
} else {
1392813980
log_trace!(logger, "Failing HTLC from our monitor");
1392913981
let failure_reason = LocalHTLCFailureReason::OnChainTimeout;
@@ -20865,6 +20917,12 @@ impl<
2086520917
downstream_user_channel_id,
2086620918
) in pending_claims_to_replay
2086720919
{
20920+
// If persistent_monitor_events is enabled, we don't need to explicitly reclaim HTLCs on
20921+
// startup because we can just wait for the relevant MonitorEvents to be re-provided to us
20922+
// during runtime.
20923+
if channel_manager.persistent_monitor_events {
20924+
continue;
20925+
}
2086820926
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
2086920927
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
2087020928
// channel is closed we just assume that it probably came from an on-chain claim.

lightning/src/ln/functional_test_utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3169,7 +3169,7 @@ pub fn expect_payment_forwarded<CM: AChannelManager, H: NodeHolder<CM = CM>>(
31693169
macro_rules! expect_payment_forwarded {
31703170
($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
31713171
let mut events = $node.node.get_and_clear_pending_events();
3172-
assert_eq!(events.len(), 1);
3172+
assert_eq!(events.len(), 1, "{events:?}");
31733173
$crate::ln::functional_test_utils::expect_payment_forwarded(
31743174
events.pop().unwrap(),
31753175
&$node,
@@ -5676,7 +5676,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
56765676
node_a.node.handle_revoke_and_ack(node_b_id, &bs_revoke_and_ack);
56775677
check_added_monitors(
56785678
&node_a,
5679-
if pending_responding_commitment_signed_dup_monitor.1 { 0 } else { 1 },
5679+
if pending_responding_commitment_signed_dup_monitor.1
5680+
&& !node_a.node.test_persistent_monitor_events_enabled()
5681+
{
5682+
0
5683+
} else {
5684+
1
5685+
},
56805686
);
56815687
if !allow_post_commitment_dance_msgs.1 {
56825688
assert!(node_a.node.get_and_clear_pending_msg_events().is_empty());

lightning/src/ln/functional_tests.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7916,7 +7916,15 @@ fn do_test_onchain_htlc_settlement_after_close(
79167916
_ => panic!("Unexpected event"),
79177917
};
79187918
nodes[1].node.handle_revoke_and_ack(node_c_id, &carol_revocation);
7919-
check_added_monitors(&nodes[1], 1);
7919+
if nodes[1].node.test_persistent_monitor_events_enabled() {
7920+
if broadcast_alice && !go_onchain_before_fulfill {
7921+
check_added_monitors(&nodes[1], 1);
7922+
} else {
7923+
check_added_monitors(&nodes[1], 2);
7924+
}
7925+
} else {
7926+
check_added_monitors(&nodes[1], 1);
7927+
}
79207928

79217929
// If this test requires the force-closed channel to not be on-chain until after the fulfill,
79227930
// here's where we put said channel's commitment tx on-chain.
@@ -7950,6 +7958,13 @@ fn do_test_onchain_htlc_settlement_after_close(
79507958
check_spends!(bob_txn[0], chan_ab.3);
79517959
}
79527960
}
7961+
if nodes[1].node.test_persistent_monitor_events_enabled() {
7962+
if !broadcast_alice || go_onchain_before_fulfill {
7963+
// In some cases we'll replay the claim via a MonitorEvent and be unable to detect that it's
7964+
// a duplicate since the inbound edge is on-chain.
7965+
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], fee, went_onchain, false);
7966+
}
7967+
}
79537968

79547969
// Step (6):
79557970
// Finally, check that Bob broadcasted a preimage-claiming transaction for the HTLC output on the

lightning/src/ln/payment_tests.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -962,8 +962,33 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
962962
let fulfill_msg = htlc_fulfill.update_fulfill_htlcs.remove(0);
963963
nodes[1].node.handle_update_fulfill_htlc(node_c_id, fulfill_msg);
964964
check_added_monitors(&nodes[1], 1);
965-
do_commitment_signed_dance(&nodes[1], &nodes[2], &htlc_fulfill.commitment_signed, false, false);
966-
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
965+
{
966+
// Drive the commitment signed dance manually so we can account for the extra monitor
967+
// update when persistent monitor events are enabled.
968+
let persistent = nodes[1].node.test_persistent_monitor_events_enabled();
969+
nodes[1].node.handle_commitment_signed_batch_test(node_c_id, &htlc_fulfill.commitment_signed);
970+
check_added_monitors(&nodes[1], 1);
971+
let (extra_msg, cs_raa, htlcs) =
972+
do_main_commitment_signed_dance(&nodes[1], &nodes[2], false);
973+
assert!(htlcs.is_empty());
974+
assert!(extra_msg.is_none());
975+
// nodes[1] handles nodes[2]'s RAA. When persistent monitor events are enabled, this
976+
// triggers the re-provided outbound monitor event, generating an extra preimage update
977+
// on the (closed) inbound channel.
978+
nodes[1].node.handle_revoke_and_ack(node_c_id, &cs_raa);
979+
check_added_monitors(&nodes[1], if persistent { 2 } else { 1 });
980+
}
981+
if nodes[1].node.test_persistent_monitor_events_enabled() {
982+
// The re-provided monitor event generates a duplicate PaymentForwarded against the
983+
// closed inbound channel.
984+
let events = nodes[1].node.get_and_clear_pending_events();
985+
assert_eq!(events.len(), 2);
986+
for event in events {
987+
assert!(matches!(event, Event::PaymentForwarded { .. }));
988+
}
989+
} else {
990+
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
991+
}
967992

968993
if confirm_before_reload {
969994
let best_block = nodes[0].blocks.lock().unwrap().last().unwrap().clone();

lightning/src/ln/reload_tests.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,6 +1972,13 @@ fn test_reload_node_with_preimage_in_monitor_claims_htlc() {
19721972
Some(true)
19731973
);
19741974

1975+
if nodes[1].node.test_persistent_monitor_events_enabled() {
1976+
// Polling messages causes us to re-release the unacked HTLC claim monitor event, which
1977+
// regenerates a preimage monitor update and forward event below.
1978+
let msgs = nodes[1].node.get_and_clear_pending_msg_events();
1979+
assert!(msgs.is_empty());
1980+
}
1981+
19751982
// When the claim is reconstructed during reload, a PaymentForwarded event is generated.
19761983
// Fetching events triggers the pending monitor update (adding preimage) to be applied.
19771984
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);

lightning/src/util/test_utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ pub struct TestChainMonitor<'a> {
523523
pub pause_flush: AtomicBool,
524524
/// Buffer of the last 20 monitor updates, most recent first.
525525
pub recent_monitor_updates: Mutex<Vec<(ChannelId, ChannelMonitorUpdate)>>,
526+
/// When set to `true`, `release_pending_monitor_events` sorts events by `ChannelId` to
527+
/// ensure deterministic processing order regardless of HashMap iteration order.
528+
pub deterministic_mon_events_order: AtomicBool,
526529
}
527530
impl<'a> TestChainMonitor<'a> {
528531
pub fn new(
@@ -584,6 +587,7 @@ impl<'a> TestChainMonitor<'a> {
584587
write_blocker: Mutex::new(None),
585588
pause_flush: AtomicBool::new(false),
586589
recent_monitor_updates: Mutex::new(Vec::new()),
590+
deterministic_mon_events_order: AtomicBool::new(false),
587591
}
588592
}
589593

@@ -745,7 +749,11 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
745749
let count = self.chain_monitor.pending_operation_count();
746750
self.chain_monitor.flush(count, &self.logger);
747751
}
748-
return self.chain_monitor.release_pending_monitor_events();
752+
let mut events = self.chain_monitor.release_pending_monitor_events();
753+
if self.deterministic_mon_events_order.load(Ordering::Acquire) {
754+
events.sort_by_key(|(_, channel_id, _, _)| *channel_id);
755+
}
756+
events
749757
}
750758

751759
fn ack_monitor_event(&self, source: MonitorEventSource) {

0 commit comments

Comments
 (0)