Skip to content

Commit 91ff7a5

Browse files
committed
f - move error check and soft delete channels
1 parent 2cdcbdf commit 91ff7a5

1 file changed

Lines changed: 189 additions & 57 deletions

File tree

lightning/src/ln/resource_manager.rs

Lines changed: 189 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Default for ResourceManagerConfig {
102102

103103
impl ResourceManagerConfig {
104104
fn validate(&self) -> Result<(), ()> {
105-
if self.general_allocation_pct + self.congestion_allocation_pct >= 100 {
105+
if self.general_allocation_pct as u16 + self.congestion_allocation_pct as u16 >= 100 {
106106
return Err(());
107107
}
108108
if self.resolution_period == Duration::ZERO {
@@ -140,7 +140,7 @@ impl Display for ForwardingOutcome {
140140
}
141141
}
142142

143-
#[derive(Clone, PartialEq, Eq, Debug)]
143+
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
144144
enum BucketAssigned {
145145
General,
146146
Congestion,
@@ -452,6 +452,11 @@ struct Channel {
452452
/// Tracks which channels have misused the congestion bucket and the unix timestamp.
453453
last_congestion_misuse: HashMap<u64, u64>,
454454
protected_bucket: BucketResources,
455+
456+
/// Whether this channel has been closed. We keep channels around until all its
457+
/// [`Self::pending_htlcs`] have been resolved. This ensures that HTLC resolutions that
458+
/// arrive after a force-close can still update reputation and revenue correctly.
459+
closed: bool,
455460
}
456461

457462
impl Channel {
@@ -483,6 +488,7 @@ impl Channel {
483488
bucket_allocations.protected_slots,
484489
bucket_allocations.protected_liquidity,
485490
),
491+
closed: false,
486492
})
487493
}
488494

@@ -541,9 +547,8 @@ impl Channel {
541547

542548
fn pending_htlcs_in_congestion(&self) -> bool {
543549
self.pending_htlcs
544-
.iter()
545-
.find(|(_, pending_htlc)| pending_htlc.bucket == BucketAssigned::Congestion)
546-
.is_some()
550+
.values()
551+
.any(|pending_htlc| pending_htlc.bucket == BucketAssigned::Congestion)
547552
}
548553

549554
fn sufficient_reputation(
@@ -567,6 +572,16 @@ impl Channel {
567572
}
568573
}
569574

575+
fn has_incoming_htlc_references(channels: &HashMap<u64, Channel>, channel_id: u64) -> bool {
576+
channels.iter().any(|channel| {
577+
channel
578+
.1
579+
.pending_htlcs
580+
.iter()
581+
.any(|pending_htlc| pending_htlc.0.incoming_channel_id == channel_id)
582+
})
583+
}
584+
570585
/// An implementation for managing channel resources and informing HTLC forwarding decisions. It
571586
/// implements the core of the mitigation as proposed in <https://github.com/lightning/bolts/pull/1280>.
572587
pub struct DefaultResourceManager {
@@ -622,9 +637,7 @@ impl DefaultResourceManager {
622637
}
623638
}
624639
}
625-
}
626640

627-
impl DefaultResourceManager {
628641
/// Registers a new channel with the resource manager for tracking.
629642
///
630643
/// This should be called when a channel becomes ready for forwarding
@@ -667,7 +680,7 @@ impl DefaultResourceManager {
667680
entry.insert(channel);
668681
Ok(())
669682
},
670-
Entry::Occupied(_) => Ok(()),
683+
Entry::Occupied(_) => Err(()),
671684
}
672685
}
673686

@@ -677,31 +690,19 @@ impl DefaultResourceManager {
677690
pub fn remove_channel(&self, channel_id: u64) -> Result<(), ()> {
678691
let mut channels_lock = self.channels.lock().unwrap();
679692

680-
// Release bucket resources on each incoming channel for its pending HTLCs.
681-
if let Some(removed_channel) = channels_lock.remove(&channel_id) {
682-
for (htlc_ref, pending_htlc) in &removed_channel.pending_htlcs {
683-
debug_assert!(channels_lock.get(&htlc_ref.incoming_channel_id).is_some());
684-
if let Some(incoming_channel) = channels_lock.get_mut(&htlc_ref.incoming_channel_id)
685-
{
686-
let _ = match pending_htlc.bucket {
687-
BucketAssigned::General => incoming_channel
688-
.general_bucket
689-
.remove_htlc(channel_id, pending_htlc.incoming_amount_msat),
690-
BucketAssigned::Congestion => incoming_channel
691-
.congestion_bucket
692-
.remove_htlc(pending_htlc.incoming_amount_msat),
693-
BucketAssigned::Protected => incoming_channel
694-
.protected_bucket
695-
.remove_htlc(pending_htlc.incoming_amount_msat),
696-
};
697-
}
698-
}
699-
}
693+
let incoming_htlcs_pending = has_incoming_htlc_references(&channels_lock, channel_id);
700694

701-
// Clean up pending HTLC entries and channel slots.
702-
for (_, channel) in channels_lock.iter_mut() {
703-
channel.pending_htlcs.retain(|htlc_ref, _| htlc_ref.incoming_channel_id != channel_id);
704-
channel.general_bucket.remove_channel_slots(channel_id);
695+
// If the channel has pending HTLCs, it is soft-deleted and will be fully removed once
696+
// all its pending HTLCs have been resolved.
697+
let channel = channels_lock.get_mut(&channel_id).ok_or(())?;
698+
if channel.pending_htlcs.is_empty() && !incoming_htlcs_pending {
699+
// If the channel had no pending HTLCs, we can remove it.
700+
channels_lock.remove(&channel_id);
701+
for (_, channel) in channels_lock.iter_mut() {
702+
channel.general_bucket.remove_channel_slots(channel_id);
703+
}
704+
} else {
705+
channel.closed = true;
705706
}
706707
Ok(())
707708
}
@@ -728,7 +729,8 @@ impl DefaultResourceManager {
728729
let mut channels_lock = self.channels.lock().unwrap();
729730

730731
let htlc_ref = HtlcRef { incoming_channel_id, htlc_id };
731-
let outgoing_channel = channels_lock.get_mut(&outgoing_channel_id).ok_or(())?;
732+
let outgoing_channel =
733+
channels_lock.get_mut(&outgoing_channel_id).filter(|c| !c.closed).ok_or(())?;
732734

733735
if outgoing_channel.pending_htlcs.get(&htlc_ref).is_some() {
734736
return Err(());
@@ -741,7 +743,8 @@ impl DefaultResourceManager {
741743
let in_flight_htlc_risk = self.htlc_in_flight_risk(fee, incoming_cltv_expiry, height_added);
742744
let pending_htlcs_in_congestion = outgoing_channel.pending_htlcs_in_congestion();
743745

744-
let incoming_channel = channels_lock.get_mut(&incoming_channel_id).ok_or(())?;
746+
let incoming_channel =
747+
channels_lock.get_mut(&incoming_channel_id).filter(|c| !c.closed).ok_or(())?;
745748

746749
let (accountable, bucket_assigned) = if !incoming_accountable {
747750
if incoming_channel.general_available(
@@ -837,41 +840,42 @@ impl DefaultResourceManager {
837840
resolved_at: u64,
838841
) -> Result<(), ()> {
839842
let mut channels_lock = self.channels.lock().unwrap();
840-
if channels_lock.get(&incoming_channel_id).is_none()
841-
|| channels_lock.get(&outgoing_channel_id).is_none()
842-
{
843-
return Err(());
844-
}
845-
846843
let outgoing_channel = channels_lock.get_mut(&outgoing_channel_id).ok_or(())?;
847844

848845
let htlc_ref = HtlcRef { incoming_channel_id, htlc_id };
849846
let pending_htlc = outgoing_channel.pending_htlcs.remove(&htlc_ref).ok_or(())?;
850847

851-
if resolved_at < pending_htlc.added_at_unix_seconds {
852-
return Err(());
848+
let outgoing_closed = outgoing_channel.closed && outgoing_channel.pending_htlcs.is_empty();
849+
if resolved_at >= pending_htlc.added_at_unix_seconds {
850+
let resolution_time =
851+
Duration::from_secs(resolved_at - pending_htlc.added_at_unix_seconds);
852+
let effective_fee = self.effective_fees(
853+
pending_htlc.fee,
854+
resolution_time,
855+
pending_htlc.outgoing_accountable,
856+
settled,
857+
);
858+
// Note that the decaying averages for reputation and revenue clamp `resolved_at` to
859+
// `last_updated_unix_secs` if it falls behind. This could happen because
860+
// `last_updated_unix_secs` is frequently mutated. We accept the clamping rather
861+
// than erroring, as rejecting out-of-order timestamps would leave resources stuck.
862+
outgoing_channel.outgoing_reputation.add_value(effective_fee, resolved_at);
853863
}
854-
let resolution_time = Duration::from_secs(resolved_at - pending_htlc.added_at_unix_seconds);
855-
let effective_fee = self.effective_fees(
856-
pending_htlc.fee,
857-
resolution_time,
858-
pending_htlc.outgoing_accountable,
859-
settled,
860-
);
861-
// Note that the decaying averages for reputation and revenue clamp `resolved_at` to
862-
// `last_updated_unix_secs` if it falls behind. This could happen because
863-
// `last_updated_unix_secs` is frequently mutated. We accept the clamping rather
864-
// than erroring, as rejecting out-of-order timestamps would leave resources stuck.
865-
outgoing_channel.outgoing_reputation.add_value(effective_fee, resolved_at);
866864

867865
let incoming_channel = channels_lock.get_mut(&incoming_channel_id).ok_or(())?;
866+
let incoming_closed = incoming_channel.closed && incoming_channel.pending_htlcs.is_empty();
868867
match pending_htlc.bucket {
869868
BucketAssigned::General => incoming_channel
870869
.general_bucket
871870
.remove_htlc(outgoing_channel_id, pending_htlc.incoming_amount_msat)?,
872871
BucketAssigned::Congestion => {
873872
// Mark that congestion bucket was misused if it took more than the valid
874-
// resolution period
873+
// resolution period. Here we are bit lenient if resolve_at < added_at.
874+
// This means there is a bug as resolution can't be before added time but
875+
// no reason to mark congestion as misused in this case.
876+
let resolution_time = Duration::from_secs(
877+
resolved_at.saturating_sub(pending_htlc.added_at_unix_seconds),
878+
);
875879
if resolution_time > self.config.resolution_period {
876880
incoming_channel.misused_congestion(outgoing_channel_id, resolved_at);
877881
}
@@ -883,11 +887,33 @@ impl DefaultResourceManager {
883887
},
884888
}
885889

886-
if settled {
890+
if settled && resolved_at >= pending_htlc.added_at_unix_seconds {
887891
let fee: i64 = i64::try_from(pending_htlc.fee).unwrap_or(i64::MAX);
888892
incoming_channel.incoming_revenue.add_value(fee, resolved_at);
889893
}
890894

895+
let mut remove_channel = |closed: bool, channel_id: u64| {
896+
// If the channel had been marked as closed and it does not have any other pending
897+
// HTLCs, it can be fully removed now.
898+
if closed {
899+
if !has_incoming_htlc_references(&channels_lock, channel_id) {
900+
channels_lock.remove(&channel_id);
901+
for (_, channel) in channels_lock.iter_mut() {
902+
channel.general_bucket.remove_channel_slots(channel_id);
903+
}
904+
}
905+
}
906+
};
907+
remove_channel(incoming_closed, incoming_channel_id);
908+
remove_channel(outgoing_closed, outgoing_channel_id);
909+
910+
// This is a bug as resolution time can't be before added time. This prevents us from
911+
// properly updating the reputation and revenue as those need accurate timestamps but
912+
// we error here rather than earlier to optimistically release bucket resources.
913+
if resolved_at < pending_htlc.added_at_unix_seconds {
914+
return Err(());
915+
}
916+
891917
Ok(())
892918
}
893919
}
@@ -1429,7 +1455,7 @@ mod tests {
14291455
let channels = rm.channels.lock().unwrap();
14301456
let htlc_ref = HtlcRef { incoming_channel_id, htlc_id };
14311457
let htlc = channels.get(&outgoing_channel_id).unwrap().pending_htlcs.get(&htlc_ref);
1432-
htlc.map(|htlc| htlc.bucket.clone())
1458+
htlc.map(|htlc| htlc.bucket)
14331459
}
14341460

14351461
fn count_pending_htlcs(rm: &DefaultResourceManager, outgoing_scid: u64) -> usize {
@@ -2294,6 +2320,112 @@ mod tests {
22942320
assert!(get_htlc_bucket(&rm, INCOMING_SCID, htlc_id, OUTGOING_SCID_2).is_none());
22952321
}
22962322

2323+
#[test]
2324+
fn test_remove_channel_no_pending_htlcs() {
2325+
let rm = create_test_resource_manager_with_channels();
2326+
let channels = rm.channels.lock().unwrap();
2327+
assert!(channels.get(&OUTGOING_SCID).is_some());
2328+
drop(channels);
2329+
2330+
rm.remove_channel(OUTGOING_SCID).unwrap();
2331+
2332+
let channels = rm.channels.lock().unwrap();
2333+
assert!(channels.get(&OUTGOING_SCID).is_none());
2334+
}
2335+
2336+
#[test]
2337+
fn test_resolve_htlc_after_channel_close() {
2338+
let entropy_source = TestKeysInterface::new(&[0; 32], Network::Testnet);
2339+
let rm = create_test_resource_manager_with_channel_pairs(2);
2340+
2341+
let added_at = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
2342+
2343+
// HTLC 1: INCOMING_SCID (100) -> OUTGOING_SCID (200)
2344+
assert_eq!(
2345+
add_test_htlc(&rm, false, 1, Some(added_at), &entropy_source).unwrap(),
2346+
ForwardingOutcome::Forward(false),
2347+
);
2348+
2349+
// HTLC 2: INCOMING_SCID_2 (101) -> OUTGOING_SCID_2 (201)
2350+
assert_eq!(
2351+
rm.add_htlc(
2352+
INCOMING_SCID_2,
2353+
HTLC_AMOUNT + FEE_AMOUNT,
2354+
CLTV_EXPIRY,
2355+
OUTGOING_SCID_2,
2356+
HTLC_AMOUNT,
2357+
false,
2358+
1,
2359+
CURRENT_HEIGHT,
2360+
added_at,
2361+
&entropy_source,
2362+
)
2363+
.unwrap(),
2364+
ForwardingOutcome::Forward(false),
2365+
);
2366+
2367+
// Close the outgoing channel for HTLC 1. It has a pending HTLC so it is soft-deleted.
2368+
rm.remove_channel(OUTGOING_SCID).unwrap();
2369+
{
2370+
let channels = rm.channels.lock().unwrap();
2371+
let outgoing = channels.get(&OUTGOING_SCID).unwrap();
2372+
assert!(outgoing.closed);
2373+
assert_eq!(outgoing.pending_htlcs.len(), 1);
2374+
}
2375+
2376+
// Close the incoming channel for HTLC 2. It is referenced as incoming by an HTLC on
2377+
// OUTGOING_SCID_2, so it is soft-deleted.
2378+
rm.remove_channel(INCOMING_SCID_2).unwrap();
2379+
{
2380+
let channels = rm.channels.lock().unwrap();
2381+
let incoming = channels.get(&INCOMING_SCID_2).unwrap();
2382+
assert!(incoming.closed);
2383+
}
2384+
2385+
// New HTLCs should be rejected on closed channels.
2386+
assert!(add_test_htlc(&rm, false, 2, None, &entropy_source).is_err());
2387+
assert!(rm
2388+
.add_htlc(
2389+
INCOMING_SCID_2,
2390+
HTLC_AMOUNT + FEE_AMOUNT,
2391+
CLTV_EXPIRY,
2392+
OUTGOING_SCID_2,
2393+
HTLC_AMOUNT,
2394+
false,
2395+
2,
2396+
CURRENT_HEIGHT,
2397+
added_at,
2398+
&entropy_source,
2399+
)
2400+
.is_err());
2401+
2402+
let resolved_at = added_at + 10;
2403+
2404+
// Resolve HTLC 1. Outgoing channel (200) is closed and now has no pending HTLCs,
2405+
// so it is fully removed. Incoming channel (100) gets revenue updated.
2406+
rm.resolve_htlc(INCOMING_SCID, 1, OUTGOING_SCID, true, resolved_at).unwrap();
2407+
{
2408+
let channels = rm.channels.lock().unwrap();
2409+
assert!(channels.get(&OUTGOING_SCID).is_none());
2410+
let incoming = channels.get(&INCOMING_SCID).unwrap();
2411+
assert_eq!(
2412+
incoming.incoming_revenue.aggregated_decaying_average.value,
2413+
FEE_AMOUNT as i64,
2414+
);
2415+
}
2416+
2417+
// Resolve HTLC 2. Incoming channel (101) is closed and has no pending HTLCs or
2418+
// incoming references remaining, so it is fully removed. Outgoing channel (201)
2419+
// gets reputation updated.
2420+
rm.resolve_htlc(INCOMING_SCID_2, 1, OUTGOING_SCID_2, true, resolved_at).unwrap();
2421+
{
2422+
let channels = rm.channels.lock().unwrap();
2423+
assert!(channels.get(&INCOMING_SCID_2).is_none());
2424+
let outgoing = channels.get(&OUTGOING_SCID_2).unwrap();
2425+
assert_eq!(outgoing.outgoing_reputation.value, FEE_AMOUNT as i64);
2426+
}
2427+
}
2428+
22972429
#[test]
22982430
fn test_decaying_average_bounds() {
22992431
for (start, bound) in [(1000, i64::MAX), (-1000, i64::MIN)] {

0 commit comments

Comments
 (0)