Skip to content

Commit 37edfdb

Browse files
committed
fixup! Add persistent closed channel history and list_closed_channels()
Replace the in-memory set + `insert_or_update` fallback approach with a fully durable solution for tracking `is_outbound`/`is_announced` flags. The previous approach stored these flags in ephemeral `HashSet`s that were seeded from `list_channels()` at startup and consumed on `ChannelClosed`. A fallback to any existing `ClosedChannelDetails` record was added to handle `ReplayEvent`, but a gap remained: if `insert_or_update` failed (returning `ReplayEvent`) and the node restarted before the retry, the in-memory sets would be empty (closed channels don't appear in `list_channels()`) and no persisted record would exist yet, causing both flags to silently default to `falsee`. Fix this by persisting a `PendingChannelInfo` record (containing `is_outbound` and `is_announced`) to the KV store at `ChannelPending` time under a new `pending_channels/` namespace. The `ChannelClosed` handler now resolves the flags with the following priority: 1. `pending_channel_store` — durable, survives restarts and replays 2. In-memory sets — covers channels opened before this version 3. Existing `ClosedChannelDetails` record — idempotency guard The `PendingChannelInfo` record is deleted after `event_queue.add_event` succeeds. It is intentionally kept alive until that point so that any replay of `ChannelClosed` (e.g. due to a failed `insert_or_update` or `add_event`) still finds the correct flags in the store.
1 parent 4f91d27 commit 37edfdb

6 files changed

Lines changed: 176 additions & 47 deletions

File tree

src/builder.rs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ use crate::io::{
6767
self, CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
6868
CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
6969
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
70+
PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
71+
PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
7072
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
7173
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
7274
};
@@ -81,7 +83,7 @@ use crate::tx_broadcaster::TransactionBroadcaster;
8183
use crate::types::{
8284
AsyncPersister, ChainMonitor, ChannelManager, ClosedChannelStore, DynStore, DynStoreRef,
8385
DynStoreWrapper, GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger,
84-
PaymentStore, PeerManager, PendingPaymentStore,
86+
PaymentStore, PeerManager, PendingChannelStore, PendingPaymentStore,
8587
};
8688
use crate::wallet::persist::KVStoreWalletPersister;
8789
use crate::wallet::Wallet;
@@ -1381,30 +1383,41 @@ fn build_with_store_internal(
13811383

13821384
let kv_store_ref = Arc::clone(&kv_store);
13831385
let logger_ref = Arc::clone(&logger);
1384-
let (payment_store_res, node_metris_res, pending_payment_store_res, closed_channel_store_res) =
1385-
runtime.block_on(async move {
1386-
tokio::join!(
1387-
read_all_objects(
1388-
&*kv_store_ref,
1389-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1390-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1391-
Arc::clone(&logger_ref),
1392-
),
1393-
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1394-
read_all_objects(
1395-
&*kv_store_ref,
1396-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1397-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1398-
Arc::clone(&logger_ref),
1399-
),
1400-
read_all_objects(
1401-
&*kv_store_ref,
1402-
CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1403-
CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1404-
Arc::clone(&logger_ref),
1405-
)
1406-
)
1407-
});
1386+
let (
1387+
payment_store_res,
1388+
node_metris_res,
1389+
pending_payment_store_res,
1390+
closed_channel_store_res,
1391+
pending_channel_store_res,
1392+
) = runtime.block_on(async move {
1393+
tokio::join!(
1394+
read_all_objects(
1395+
&*kv_store_ref,
1396+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1397+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1398+
Arc::clone(&logger_ref),
1399+
),
1400+
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1401+
read_all_objects(
1402+
&*kv_store_ref,
1403+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1404+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1405+
Arc::clone(&logger_ref),
1406+
),
1407+
read_all_objects(
1408+
&*kv_store_ref,
1409+
CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1410+
CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1411+
Arc::clone(&logger_ref),
1412+
),
1413+
read_all_objects(
1414+
&*kv_store_ref,
1415+
PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
1416+
PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
1417+
Arc::clone(&logger_ref),
1418+
),
1419+
)
1420+
});
14081421

14091422
// Initialize the status fields.
14101423
let node_metrics = match node_metris_res {
@@ -1627,6 +1640,20 @@ fn build_with_store_internal(
16271640
},
16281641
};
16291642

1643+
let pending_channel_store = match pending_channel_store_res {
1644+
Ok(pending_channels) => Arc::new(PendingChannelStore::new(
1645+
pending_channels,
1646+
PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1647+
PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1648+
Arc::clone(&kv_store),
1649+
Arc::clone(&logger),
1650+
)),
1651+
Err(e) => {
1652+
log_error!(logger, "Failed to read pending channel data from store: {}", e);
1653+
return Err(BuildError::ReadFailed);
1654+
},
1655+
};
1656+
16301657
let wallet = Arc::new(Wallet::new(
16311658
bdk_wallet,
16321659
wallet_persister,
@@ -2172,6 +2199,7 @@ fn build_with_store_internal(
21722199
peer_store,
21732200
payment_store,
21742201
closed_channel_store,
2202+
pending_channel_store,
21752203
lnurl_auth,
21762204
is_running,
21772205
node_metrics,

src/closed_channel.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,48 @@ impl_writeable_tlv_based!(ClosedChannelDetails, {
7777
(18, is_announced, required),
7878
});
7979

80+
/// Channel flags persisted at channel-pending time so they remain accessible when the channel
81+
/// closes, even after a restart or when `handle_event` returns [`ReplayEvent`].
82+
///
83+
/// [`ReplayEvent`]: lightning::events::ReplayEvent
84+
#[derive(Clone, Debug)]
85+
pub(crate) struct PendingChannelInfo {
86+
pub user_channel_id: UserChannelId,
87+
pub is_outbound: bool,
88+
pub is_announced: bool,
89+
}
90+
91+
impl_writeable_tlv_based!(PendingChannelInfo, {
92+
(0, user_channel_id, required),
93+
(2, is_outbound, required),
94+
(4, is_announced, required),
95+
});
96+
97+
pub(crate) struct PendingChannelInfoUpdate(pub UserChannelId);
98+
99+
impl StorableObjectUpdate<PendingChannelInfo> for PendingChannelInfoUpdate {
100+
fn id(&self) -> UserChannelId {
101+
self.0
102+
}
103+
}
104+
105+
impl StorableObject for PendingChannelInfo {
106+
type Id = UserChannelId;
107+
type Update = PendingChannelInfoUpdate;
108+
109+
fn id(&self) -> UserChannelId {
110+
self.user_channel_id
111+
}
112+
113+
fn update(&mut self, _update: Self::Update) -> bool {
114+
false
115+
}
116+
117+
fn to_update(&self) -> Self::Update {
118+
PendingChannelInfoUpdate(self.user_channel_id)
119+
}
120+
}
121+
80122
pub(crate) struct ClosedChannelDetailsUpdate(pub UserChannelId);
81123

82124
impl StorableObjectUpdate<ClosedChannelDetails> for ClosedChannelDetailsUpdate {

src/event.rs

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
3434
use lightning_liquidity::lsps2::utils::compute_opening_fee;
3535
use lightning_types::payment::{PaymentHash, PaymentPreimage};
3636

37-
use crate::closed_channel::ClosedChannelDetails;
37+
use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo};
3838
use crate::config::{may_announce_channel, Config};
3939
use crate::connection::ConnectionManager;
4040
use crate::data_store::DataStoreUpdateResult;
@@ -56,7 +56,7 @@ use crate::payment::PaymentMetadata;
5656
use crate::runtime::Runtime;
5757
use crate::types::{
5858
ClosedChannelStore, CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore,
59-
Sweeper, Wallet,
59+
PendingChannelStore, Sweeper, Wallet,
6060
};
6161
use crate::{
6262
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
@@ -551,6 +551,7 @@ where
551551
payment_store: Arc<PaymentStore>,
552552
peer_store: Arc<PeerStore<L>>,
553553
closed_channel_store: Arc<ClosedChannelStore>,
554+
pending_channel_store: Arc<PendingChannelStore>,
554555
// Tracks which user_channel_ids correspond to outbound channels. Populated at startup from
555556
// list_channels() and updated on ChannelPending events. Consumed on ChannelClosed events.
556557
outbound_channel_ids: Mutex<HashSet<UserChannelId>>,
@@ -577,9 +578,10 @@ where
577578
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
578579
liquidity_source: Arc<LiquiditySource<Arc<Logger>>>, payment_store: Arc<PaymentStore>,
579580
peer_store: Arc<PeerStore<L>>, closed_channel_store: Arc<ClosedChannelStore>,
580-
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
581-
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
582-
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
581+
pending_channel_store: Arc<PendingChannelStore>, keys_manager: Arc<KeysManager>,
582+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
583+
om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
584+
config: Arc<Config>,
583585
) -> Self {
584586
// Seed outbound_channel_ids and announced_channel_ids from currently open channels so we
585587
// correctly classify channels that were already open when this node started.
@@ -609,6 +611,7 @@ where
609611
payment_store,
610612
peer_store,
611613
closed_channel_store,
614+
pending_channel_store,
612615
outbound_channel_ids,
613616
announced_channel_ids,
614617
keys_manager,
@@ -1386,12 +1389,23 @@ where
13861389
100
13871390
);
13881391
}
1392+
// For LSPS2 JIT channels (channel_override_config is Some iff the counterparty
1393+
// is our configured LSP), accept with ZeroConfZeroReserve so the LSP is not
1394+
// forced to keep 1000 sats locked as reserve. Without this, the hard
1395+
// MIN_THEIR_CHAN_RESERVE_SATOSHIS = 1000 floor in LDK reduces the usable
1396+
// outbound capacity enough that the initial HTLC forward fails on small channels.
1397+
let is_lsps2_channel = channel_override_config.is_some();
13891398
let res = if allow_0conf {
1399+
let trusted_features = if is_lsps2_channel {
1400+
TrustedChannelFeatures::ZeroConfZeroReserve
1401+
} else {
1402+
TrustedChannelFeatures::ZeroConf
1403+
};
13901404
self.channel_manager.accept_inbound_channel_from_trusted_peer(
13911405
&temporary_channel_id,
13921406
&counterparty_node_id,
13931407
user_channel_id,
1394-
TrustedChannelFeatures::ZeroConf,
1408+
trusted_features,
13951409
channel_override_config,
13961410
)
13971411
} else {
@@ -1567,13 +1581,13 @@ where
15671581
},
15681582
};
15691583

1570-
let peer_to_store = {
1584+
let (pending_info_opt, peer_to_store) = {
15711585
let network_graph = self.network_graph.read_only();
15721586
let channels =
15731587
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
15741588
let pending_channel = channels.into_iter().find(|c| c.channel_id == channel_id);
15751589

1576-
if let Some(ref ch) = pending_channel {
1590+
let pending_info_opt = if let Some(ref ch) = pending_channel {
15771591
if ch.is_outbound {
15781592
self.outbound_channel_ids
15791593
.lock()
@@ -1586,9 +1600,16 @@ where
15861600
.expect("Lock poisoned")
15871601
.insert(UserChannelId(user_channel_id));
15881602
}
1589-
}
1603+
Some(PendingChannelInfo {
1604+
user_channel_id: UserChannelId(user_channel_id),
1605+
is_outbound: ch.is_outbound,
1606+
is_announced: ch.is_announced,
1607+
})
1608+
} else {
1609+
None
1610+
};
15901611

1591-
pending_channel
1612+
let peer_to_store = pending_channel
15921613
.filter(|ch| {
15931614
!ch.is_outbound
15941615
&& self.peer_store.get_peer(&counterparty_node_id).is_none()
@@ -1603,8 +1624,23 @@ where
16031624
node_id: counterparty_node_id,
16041625
address: address.clone(),
16051626
})
1606-
})
1607-
};
1627+
});
1628+
1629+
(pending_info_opt, peer_to_store)
1630+
}; // network_graph is dropped here, before any await
1631+
1632+
if let Some(pending_info) = pending_info_opt {
1633+
if let Err(e) = self.pending_channel_store.insert_or_update(pending_info).await
1634+
{
1635+
log_error!(
1636+
self.logger,
1637+
"Failed to persist pending channel info {}: {}",
1638+
channel_id,
1639+
e
1640+
);
1641+
return Err(ReplayEvent());
1642+
}
1643+
}
16081644
if let Some(peer) = peer_to_store {
16091645
self.peer_store.add_peer(peer).await.unwrap_or_else(|e| {
16101646
log_error!(
@@ -1682,14 +1718,20 @@ where
16821718
.expect("Lock poisoned")
16831719
.remove(&user_channel_id);
16841720

1685-
// On replay (after a restart or after handle_event returns ReplayEvent),
1686-
// the channel is no longer in list_channels() and the in-memory sets are
1687-
// not repopulated for it, so .remove() returns false. Fall back to any
1688-
// already-persisted record so we don't overwrite correct values with false.
1721+
// Primary: use the durably-persisted PendingChannelInfo written at
1722+
// ChannelPending time. Falls back to in-memory sets (populated at startup
1723+
// or on ChannelPending), then to any already-persisted ClosedChannelDetails
1724+
// record (for the replay case where insert_or_update already succeeded but
1725+
// add_event failed and PendingChannelInfo was already cleaned up).
16891726
let (is_outbound, is_announced) = self
1690-
.closed_channel_store
1727+
.pending_channel_store
16911728
.get(&user_channel_id)
1692-
.map(|existing| (existing.is_outbound, existing.is_announced))
1729+
.map(|info| (info.is_outbound, info.is_announced))
1730+
.or_else(|| {
1731+
self.closed_channel_store
1732+
.get(&user_channel_id)
1733+
.map(|existing| (existing.is_outbound, existing.is_announced))
1734+
})
16931735
.unwrap_or((is_outbound_from_set, is_announced_from_set));
16941736

16951737
let closed_at = SystemTime::now()
@@ -1737,7 +1779,16 @@ where
17371779
};
17381780

17391781
match self.event_queue.add_event(event).await {
1740-
Ok(_) => {},
1782+
Ok(_) => {
1783+
if let Err(e) = self.pending_channel_store.remove(&user_channel_id).await {
1784+
log_error!(
1785+
self.logger,
1786+
"Failed to remove pending channel info for {}: {}",
1787+
channel_id,
1788+
e
1789+
);
1790+
}
1791+
},
17411792
Err(e) => {
17421793
log_error!(self.logger, "Failed to push to event queue: {}", e);
17431794
return Err(ReplayEvent());

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
3333
pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "closed_channels";
3434
pub(crate) const CLOSED_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
3535

36+
/// The pending channel information will be persisted under this prefix.
37+
pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_channels";
38+
pub(crate) const PENDING_CHANNEL_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
39+
3640
/// The node metrics will be persisted under this key.
3741
pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = "";
3842
pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = "";

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ use runtime::Runtime;
178178
pub use tokio;
179179
use types::{
180180
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, ClosedChannelStore,
181-
DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router,
182-
Scorer, Sweeper, Wallet,
181+
DynStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager,
182+
PendingChannelStore, Router, Scorer, Sweeper, Wallet,
183183
};
184184
pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId};
185185
pub use vss_client;
@@ -245,6 +245,7 @@ pub struct Node {
245245
peer_store: Arc<PeerStore<Arc<Logger>>>,
246246
payment_store: Arc<PaymentStore>,
247247
closed_channel_store: Arc<ClosedChannelStore>,
248+
pending_channel_store: Arc<PendingChannelStore>,
248249
lnurl_auth: Arc<LnurlAuth>,
249250
is_running: Arc<RwLock<bool>>,
250251
node_metrics: Arc<PersistedNodeMetrics>,
@@ -608,6 +609,7 @@ impl Node {
608609
Arc::clone(&self.payment_store),
609610
Arc::clone(&self.peer_store),
610611
Arc::clone(&self.closed_channel_store),
612+
Arc::clone(&self.pending_channel_store),
611613
Arc::clone(&self.keys_manager),
612614
static_invoice_store,
613615
Arc::clone(&self.onion_messenger),

src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use lightning_net_tokio::SocketDescriptor;
3838

3939
use crate::chain::bitcoind::UtxoSourceClient;
4040
use crate::chain::ChainSource;
41-
use crate::closed_channel::ClosedChannelDetails;
41+
use crate::closed_channel::{ClosedChannelDetails, PendingChannelInfo};
4242
use crate::config::ChannelConfig;
4343
use crate::data_store::DataStore;
4444
use crate::fee_estimator::OnchainFeeEstimator;
@@ -631,3 +631,5 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
631631
pub(crate) type PendingPaymentStore = DataStore<PendingPaymentDetails, Arc<Logger>>;
632632

633633
pub(crate) type ClosedChannelStore = DataStore<ClosedChannelDetails, Arc<Logger>>;
634+
635+
pub(crate) type PendingChannelStore = DataStore<PendingChannelInfo, Arc<Logger>>;

0 commit comments

Comments
 (0)