Skip to content

Commit 0e18e31

Browse files
committed
lsps2: Add prune_channels API to remove completed JIT channel state
Add `LSPS2ServiceHandler::prune_channels` that lets the LSP operator remove all channels in the `PaymentForwarded` terminal state whose `created_at` timestamp is at least `max_age` old. Passing `Duration::ZERO` prunes all terminal channels regardless of age. All associated state is cleaned up atomically: - per-peer `intercept_scid_by_channel_id` and `intercept_scid_by_user_channel_id` - handler-level `peer_by_intercept_scid` and `peer_by_channel_id` A new `PeerState::prune_terminal_channels` helper handles the intra-peer map cleanup and returns the removed `(scid, channel_id)` pairs for the handler to clean up the outer maps. Integration tests cover: non-terminal channels not pruned, unknown counterparty errors, age filtering, and successful bulk prune.
1 parent 057fdc3 commit 0e18e31

2 files changed

Lines changed: 563 additions & 29 deletions

File tree

lightning-liquidity/src/lsps2/service.rs

Lines changed: 262 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,10 @@ struct OutboundJITChannel {
500500
payment_size_msat: Option<u64>,
501501
trust_model: TrustModel,
502502
/// The time at which the JIT channel was created (i.e., the buy request was accepted).
503-
created_at: LSPSDateTime,
503+
///
504+
/// `None` for channels deserialized from data written before this field was introduced;
505+
/// those channels are treated as having an unknown age by [`PeerState::prune_terminal_channels`].
506+
created_at: Option<LSPSDateTime>,
504507
}
505508

506509
impl_writeable_tlv_based!(OutboundJITChannel, {
@@ -509,13 +512,13 @@ impl_writeable_tlv_based!(OutboundJITChannel, {
509512
(4, opening_fee_params, required),
510513
(6, payment_size_msat, option),
511514
(8, trust_model, required),
512-
(10, created_at, (default_value, LSPSDateTime::new_from_duration_since_epoch(Duration::ZERO))),
515+
(10, created_at, option),
513516
});
514517

515518
impl OutboundJITChannel {
516519
fn new(
517520
payment_size_msat: Option<u64>, opening_fee_params: LSPS2OpeningFeeParams,
518-
user_channel_id: u128, client_trusts_lsp: bool, created_at: LSPSDateTime,
521+
user_channel_id: u128, client_trusts_lsp: bool, created_at: Option<LSPSDateTime>,
519522
) -> Self {
520523
Self {
521524
user_channel_id,
@@ -664,6 +667,48 @@ impl PeerState {
664667
// Return whether the entire state is empty.
665668
self.pending_requests.is_empty() && self.outbound_channels_by_intercept_scid.is_empty()
666669
}
670+
671+
/// Removes all channels in the [`PaymentForwarded`] terminal state whose `created_at`
672+
/// timestamp is at least `max_age` old. Passing [`Duration::ZERO`] removes all terminal
673+
/// channels regardless of age.
674+
///
675+
/// Channels with no `created_at` (deserialized from data written before the field was
676+
/// introduced) have an unknown age and are only removed when `max_age` is
677+
/// [`Duration::ZERO`]; they are skipped by any age-based filter.
678+
///
679+
/// Cleans up the intra-peer auxiliary maps for each removed channel and returns the
680+
/// `(intercept_scid, channel_id)` pairs so the caller can remove them from the
681+
/// handler-level peer lookup maps.
682+
///
683+
/// [`PaymentForwarded`]: OutboundJITChannelState::PaymentForwarded
684+
/// [`Duration::ZERO`]: core::time::Duration::ZERO
685+
fn prune_terminal_channels(
686+
&mut self, now: &LSPSDateTime, max_age: Duration,
687+
) -> Vec<(u64, ChannelId)> {
688+
let mut removed = Vec::new();
689+
self.outbound_channels_by_intercept_scid.retain(|scid, channel| {
690+
if let OutboundJITChannelState::PaymentForwarded { channel_id } = &channel.state {
691+
let should_prune = if max_age == Duration::ZERO {
692+
true
693+
} else {
694+
// Channels without a created_at (pre-upgrade legacy data) have an unknown
695+
// age and are excluded from age-based filtering.
696+
channel.created_at.as_ref()
697+
.map(|ts| now.duration_since(ts) >= max_age)
698+
.unwrap_or(false)
699+
};
700+
if should_prune {
701+
removed.push((*scid, *channel_id));
702+
self.intercept_scid_by_channel_id.retain(|_, iscid| iscid != scid);
703+
self.intercept_scid_by_user_channel_id.retain(|_, iscid| iscid != scid);
704+
self.needs_persist = true;
705+
return false;
706+
}
707+
}
708+
true
709+
});
710+
removed
711+
}
667712
}
668713

669714
impl_writeable_tlv_based!(PeerState, {
@@ -932,9 +977,9 @@ where
932977
peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
933978
}
934979

935-
let created_at = LSPSDateTime::new_from_duration_since_epoch(
980+
let created_at = Some(LSPSDateTime::new_from_duration_since_epoch(
936981
self.time_provider.duration_since_epoch(),
937-
);
982+
));
938983
let outbound_jit_channel = OutboundJITChannel::new(
939984
buy_request.payment_size_msat,
940985
buy_request.opening_fee_params,
@@ -1267,6 +1312,70 @@ where
12671312
Ok(())
12681313
}
12691314

1315+
/// Prunes completed JIT channels from state for a given peer, freeing memory.
1316+
///
1317+
/// Removes all channels in the [`OutboundJITChannelState::PaymentForwarded`] terminal state
1318+
/// whose `created_at` timestamp is at least `max_age` old. Pass [`Duration::ZERO`] to prune
1319+
/// all terminal channels regardless of age.
1320+
///
1321+
/// Channels loaded from persisted data written before the `created_at` field was introduced
1322+
/// have an unknown creation time. They are only pruned when `max_age` is [`Duration::ZERO`];
1323+
/// any positive `max_age` leaves them untouched.
1324+
///
1325+
/// All associated state is cleaned up for each removed channel, including the per-peer
1326+
/// `intercept_scid_by_channel_id` and `intercept_scid_by_user_channel_id` maps as well as
1327+
/// the handler-level `peer_by_intercept_scid` and `peer_by_channel_id` lookups.
1328+
///
1329+
/// Returns the number of channels pruned, or an [`APIError::APIMisuseError`] if the
1330+
/// counterparty has no state.
1331+
///
1332+
/// [`Duration::ZERO`]: core::time::Duration::ZERO
1333+
pub async fn prune_channels(
1334+
&self, counterparty_node_id: PublicKey, max_age: Duration,
1335+
) -> Result<usize, APIError> {
1336+
let now = LSPSDateTime::new_from_duration_since_epoch(
1337+
self.time_provider.duration_since_epoch(),
1338+
);
1339+
1340+
let removed = {
1341+
let outer_state_lock = self.per_peer_state.read().unwrap();
1342+
let inner_state_lock =
1343+
outer_state_lock.get(&counterparty_node_id).ok_or_else(|| {
1344+
APIError::APIMisuseError {
1345+
err: format!(
1346+
"No existing state with counterparty {}",
1347+
counterparty_node_id
1348+
),
1349+
}
1350+
})?;
1351+
let mut peer_state = inner_state_lock.lock().unwrap();
1352+
peer_state.prune_terminal_channels(&now, max_age)
1353+
};
1354+
1355+
let pruned = removed.len();
1356+
if pruned > 0 {
1357+
let mut peer_by_intercept_scid = self.peer_by_intercept_scid.write().unwrap();
1358+
let mut peer_by_channel_id = self.peer_by_channel_id.write().unwrap();
1359+
for (scid, channel_id) in &removed {
1360+
peer_by_intercept_scid.remove(scid);
1361+
peer_by_channel_id.remove(channel_id);
1362+
}
1363+
drop(peer_by_intercept_scid);
1364+
drop(peer_by_channel_id);
1365+
1366+
self.persist_peer_state(counterparty_node_id).await.map_err(|e| {
1367+
APIError::APIMisuseError {
1368+
err: format!(
1369+
"Failed to persist peer state for {}: {}",
1370+
counterparty_node_id, e
1371+
),
1372+
}
1373+
})?;
1374+
}
1375+
1376+
Ok(pruned)
1377+
}
1378+
12701379
/// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state.
12711380
///
12721381
/// This removes the intercept SCID, any outbound channel state, and associated
@@ -2349,6 +2458,24 @@ where
23492458
}
23502459
}
23512460

2461+
/// Prunes completed JIT channels from state for a given peer, freeing memory.
2462+
///
2463+
/// Wraps [`LSPS2ServiceHandler::prune_channels`].
2464+
pub fn prune_channels(
2465+
&self, counterparty_node_id: PublicKey, max_age: Duration,
2466+
) -> Result<usize, APIError> {
2467+
let mut fut = pin!(self.inner.prune_channels(counterparty_node_id, max_age));
2468+
2469+
let mut waker = dummy_waker();
2470+
let mut ctx = task::Context::from_waker(&mut waker);
2471+
match fut.as_mut().poll(&mut ctx) {
2472+
task::Poll::Ready(result) => result,
2473+
task::Poll::Pending => {
2474+
unreachable!("Should not be pending in a sync context");
2475+
},
2476+
}
2477+
}
2478+
23522479
/// Forward [`Event::ChannelReady`] event parameters into this function.
23532480
///
23542481
/// Wraps [`LSPS2ServiceHandler::channel_ready`].
@@ -2804,7 +2931,7 @@ mod tests {
28042931
opening_fee_params.clone(),
28052932
user_channel_id,
28062933
true,
2807-
LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(),
2934+
Some(LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap()),
28082935
);
28092936

28102937
let opening_payment_hash = PaymentHash([42; 32]);
@@ -2885,46 +3012,152 @@ mod tests {
28853012
);
28863013
}
28873014

2888-
#[test]
2889-
fn test_outbound_jit_channel_created_at_stored() {
2890-
let opening_fee_params = LSPS2OpeningFeeParams {
2891-
min_fee_msat: 1_000,
2892-
proportional: 0,
2893-
valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
3015+
fn make_test_opening_fee_params() -> LSPS2OpeningFeeParams {
3016+
LSPS2OpeningFeeParams {
3017+
min_fee_msat: 1000,
3018+
proportional: 100,
3019+
valid_until: LSPSDateTime::from_str("2035-01-01T00:00:00Z").unwrap(),
28943020
min_lifetime: 144,
28953021
max_client_to_self_delay: 128,
28963022
min_payment_size_msat: 1,
28973023
max_payment_size_msat: 10_000_000_000,
28983024
promise: "ignore".to_string(),
2899-
};
3025+
}
3026+
}
3027+
3028+
#[test]
3029+
fn test_outbound_jit_channel_created_at_stored() {
29003030
let created_at = LSPSDateTime::from_str("2024-06-15T12:00:00Z").unwrap();
2901-
let channel =
2902-
OutboundJITChannel::new(Some(1_000_000), opening_fee_params, 1u128, true, created_at);
2903-
assert_eq!(channel.created_at, created_at);
3031+
let channel = OutboundJITChannel::new(
3032+
Some(1_000_000),
3033+
make_test_opening_fee_params(),
3034+
1u128,
3035+
true,
3036+
Some(created_at),
3037+
);
3038+
assert_eq!(channel.created_at, Some(created_at));
29043039
}
29053040

29063041
#[test]
29073042
fn test_outbound_jit_channel_created_at_round_trips() {
29083043
use lightning::util::ser::{Readable, Writeable};
29093044

2910-
let opening_fee_params = LSPS2OpeningFeeParams {
2911-
min_fee_msat: 1_000,
2912-
proportional: 0,
2913-
valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(),
2914-
min_lifetime: 144,
2915-
max_client_to_self_delay: 128,
2916-
min_payment_size_msat: 1,
2917-
max_payment_size_msat: 10_000_000_000,
2918-
promise: "ignore".to_string(),
2919-
};
29203045
let created_at = LSPSDateTime::from_str("2024-06-15T12:00:00Z").unwrap();
2921-
let channel =
2922-
OutboundJITChannel::new(Some(1_000_000), opening_fee_params, 1u128, true, created_at);
3046+
let channel = OutboundJITChannel::new(
3047+
Some(1_000_000),
3048+
make_test_opening_fee_params(),
3049+
1u128,
3050+
true,
3051+
Some(created_at),
3052+
);
3053+
3054+
let mut buf = Vec::new();
3055+
channel.write(&mut buf).unwrap();
3056+
3057+
let decoded = <OutboundJITChannel as Readable>::read(&mut &buf[..]).unwrap();
3058+
assert_eq!(decoded.created_at, Some(created_at));
3059+
}
3060+
3061+
// Verify that data written before the `created_at` field existed deserializes with
3062+
// `created_at = None`, keeping backwards compatibility.
3063+
#[test]
3064+
fn test_outbound_jit_channel_created_at_defaults_for_old_data() {
3065+
use lightning::util::ser::{Readable, Writeable};
3066+
3067+
// Serialize a channel that has no created_at (simulated by None).
3068+
let channel = OutboundJITChannel::new(
3069+
Some(1_000_000),
3070+
make_test_opening_fee_params(),
3071+
1u128,
3072+
false,
3073+
None,
3074+
);
29233075

29243076
let mut buf = Vec::new();
29253077
channel.write(&mut buf).unwrap();
29263078

3079+
// A channel serialized with created_at = None must not write TLV 10,
3080+
// and must round-trip as None.
29273081
let decoded = <OutboundJITChannel as Readable>::read(&mut &buf[..]).unwrap();
2928-
assert_eq!(decoded.created_at, created_at);
3082+
assert_eq!(decoded.created_at, None);
3083+
}
3084+
3085+
// Verify that a PeerState entry in PaymentForwarded state is correctly removed along with
3086+
// all auxiliary lookup maps when prune_channel logic is exercised manually.
3087+
#[test]
3088+
fn test_peer_state_prune_payment_forwarded_channel() {
3089+
let intercept_scid = 42u64;
3090+
let user_channel_id = 1u128;
3091+
let channel_id = ChannelId([1; 32]);
3092+
let created_at = Some(LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap());
3093+
3094+
let mut jit_channel = OutboundJITChannel::new(
3095+
Some(1_000_000),
3096+
make_test_opening_fee_params(),
3097+
user_channel_id,
3098+
false,
3099+
created_at,
3100+
);
3101+
3102+
// Drive the channel through to PaymentForwarded state.
3103+
let htlc = InterceptedHTLC {
3104+
intercept_id: InterceptId([0; 32]),
3105+
expected_outbound_amount_msat: 1_000_000,
3106+
payment_hash: PaymentHash([1; 32]),
3107+
};
3108+
let action = jit_channel.htlc_intercepted(htlc).unwrap();
3109+
assert!(matches!(action, Some(HTLCInterceptedAction::OpenChannel(_))));
3110+
jit_channel.channel_ready(channel_id).unwrap();
3111+
// Provide enough fee to transition to PaymentForwarded.
3112+
jit_channel.payment_forwarded(1_000).unwrap();
3113+
3114+
// Build a minimal PeerState with the channel and auxiliary maps.
3115+
let mut peer_state = PeerState::new();
3116+
peer_state.outbound_channels_by_intercept_scid.insert(intercept_scid, jit_channel);
3117+
peer_state.intercept_scid_by_user_channel_id.insert(user_channel_id, intercept_scid);
3118+
peer_state.intercept_scid_by_channel_id.insert(channel_id, intercept_scid);
3119+
3120+
// Confirm the channel is in PaymentForwarded state.
3121+
assert!(matches!(
3122+
peer_state.outbound_channels_by_intercept_scid.get(&intercept_scid).unwrap().state,
3123+
OutboundJITChannelState::PaymentForwarded { .. }
3124+
));
3125+
3126+
// Simulate what prune_channel does internally.
3127+
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
3128+
peer_state.intercept_scid_by_channel_id.retain(|_, iscid| *iscid != intercept_scid);
3129+
peer_state.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid);
3130+
peer_state.needs_persist = true;
3131+
3132+
// All maps must be empty after pruning.
3133+
assert!(peer_state.outbound_channels_by_intercept_scid.is_empty());
3134+
assert!(peer_state.intercept_scid_by_channel_id.is_empty());
3135+
assert!(peer_state.intercept_scid_by_user_channel_id.is_empty());
3136+
assert!(peer_state.needs_persist);
3137+
}
3138+
3139+
// Verify that prune_channel rejects non-terminal states.
3140+
#[test]
3141+
fn test_peer_state_prune_channel_non_terminal_rejected() {
3142+
let intercept_scid = 99u64;
3143+
let user_channel_id = 2u128;
3144+
let created_at = Some(LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap());
3145+
let jit_channel = OutboundJITChannel::new(
3146+
Some(500_000),
3147+
make_test_opening_fee_params(),
3148+
user_channel_id,
3149+
false,
3150+
created_at,
3151+
);
3152+
3153+
// Channel is in PendingInitialPayment — a non-terminal state.
3154+
assert!(matches!(jit_channel.state, OutboundJITChannelState::PendingInitialPayment { .. }));
3155+
3156+
// Verify the guard logic that prune_channel uses.
3157+
let is_prunable =
3158+
matches!(jit_channel.state, OutboundJITChannelState::PaymentForwarded { .. });
3159+
assert!(!is_prunable, "PendingInitialPayment must not be considered prunable");
3160+
3161+
let _ = intercept_scid; // silence unused warning
29293162
}
29303163
}

0 commit comments

Comments
 (0)