Skip to content

Commit de4120d

Browse files
committed
f - move error check and soft delete channels
1 parent 6b18f56 commit de4120d

1 file changed

Lines changed: 185 additions & 50 deletions

File tree

lightning/src/ln/resource_manager.rs

Lines changed: 185 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Default for ResourceManagerConfig {
102102

103103
impl ResourceManagerConfig {
104104
fn validate(&self) -> Result<(), ()> {
105-
if self.general_allocation_pct + self.congestion_allocation_pct >= 100 {
105+
if self.general_allocation_pct as u16 + self.congestion_allocation_pct as u16 >= 100 {
106106
return Err(());
107107
}
108108
if self.resolution_period == Duration::ZERO {
@@ -456,6 +456,11 @@ struct Channel {
456456
/// Tracks which channels have misused the congestion bucket and the unix timestamp.
457457
last_congestion_misuse: HashMap<u64, u64>,
458458
protected_bucket: BucketResources,
459+
460+
/// Whether this channel has been closed. We keep channels around until all its
461+
/// [`Self::pending_htlcs`] have been resolved. This ensures that HTLC resolutions that
462+
/// arrive after a force-close can still update reputation and revenue correctly.
463+
closed: bool,
459464
}
460465

461466
impl Channel {
@@ -487,6 +492,7 @@ impl Channel {
487492
bucket_allocations.protected_slots,
488493
bucket_allocations.protected_liquidity,
489494
),
495+
closed: false,
490496
})
491497
}
492498

@@ -575,6 +581,16 @@ impl Channel {
575581
}
576582
}
577583

584+
fn has_incoming_htlc_references(channels: &HashMap<u64, Channel>, channel_id: u64) -> bool {
585+
channels.iter().any(|channel| {
586+
channel
587+
.1
588+
.pending_htlcs
589+
.iter()
590+
.any(|pending_htlc| pending_htlc.0.incoming_channel_id == channel_id)
591+
})
592+
}
593+
578594
/// An implementation for managing channel resources and informing HTLC forwarding decisions. It
579595
/// implements the core of the mitigation as proposed in <https://github.com/lightning/bolts/pull/1280>.
580596
pub struct DefaultResourceManager {
@@ -675,7 +691,7 @@ impl DefaultResourceManager {
675691
entry.insert(channel);
676692
Ok(())
677693
},
678-
Entry::Occupied(_) => Ok(()),
694+
Entry::Occupied(_) => Err(()),
679695
}
680696
}
681697

@@ -685,31 +701,19 @@ impl DefaultResourceManager {
685701
pub fn remove_channel(&self, channel_id: u64) -> Result<(), ()> {
686702
let mut channels_lock = self.channels.lock().unwrap();
687703

688-
// Release bucket resources on each incoming channel for its pending HTLCs.
689-
if let Some(removed_channel) = channels_lock.remove(&channel_id) {
690-
for (htlc_ref, pending_htlc) in &removed_channel.pending_htlcs {
691-
debug_assert!(channels_lock.get(&htlc_ref.incoming_channel_id).is_some());
692-
if let Some(incoming_channel) = channels_lock.get_mut(&htlc_ref.incoming_channel_id)
693-
{
694-
let _ = match pending_htlc.bucket {
695-
BucketAssigned::General => incoming_channel
696-
.general_bucket
697-
.remove_htlc(channel_id, pending_htlc.incoming_amount_msat),
698-
BucketAssigned::Congestion => incoming_channel
699-
.congestion_bucket
700-
.remove_htlc(pending_htlc.incoming_amount_msat),
701-
BucketAssigned::Protected => incoming_channel
702-
.protected_bucket
703-
.remove_htlc(pending_htlc.incoming_amount_msat),
704-
};
705-
}
706-
}
707-
}
704+
let incoming_htlcs_pending = has_incoming_htlc_references(&channels_lock, channel_id);
708705

709-
// Clean up pending HTLC entries and channel slots.
710-
for (_, channel) in channels_lock.iter_mut() {
711-
channel.pending_htlcs.retain(|htlc_ref, _| htlc_ref.incoming_channel_id != channel_id);
712-
channel.general_bucket.remove_channel_slots(channel_id);
706+
// If the channel has pending HTLCs, it is soft-deleted and will be fully removed once
707+
// all its pending HTLCs have been resolved.
708+
let channel = channels_lock.get_mut(&channel_id).ok_or(())?;
709+
if channel.pending_htlcs.is_empty() && !incoming_htlcs_pending {
710+
// If the channel had no pending HTLCs, we can remove it.
711+
channels_lock.remove(&channel_id);
712+
for (_, channel) in channels_lock.iter_mut() {
713+
channel.general_bucket.remove_channel_slots(channel_id);
714+
}
715+
} else {
716+
channel.closed = true;
713717
}
714718
Ok(())
715719
}
@@ -736,7 +740,8 @@ impl DefaultResourceManager {
736740
let mut channels_lock = self.channels.lock().unwrap();
737741

738742
let htlc_ref = HtlcRef { incoming_channel_id, htlc_id };
739-
let outgoing_channel = channels_lock.get_mut(&outgoing_channel_id).ok_or(())?;
743+
let outgoing_channel =
744+
channels_lock.get_mut(&outgoing_channel_id).filter(|c| !c.closed).ok_or(())?;
740745

741746
if outgoing_channel.pending_htlcs.get(&htlc_ref).is_some() {
742747
return Err(());
@@ -749,7 +754,8 @@ impl DefaultResourceManager {
749754
let in_flight_htlc_risk = self.htlc_in_flight_risk(fee, incoming_cltv_expiry, height_added);
750755
let pending_htlcs_in_congestion = outgoing_channel.pending_htlcs_in_congestion();
751756

752-
let incoming_channel = channels_lock.get_mut(&incoming_channel_id).ok_or(())?;
757+
let incoming_channel =
758+
channels_lock.get_mut(&incoming_channel_id).filter(|c| !c.closed).ok_or(())?;
753759

754760
let (accountable, bucket_assigned) = if !incoming_accountable {
755761
if incoming_channel.general_available(
@@ -845,41 +851,42 @@ impl DefaultResourceManager {
845851
resolved_at: u64,
846852
) -> Result<(), ()> {
847853
let mut channels_lock = self.channels.lock().unwrap();
848-
if channels_lock.get(&incoming_channel_id).is_none()
849-
|| channels_lock.get(&outgoing_channel_id).is_none()
850-
{
851-
return Err(());
852-
}
853-
854854
let outgoing_channel = channels_lock.get_mut(&outgoing_channel_id).ok_or(())?;
855855

856856
let htlc_ref = HtlcRef { incoming_channel_id, htlc_id };
857857
let pending_htlc = outgoing_channel.pending_htlcs.remove(&htlc_ref).ok_or(())?;
858858

859-
if resolved_at < pending_htlc.added_at_unix_seconds {
860-
return Err(());
859+
let outgoing_closed = outgoing_channel.closed && outgoing_channel.pending_htlcs.is_empty();
860+
if resolved_at >= pending_htlc.added_at_unix_seconds {
861+
let resolution_time =
862+
Duration::from_secs(resolved_at - pending_htlc.added_at_unix_seconds);
863+
let effective_fee = self.effective_fees(
864+
pending_htlc.fee,
865+
resolution_time,
866+
pending_htlc.outgoing_accountable,
867+
settled,
868+
);
869+
// Note that the decaying averages for reputation and revenue clamp `resolved_at` to
870+
// `last_updated_unix_secs` if it falls behind. This could happen because
871+
// `last_updated_unix_secs` is frequently mutated. We accept the clamping rather
872+
// than erroring, as rejecting out-of-order timestamps would leave resources stuck.
873+
outgoing_channel.outgoing_reputation.add_value(effective_fee, resolved_at);
861874
}
862-
let resolution_time = Duration::from_secs(resolved_at - pending_htlc.added_at_unix_seconds);
863-
let effective_fee = self.effective_fees(
864-
pending_htlc.fee,
865-
resolution_time,
866-
pending_htlc.outgoing_accountable,
867-
settled,
868-
);
869-
// Note that the decaying averages for reputation and revenue clamp `resolved_at` to
870-
// `last_updated_unix_secs` if it falls behind. This could happen because
871-
// `last_updated_unix_secs` is frequently mutated. We accept the clamping rather
872-
// than erroring, as rejecting out-of-order timestamps would leave resources stuck.
873-
outgoing_channel.outgoing_reputation.add_value(effective_fee, resolved_at);
874875

875876
let incoming_channel = channels_lock.get_mut(&incoming_channel_id).ok_or(())?;
877+
let incoming_closed = incoming_channel.closed && incoming_channel.pending_htlcs.is_empty();
876878
match pending_htlc.bucket {
877879
BucketAssigned::General => incoming_channel
878880
.general_bucket
879881
.remove_htlc(outgoing_channel_id, pending_htlc.incoming_amount_msat)?,
880882
BucketAssigned::Congestion => {
881883
// Mark that congestion bucket was misused if it took more than the valid
882-
// resolution period
884+
// resolution period. Here we are bit lenient if resolve_at < added_at.
885+
// This means there is a bug as resolution can't be before added time but
886+
// no reason to mark congestion as misused in this case.
887+
let resolution_time = Duration::from_secs(
888+
resolved_at.saturating_sub(pending_htlc.added_at_unix_seconds),
889+
);
883890
if resolution_time > self.config.resolution_period {
884891
incoming_channel.misused_congestion(outgoing_channel_id, resolved_at);
885892
}
@@ -891,11 +898,33 @@ impl DefaultResourceManager {
891898
},
892899
}
893900

894-
if settled {
901+
if settled && resolved_at >= pending_htlc.added_at_unix_seconds {
895902
let fee: i64 = i64::try_from(pending_htlc.fee).unwrap_or(i64::MAX);
896903
incoming_channel.incoming_revenue.add_value(fee, resolved_at);
897904
}
898905

906+
let mut remove_channel = |closed: bool, channel_id: u64| {
907+
// If the channel had been marked as closed and it does not have any other pending
908+
// HTLCs, it can be fully removed now.
909+
if closed {
910+
if !has_incoming_htlc_references(&channels_lock, channel_id) {
911+
channels_lock.remove(&channel_id);
912+
for (_, channel) in channels_lock.iter_mut() {
913+
channel.general_bucket.remove_channel_slots(channel_id);
914+
}
915+
}
916+
}
917+
};
918+
remove_channel(incoming_closed, incoming_channel_id);
919+
remove_channel(outgoing_closed, outgoing_channel_id);
920+
921+
// This is a bug as resolution time can't be before added time. This prevents us from
922+
// properly updating the reputation and revenue as those need accurate timestamps but
923+
// we error here rather than earlier to optimistically release bucket resources.
924+
if resolved_at < pending_htlc.added_at_unix_seconds {
925+
return Err(());
926+
}
927+
899928
Ok(())
900929
}
901930
}
@@ -2302,6 +2331,112 @@ mod tests {
23022331
assert!(get_htlc_bucket(&rm, INCOMING_SCID, htlc_id, OUTGOING_SCID_2).is_none());
23032332
}
23042333

2334+
#[test]
2335+
fn test_remove_channel_no_pending_htlcs() {
2336+
let rm = create_test_resource_manager_with_channels();
2337+
let channels = rm.channels.lock().unwrap();
2338+
assert!(channels.get(&OUTGOING_SCID).is_some());
2339+
drop(channels);
2340+
2341+
rm.remove_channel(OUTGOING_SCID).unwrap();
2342+
2343+
let channels = rm.channels.lock().unwrap();
2344+
assert!(channels.get(&OUTGOING_SCID).is_none());
2345+
}
2346+
2347+
#[test]
2348+
fn test_resolve_htlc_after_channel_close() {
2349+
let entropy_source = TestKeysInterface::new(&[0; 32], Network::Testnet);
2350+
let rm = create_test_resource_manager_with_channel_pairs(2);
2351+
2352+
let added_at = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
2353+
2354+
// HTLC 1: INCOMING_SCID (100) -> OUTGOING_SCID (200)
2355+
assert_eq!(
2356+
add_test_htlc(&rm, false, 1, Some(added_at), &entropy_source).unwrap(),
2357+
ForwardingOutcome::Forward(false),
2358+
);
2359+
2360+
// HTLC 2: INCOMING_SCID_2 (101) -> OUTGOING_SCID_2 (201)
2361+
assert_eq!(
2362+
rm.add_htlc(
2363+
INCOMING_SCID_2,
2364+
HTLC_AMOUNT + FEE_AMOUNT,
2365+
CLTV_EXPIRY,
2366+
OUTGOING_SCID_2,
2367+
HTLC_AMOUNT,
2368+
false,
2369+
1,
2370+
CURRENT_HEIGHT,
2371+
added_at,
2372+
&entropy_source,
2373+
)
2374+
.unwrap(),
2375+
ForwardingOutcome::Forward(false),
2376+
);
2377+
2378+
// Close the outgoing channel for HTLC 1. It has a pending HTLC so it is soft-deleted.
2379+
rm.remove_channel(OUTGOING_SCID).unwrap();
2380+
{
2381+
let channels = rm.channels.lock().unwrap();
2382+
let outgoing = channels.get(&OUTGOING_SCID).unwrap();
2383+
assert!(outgoing.closed);
2384+
assert_eq!(outgoing.pending_htlcs.len(), 1);
2385+
}
2386+
2387+
// Close the incoming channel for HTLC 2. It is referenced as incoming by an HTLC on
2388+
// OUTGOING_SCID_2, so it is soft-deleted.
2389+
rm.remove_channel(INCOMING_SCID_2).unwrap();
2390+
{
2391+
let channels = rm.channels.lock().unwrap();
2392+
let incoming = channels.get(&INCOMING_SCID_2).unwrap();
2393+
assert!(incoming.closed);
2394+
}
2395+
2396+
// New HTLCs should be rejected on closed channels.
2397+
assert!(add_test_htlc(&rm, false, 2, None, &entropy_source).is_err());
2398+
assert!(rm
2399+
.add_htlc(
2400+
INCOMING_SCID_2,
2401+
HTLC_AMOUNT + FEE_AMOUNT,
2402+
CLTV_EXPIRY,
2403+
OUTGOING_SCID_2,
2404+
HTLC_AMOUNT,
2405+
false,
2406+
2,
2407+
CURRENT_HEIGHT,
2408+
added_at,
2409+
&entropy_source,
2410+
)
2411+
.is_err());
2412+
2413+
let resolved_at = added_at + 10;
2414+
2415+
// Resolve HTLC 1. Outgoing channel (200) is closed and now has no pending HTLCs,
2416+
// so it is fully removed. Incoming channel (100) gets revenue updated.
2417+
rm.resolve_htlc(INCOMING_SCID, 1, OUTGOING_SCID, true, resolved_at).unwrap();
2418+
{
2419+
let channels = rm.channels.lock().unwrap();
2420+
assert!(channels.get(&OUTGOING_SCID).is_none());
2421+
let incoming = channels.get(&INCOMING_SCID).unwrap();
2422+
assert_eq!(
2423+
incoming.incoming_revenue.aggregated_decaying_average.value,
2424+
FEE_AMOUNT as i64,
2425+
);
2426+
}
2427+
2428+
// Resolve HTLC 2. Incoming channel (101) is closed and has no pending HTLCs or
2429+
// incoming references remaining, so it is fully removed. Outgoing channel (201)
2430+
// gets reputation updated.
2431+
rm.resolve_htlc(INCOMING_SCID_2, 1, OUTGOING_SCID_2, true, resolved_at).unwrap();
2432+
{
2433+
let channels = rm.channels.lock().unwrap();
2434+
assert!(channels.get(&INCOMING_SCID_2).is_none());
2435+
let outgoing = channels.get(&OUTGOING_SCID_2).unwrap();
2436+
assert_eq!(outgoing.outgoing_reputation.value, FEE_AMOUNT as i64);
2437+
}
2438+
}
2439+
23052440
#[test]
23062441
fn test_decaying_average_bounds() {
23072442
for (start, bound) in [(1000, i64::MAX), (-1000, i64::MIN)] {

0 commit comments

Comments
 (0)