Skip to content

Commit d049f01

Browse files
Persistent monitor events for HTLC forward claims
Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and of the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is a lot of complexity in reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass the resolutions back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart, instead of examining the set of HTLCs in monitors, we can simply replay all the pending MonitorEvents. Building on the work of prior commits, here we stop immediately acking monitor events for HTLC claims for forwarded HTLCs, and instead ack the monitor event when the forward is fully resolved and we no longer want the monitor event to be re-provided to us on startup. If the inbound edge channel is on-chain, we'll ack the event when the inbound edge preimage monitor update completes and the user processes a PaymentForwarded event. If the inbound edge is open, we'll ack the event when the inbound HTLC is fully removed via revoke_and_ack (this ensures we'll remember to resolve the HTLC off-chain even if we lose the holding cell).
1 parent 8af4e22 commit d049f01

7 files changed

Lines changed: 605 additions & 58 deletions

File tree

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 444 additions & 16 deletions
Large diffs are not rendered by default.

lightning/src/ln/channelmanager.rs

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3712,14 +3712,23 @@ impl TrustedChannelFeatures {
37123712
struct ClaimCompletionActionParams {
37133713
definitely_duplicate: bool,
37143714
inbound_htlc_value_msat: Option<u64>,
3715+
inbound_edge_closed: bool,
37153716
}
37163717

37173718
impl ClaimCompletionActionParams {
37183719
fn new_claim(inbound_htlc_value_msat: u64) -> Self {
3719-
Self { definitely_duplicate: false, inbound_htlc_value_msat: Some(inbound_htlc_value_msat) }
3720+
Self {
3721+
definitely_duplicate: false,
3722+
inbound_htlc_value_msat: Some(inbound_htlc_value_msat),
3723+
inbound_edge_closed: false,
3724+
}
37203725
}
37213726
fn duplicate_claim() -> Self {
3722-
Self { definitely_duplicate: true, inbound_htlc_value_msat: None }
3727+
Self {
3728+
definitely_duplicate: true,
3729+
inbound_htlc_value_msat: None,
3730+
inbound_edge_closed: false,
3731+
}
37233732
}
37243733
}
37253734

@@ -9815,16 +9824,56 @@ impl<
98159824
monitor_event_id
98169825
.map(|event_id| MonitorEventSource { event_id, channel_id: next_channel_id }),
98179826
|claim_completion_action_params| {
9818-
let ClaimCompletionActionParams { definitely_duplicate, inbound_htlc_value_msat } =
9819-
claim_completion_action_params;
9827+
let ClaimCompletionActionParams {
9828+
definitely_duplicate,
9829+
inbound_htlc_value_msat,
9830+
inbound_edge_closed,
9831+
} = claim_completion_action_params;
98209832
let chan_to_release = EventUnblockedChannel {
98219833
counterparty_node_id: next_channel_counterparty_node_id,
98229834
funding_txo: next_channel_outpoint,
98239835
channel_id: next_channel_id,
98249836
blocking_action: completed_blocker,
98259837
};
98269838

9827-
if definitely_duplicate && startup_replay {
9839+
if self.persistent_monitor_events {
9840+
let monitor_event_source = monitor_event_id.map(|event_id| {
9841+
MonitorEventSource { event_id, channel_id: next_channel_id }
9842+
});
9843+
// If persistent_monitor_events is enabled, then we'll get a MonitorEvent for this HTLC
9844+
// claim re-provided to us until we explicitly ack it.
9845+
// * If the inbound edge is closed, then we can ack it when we know the preimage is
9846+
// durably persisted there + the user has processed a `PaymentForwarded` event
9847+
// * If the inbound edge is open, then we'll ack the monitor event when the HTLC has been
9848+
// irrevocably removed via revoke_and_ack. This prevents forgetting to claim the HTLC
9849+
// backwards if we lose the off-chain HTLC from the holding cell after a restart.
9850+
if definitely_duplicate {
9851+
if inbound_edge_closed {
9852+
if let Some(id) = monitor_event_source {
9853+
self.chain_monitor.ack_monitor_event(id);
9854+
}
9855+
}
9856+
(None, None)
9857+
} else if let Some(event) =
9858+
make_payment_forwarded_event(inbound_htlc_value_msat)
9859+
{
9860+
let preimage_update_action =
9861+
MonitorUpdateCompletionAction::EmitForwardEvent {
9862+
event,
9863+
post_event_ackable_monitor_event: inbound_edge_closed
9864+
.then_some(monitor_event_source)
9865+
.flatten(),
9866+
};
9867+
(Some(preimage_update_action), None)
9868+
} else if inbound_edge_closed {
9869+
let preimage_update_action = monitor_event_source.map(|src| {
9870+
MonitorUpdateCompletionAction::AckMonitorEvents { event_ids: vec![src] }
9871+
});
9872+
(preimage_update_action, None)
9873+
} else {
9874+
(None, None)
9875+
}
9876+
} else if definitely_duplicate && startup_replay {
98289877
// On startup we may get redundant claims which are related to
98299878
// monitor updates still in flight. In that case, we shouldn't
98309879
// immediately free, but instead let that monitor update complete
@@ -10157,6 +10206,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1015710206
let (action_opt, raa_blocker_opt) = completion_action(ClaimCompletionActionParams {
1015810207
definitely_duplicate: false,
1015910208
inbound_htlc_value_msat: None,
10209+
inbound_edge_closed: true,
1016010210
});
1016110211

1016210212
if let Some(raa_blocker) = raa_blocker_opt {
@@ -12860,16 +12910,25 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1286012910
chan.update_fulfill_htlc(&msg),
1286112911
chan_entry
1286212912
);
12863-
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
12864-
for prev_hop in res.0.previous_hop_data() {
12865-
log_trace!(logger,
12913+
// If persistent_monitor_events is enabled, we don't need to block preimage-removing
12914+
// monitor updates because we'll get the preimage from monitor events (that are
12915+
// guaranteed to be re-provided until they are explicitly acked) rather than from
12916+
// polling the monitor's internal state.
12917+
if !self.persistent_monitor_events {
12918+
let logger =
12919+
WithChannelContext::from(&self.logger, &chan.context, None);
12920+
for prev_hop in res.0.previous_hop_data() {
12921+
log_trace!(logger,
1286612922
"Holding the next revoke_and_ack until the preimage is durably persisted in the inbound edge's ChannelMonitor",
1286712923
);
12868-
peer_state
12869-
.actions_blocking_raa_monitor_updates
12870-
.entry(msg.channel_id)
12871-
.or_insert_with(Vec::new)
12872-
.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(prev_hop));
12924+
peer_state
12925+
.actions_blocking_raa_monitor_updates
12926+
.entry(msg.channel_id)
12927+
.or_insert_with(Vec::new)
12928+
.push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(
12929+
prev_hop,
12930+
));
12931+
}
1287312932
}
1287412933

1287512934
// Note that we do not need to push an `actions_blocking_raa_monitor_updates`
@@ -13895,29 +13954,22 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
1389513954
.channel_by_id
1389613955
.contains_key(&channel_id)
1389713956
});
13898-
let we_are_sender =
13899-
matches!(htlc_update.source, HTLCSource::OutboundRoute { .. });
13900-
if from_onchain | we_are_sender {
13901-
// Claim the funds from the previous hop, if there is one. In the future we can
13902-
// store attribution data in the `ChannelMonitor` and provide it here.
13903-
self.claim_funds_internal(
13904-
htlc_update.source,
13905-
preimage,
13906-
htlc_update.htlc_value_msat,
13907-
None,
13908-
from_onchain,
13909-
counterparty_node_id,
13910-
funding_outpoint,
13911-
channel_id,
13912-
htlc_update.user_channel_id,
13913-
None,
13914-
None,
13915-
Some(event_id),
13916-
);
13917-
}
13918-
if !we_are_sender {
13919-
self.chain_monitor.ack_monitor_event(monitor_event_source);
13920-
}
13957+
// Claim the funds from the previous hop, if there is one. In the future we can
13958+
// store attribution data in the `ChannelMonitor` and provide it here.
13959+
self.claim_funds_internal(
13960+
htlc_update.source,
13961+
preimage,
13962+
htlc_update.htlc_value_msat,
13963+
None,
13964+
from_onchain,
13965+
counterparty_node_id,
13966+
funding_outpoint,
13967+
channel_id,
13968+
htlc_update.user_channel_id,
13969+
None,
13970+
None,
13971+
Some(event_id),
13972+
);
1392113973
} else {
1392213974
log_trace!(logger, "Failing HTLC from our monitor");
1392313975
let failure_reason = LocalHTLCFailureReason::OnChainTimeout;
@@ -20859,6 +20911,12 @@ impl<
2085920911
downstream_user_channel_id,
2086020912
) in pending_claims_to_replay
2086120913
{
20914+
// If persistent_monitor_events is enabled, we don't need to explicitly reclaim HTLCs on
20915+
// startup because we can just wait for the relevant MonitorEvents to be re-provided to us
20916+
// during runtime.
20917+
if channel_manager.persistent_monitor_events {
20918+
continue;
20919+
}
2086220920
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
2086320921
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
2086420922
// channel is closed we just assume that it probably came from an on-chain claim.

lightning/src/ln/functional_test_utils.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3169,7 +3169,7 @@ pub fn expect_payment_forwarded<CM: AChannelManager, H: NodeHolder<CM = CM>>(
31693169
macro_rules! expect_payment_forwarded {
31703170
($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
31713171
let mut events = $node.node.get_and_clear_pending_events();
3172-
assert_eq!(events.len(), 1);
3172+
assert_eq!(events.len(), 1, "{events:?}");
31733173
$crate::ln::functional_test_utils::expect_payment_forwarded(
31743174
events.pop().unwrap(),
31753175
&$node,
@@ -5676,7 +5676,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
56765676
node_a.node.handle_revoke_and_ack(node_b_id, &bs_revoke_and_ack);
56775677
check_added_monitors(
56785678
&node_a,
5679-
if pending_responding_commitment_signed_dup_monitor.1 { 0 } else { 1 },
5679+
if pending_responding_commitment_signed_dup_monitor.1
5680+
&& !node_a.node.test_persistent_monitor_events_enabled()
5681+
{
5682+
0
5683+
} else {
5684+
1
5685+
},
56805686
);
56815687
if !allow_post_commitment_dance_msgs.1 {
56825688
assert!(node_a.node.get_and_clear_pending_msg_events().is_empty());

lightning/src/ln/functional_tests.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7916,7 +7916,15 @@ fn do_test_onchain_htlc_settlement_after_close(
79167916
_ => panic!("Unexpected event"),
79177917
};
79187918
nodes[1].node.handle_revoke_and_ack(node_c_id, &carol_revocation);
7919-
check_added_monitors(&nodes[1], 1);
7919+
if nodes[1].node.test_persistent_monitor_events_enabled() {
7920+
if broadcast_alice && !go_onchain_before_fulfill {
7921+
check_added_monitors(&nodes[1], 1);
7922+
} else {
7923+
check_added_monitors(&nodes[1], 2);
7924+
}
7925+
} else {
7926+
check_added_monitors(&nodes[1], 1);
7927+
}
79207928

79217929
// If this test requires the force-closed channel to not be on-chain until after the fulfill,
79227930
// here's where we put said channel's commitment tx on-chain.
@@ -7950,6 +7958,13 @@ fn do_test_onchain_htlc_settlement_after_close(
79507958
check_spends!(bob_txn[0], chan_ab.3);
79517959
}
79527960
}
7961+
if nodes[1].node.test_persistent_monitor_events_enabled() {
7962+
if !broadcast_alice || go_onchain_before_fulfill {
7963+
// In some cases we'll replay the claim via a MonitorEvent and be unable to detect that it's
7964+
// a duplicate since the inbound edge is on-chain.
7965+
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], fee, went_onchain, false);
7966+
}
7967+
}
79537968

79547969
// Step (6):
79557970
// Finally, check that Bob broadcasted a preimage-claiming transaction for the HTLC output on the

lightning/src/ln/payment_tests.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -962,8 +962,33 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
962962
let fulfill_msg = htlc_fulfill.update_fulfill_htlcs.remove(0);
963963
nodes[1].node.handle_update_fulfill_htlc(node_c_id, fulfill_msg);
964964
check_added_monitors(&nodes[1], 1);
965-
do_commitment_signed_dance(&nodes[1], &nodes[2], &htlc_fulfill.commitment_signed, false, false);
966-
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
965+
{
966+
// Drive the commitment signed dance manually so we can account for the extra monitor
967+
// update when persistent monitor events are enabled.
968+
let persistent = nodes[1].node.test_persistent_monitor_events_enabled();
969+
nodes[1].node.handle_commitment_signed_batch_test(node_c_id, &htlc_fulfill.commitment_signed);
970+
check_added_monitors(&nodes[1], 1);
971+
let (extra_msg, cs_raa, htlcs) =
972+
do_main_commitment_signed_dance(&nodes[1], &nodes[2], false);
973+
assert!(htlcs.is_empty());
974+
assert!(extra_msg.is_none());
975+
// nodes[1] handles nodes[2]'s RAA. When persistent monitor events are enabled, this
976+
// triggers the re-provided outbound monitor event, generating an extra preimage update
977+
// on the (closed) inbound channel.
978+
nodes[1].node.handle_revoke_and_ack(node_c_id, &cs_raa);
979+
check_added_monitors(&nodes[1], if persistent { 2 } else { 1 });
980+
}
981+
if nodes[1].node.test_persistent_monitor_events_enabled() {
982+
// The re-provided monitor event generates a duplicate PaymentForwarded against the
983+
// closed inbound channel.
984+
let events = nodes[1].node.get_and_clear_pending_events();
985+
assert_eq!(events.len(), 2);
986+
for event in events {
987+
assert!(matches!(event, Event::PaymentForwarded { .. }));
988+
}
989+
} else {
990+
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false);
991+
}
967992

968993
if confirm_before_reload {
969994
let best_block = nodes[0].blocks.lock().unwrap().last().unwrap().clone();

lightning/src/ln/reload_tests.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,6 +1972,13 @@ fn test_reload_node_with_preimage_in_monitor_claims_htlc() {
19721972
Some(true)
19731973
);
19741974

1975+
if nodes[1].node.test_persistent_monitor_events_enabled() {
1976+
// Polling messages causes us to re-release the unacked HTLC claim monitor event, which
1977+
// regenerates a preimage monitor update and forward event below.
1978+
let msgs = nodes[1].node.get_and_clear_pending_msg_events();
1979+
assert!(msgs.is_empty());
1980+
}
1981+
19751982
// When the claim is reconstructed during reload, a PaymentForwarded event is generated.
19761983
// Fetching events triggers the pending monitor update (adding preimage) to be applied.
19771984
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);

lightning/src/util/test_utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ pub struct TestChainMonitor<'a> {
523523
pub pause_flush: AtomicBool,
524524
/// Buffer of the last 20 monitor updates, most recent first.
525525
pub recent_monitor_updates: Mutex<Vec<(ChannelId, ChannelMonitorUpdate)>>,
526+
/// When set to `true`, `release_pending_monitor_events` sorts events by `ChannelId` to
527+
/// ensure deterministic processing order regardless of HashMap iteration order.
528+
pub deterministic_mon_events_order: AtomicBool,
526529
}
527530
impl<'a> TestChainMonitor<'a> {
528531
pub fn new(
@@ -584,6 +587,7 @@ impl<'a> TestChainMonitor<'a> {
584587
write_blocker: Mutex::new(None),
585588
pause_flush: AtomicBool::new(false),
586589
recent_monitor_updates: Mutex::new(Vec::new()),
590+
deterministic_mon_events_order: AtomicBool::new(false),
587591
}
588592
}
589593

@@ -745,7 +749,11 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
745749
let count = self.chain_monitor.pending_operation_count();
746750
self.chain_monitor.flush(count, &self.logger);
747751
}
748-
return self.chain_monitor.release_pending_monitor_events();
752+
let mut events = self.chain_monitor.release_pending_monitor_events();
753+
if self.deterministic_mon_events_order.load(Ordering::Acquire) {
754+
events.sort_by_key(|(_, channel_id, _, _)| *channel_id);
755+
}
756+
events
749757
}
750758

751759
fn ack_monitor_event(&self, source: MonitorEventSource) {

0 commit comments

Comments
 (0)