Skip to content

Commit 5236dba

Browse files
authored
Merge pull request #4294 from TheBlueMatt/2025-12-gossip-validate-arc-loop
Remove circular reference in `GossipVerifier`
2 parents 1c730c8 + 15ddb31 commit 5236dba

File tree

9 files changed

+419
-387
lines changed

9 files changed

+419
-387
lines changed

fuzz/src/router.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use lightning::types::features::{BlindedHopFeatures, Bolt12InvoiceFeatures};
3131
use lightning::util::config::UserConfig;
3232
use lightning::util::hash_tables::*;
3333
use lightning::util::ser::LengthReadable;
34+
use lightning::util::wakers::Notifier;
3435

3536
use bitcoin::hashes::Hash;
3637
use bitcoin::network::Network;
@@ -88,12 +89,11 @@ impl InputData {
8889
}
8990
}
9091

91-
struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
92+
struct FuzzChainSource {
9293
input: Arc<InputData>,
93-
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
9494
}
95-
impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
96-
fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
95+
impl UtxoLookup for FuzzChainSource {
96+
fn get_utxo(&self, _chain_hash: &ChainHash, _scid: u64, notifier: Arc<Notifier>) -> UtxoResult {
9797
let input_slice = self.input.get_slice(2);
9898
if input_slice.is_none() {
9999
return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
@@ -107,17 +107,17 @@ impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
107107
&[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
108108
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
109109
&[2, _] => {
110-
let future = UtxoFuture::new();
111-
future.resolve_without_forwarding(self.net_graph, Ok(txo_res));
110+
let future = UtxoFuture::new(notifier);
111+
future.resolve(Ok(txo_res));
112112
UtxoResult::Async(future.clone())
113113
},
114114
&[3, _] => {
115-
let future = UtxoFuture::new();
116-
future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx));
115+
let future = UtxoFuture::new(notifier);
116+
future.resolve(Err(UtxoLookupError::UnknownTx));
117117
UtxoResult::Async(future.clone())
118118
},
119119
&[4, _] => {
120-
UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
120+
UtxoResult::Async(UtxoFuture::new(notifier)) // the future will never resolve
121121
},
122122
&[..] => UtxoResult::Sync(Ok(txo_res)),
123123
}
@@ -197,7 +197,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
197197

198198
let our_pubkey = get_pubkey!();
199199
let net_graph = NetworkGraph::new(Network::Bitcoin, &logger);
200-
let chain_source = FuzzChainSource { input: Arc::clone(&input), net_graph: &net_graph };
200+
let chain_source = FuzzChainSource { input: Arc::clone(&input) };
201201

202202
let mut node_pks = new_hash_map();
203203
let mut scid = 42;
@@ -335,9 +335,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
335335
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1), ());
336336
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2), ());
337337
let _ = net_graph
338-
.update_channel_from_unsigned_announcement::<&FuzzChainSource<'_, '_, Out>>(
339-
&msg, &None,
340-
);
338+
.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
341339
},
342340
2 => {
343341
let msg =

lightning-background-processor/src/lib.rs

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use lightning::util::persist::{
6464
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
6565
};
6666
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
67+
use lightning::util::wakers::Future;
6768
#[cfg(feature = "std")]
6869
use lightning::util::wakers::Sleeper;
6970
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -245,6 +246,14 @@ where
245246
GossipSync::None => None,
246247
}
247248
}
249+
250+
fn validation_completion_future(&self) -> Option<Future> {
251+
match self {
252+
GossipSync::P2P(gossip_sync) => Some(gossip_sync.validation_completion_future()),
253+
GossipSync::Rapid(_) => None,
254+
GossipSync::None => None,
255+
}
256+
}
248257
}
249258

250259
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
@@ -530,12 +539,14 @@ pub(crate) mod futures_util {
530539
C: Future<Output = ()> + Unpin,
531540
D: Future<Output = ()> + Unpin,
532541
E: Future<Output = ()> + Unpin,
542+
F: Future<Output = ()> + Unpin,
533543
> {
534544
pub a: A,
535545
pub b: B,
536546
pub c: C,
537547
pub d: D,
538548
pub e: E,
549+
pub f: F,
539550
}
540551

541552
pub(crate) enum SelectorOutput {
@@ -544,6 +555,7 @@ pub(crate) mod futures_util {
544555
C,
545556
D,
546557
E,
558+
F,
547559
}
548560

549561
impl<
@@ -552,7 +564,8 @@ pub(crate) mod futures_util {
552564
C: Future<Output = ()> + Unpin,
553565
D: Future<Output = ()> + Unpin,
554566
E: Future<Output = ()> + Unpin,
555-
> Future for Selector<A, B, C, D, E>
567+
F: Future<Output = ()> + Unpin,
568+
> Future for Selector<A, B, C, D, E, F>
556569
{
557570
type Output = SelectorOutput;
558571
fn poll(
@@ -590,6 +603,12 @@ pub(crate) mod futures_util {
590603
},
591604
Poll::Pending => {},
592605
}
606+
match Pin::new(&mut self.f).poll(ctx) {
607+
Poll::Ready(()) => {
608+
return Poll::Ready(SelectorOutput::F);
609+
},
610+
Poll::Pending => {},
611+
}
593612
Poll::Pending
594613
}
595614
}
@@ -616,6 +635,12 @@ pub(crate) mod futures_util {
616635
}
617636
}
618637

638+
impl<F: Future<Output = ()> + Unpin> From<Option<F>> for OptionalSelector<F> {
639+
fn from(optional_future: Option<F>) -> Self {
640+
Self { optional_future }
641+
}
642+
}
643+
619644
// If we want to poll a future without an async context to figure out if it has completed or
620645
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
621646
// but sadly there's a good bit of boilerplate here.
@@ -1070,18 +1095,13 @@ where
10701095
if mobile_interruptable_platform {
10711096
await_start = Some(sleeper(Duration::from_secs(1)));
10721097
}
1073-
let om_fut = if let Some(om) = onion_messenger.as_ref() {
1074-
let fut = om.get_om().get_update_future();
1075-
OptionalSelector { optional_future: Some(fut) }
1076-
} else {
1077-
OptionalSelector { optional_future: None }
1078-
};
1079-
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
1080-
let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
1081-
OptionalSelector { optional_future: Some(fut) }
1082-
} else {
1083-
OptionalSelector { optional_future: None }
1084-
};
1098+
let om_fut: OptionalSelector<_> =
1099+
onion_messenger.as_ref().map(|om| om.get_om().get_update_future()).into();
1100+
let lm_fut: OptionalSelector<_> = liquidity_manager
1101+
.as_ref()
1102+
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future())
1103+
.into();
1104+
let gv_fut: OptionalSelector<_> = gossip_sync.validation_completion_future().into();
10851105
let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
10861106
let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
10871107
(true, true) => batch_delay.get().min(Duration::from_millis(100)),
@@ -1095,9 +1115,14 @@ where
10951115
c: chain_monitor.get_update_future(),
10961116
d: om_fut,
10971117
e: lm_fut,
1118+
f: gv_fut,
10981119
};
10991120
match fut.await {
1100-
SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {},
1121+
SelectorOutput::B
1122+
| SelectorOutput::C
1123+
| SelectorOutput::D
1124+
| SelectorOutput::E
1125+
| SelectorOutput::F => {},
11011126
SelectorOutput::A(exit) => {
11021127
if exit {
11031128
break;
@@ -1669,28 +1694,18 @@ impl BackgroundProcessor {
16691694
log_trace!(logger, "Terminating background processor.");
16701695
break;
16711696
}
1672-
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1673-
(Some(om), Some(lm)) => Sleeper::from_four_futures(
1674-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1675-
&chain_monitor.get_update_future(),
1676-
&om.get_om().get_update_future(),
1677-
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1678-
),
1679-
(Some(om), None) => Sleeper::from_three_futures(
1680-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1681-
&chain_monitor.get_update_future(),
1682-
&om.get_om().get_update_future(),
1683-
),
1684-
(None, Some(lm)) => Sleeper::from_three_futures(
1685-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1686-
&chain_monitor.get_update_future(),
1687-
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1688-
),
1689-
(None, None) => Sleeper::from_two_futures(
1690-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1691-
&chain_monitor.get_update_future(),
1692-
),
1693-
};
1697+
let om_fut = onion_messenger.as_ref().map(|om| om.get_om().get_update_future());
1698+
let lm_fut = liquidity_manager
1699+
.as_ref()
1700+
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future());
1701+
let gv_fut = gossip_sync.validation_completion_future();
1702+
let always_futures = [
1703+
channel_manager.get_cm().get_event_or_persistence_needed_future(),
1704+
chain_monitor.get_update_future(),
1705+
];
1706+
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut);
1707+
let sleeper = Sleeper::from_futures(futures);
1708+
16941709
let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
16951710
batch_delay.get()
16961711
} else {

lightning-block-sync/src/gossip.rs

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@ use bitcoin::constants::ChainHash;
99
use bitcoin::hash_types::BlockHash;
1010
use bitcoin::transaction::{OutPoint, TxOut};
1111

12-
use lightning::ln::peer_handler::APeerManager;
13-
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1412
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
15-
use lightning::util::logger::Logger;
1613
use lightning::util::native_async::FutureSpawner;
14+
use lightning::util::wakers::Notifier;
1715

1816
use std::collections::VecDeque;
1917
use std::future::Future;
@@ -127,46 +125,28 @@ impl<
127125
/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors
128126
/// available on both Bitcoin Core and your LDK application for each request to hold its own
129127
/// connection.
130-
pub struct GossipVerifier<
131-
S: FutureSpawner,
132-
Blocks: Deref + Send + Sync + 'static + Clone,
133-
L: Deref + Send + Sync + 'static,
134-
> where
128+
pub struct GossipVerifier<S: FutureSpawner, Blocks: Deref + Send + Sync + 'static + Clone>
129+
where
135130
Blocks::Target: UtxoSource,
136-
L::Target: Logger,
137131
{
138132
source: Blocks,
139-
peer_manager_wake: Arc<dyn Fn() + Send + Sync>,
140-
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
141133
spawn: S,
142134
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
143135
}
144136

145137
const BLOCK_CACHE_SIZE: usize = 5;
146138

147-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync>
148-
GossipVerifier<S, Blocks, L>
139+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> GossipVerifier<S, Blocks>
149140
where
150141
Blocks::Target: UtxoSource,
151-
L::Target: Logger,
152142
{
153-
/// Constructs a new [`GossipVerifier`].
143+
/// Constructs a new [`GossipVerifier`] for use in a [`P2PGossipSync`].
154144
///
155-
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
156-
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
157-
pub fn new<APM: Deref + Send + Sync + Clone + 'static>(
158-
source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
159-
peer_manager: APM,
160-
) -> Self
161-
where
162-
APM::Target: APeerManager,
163-
{
164-
let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events());
145+
/// [`P2PGossipSync`]: lightning::routing::gossip::P2PGossipSync
146+
pub fn new(source: Blocks, spawn: S) -> Self {
165147
Self {
166148
source,
167149
spawn,
168-
gossiper,
169-
peer_manager_wake,
170150
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
171151
}
172152
}
@@ -255,35 +235,28 @@ where
255235
}
256236
}
257237

258-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> Deref
259-
for GossipVerifier<S, Blocks, L>
238+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> Deref for GossipVerifier<S, Blocks>
260239
where
261240
Blocks::Target: UtxoSource,
262-
L::Target: Logger,
263241
{
264242
type Target = Self;
265243
fn deref(&self) -> &Self {
266244
self
267245
}
268246
}
269247

270-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> UtxoLookup
271-
for GossipVerifier<S, Blocks, L>
248+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> UtxoLookup for GossipVerifier<S, Blocks>
272249
where
273250
Blocks::Target: UtxoSource,
274-
L::Target: Logger,
275251
{
276-
fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult {
277-
let res = UtxoFuture::new();
252+
fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc<Notifier>) -> UtxoResult {
253+
let res = UtxoFuture::new(notifier);
278254
let fut = res.clone();
279255
let source = self.source.clone();
280-
let gossiper = Arc::clone(&self.gossiper);
281256
let block_cache = Arc::clone(&self.block_cache);
282-
let pmw = Arc::clone(&self.peer_manager_wake);
283257
self.spawn.spawn(async move {
284-
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
285-
fut.resolve(gossiper.network_graph(), &*gossiper, res);
286-
(pmw)();
258+
let res = Self::retrieve_utxo(source, block_cache, scid).await;
259+
fut.resolve(res);
287260
});
288261
UtxoResult::Async(res)
289262
}

0 commit comments

Comments
 (0)