Skip to content

Commit ad78799

Browse files
committed
Move to awaiting gossip validation in the background processor
`P2PGossipSync` is a rather poor design. It currently basically requires two circular `Arc` references, leaving `NetworkGraph`s to leak if LDK is un-loaded: * `P2PGossipSync` owns/holds a reference to the `GossipVerifier` and `GossipVerifier` holds an `Arc` to the `P2PGossipSync` and * `PeerManager` holds a reference to the `P2PGossipSync` (as the gossip message handler) which owns/holds a reference to the `GossipVerifier`, which has a `Deref` (likely an `Arc` in practice) to the `PeerManager`. Instead, we should move towards the same design we have elsewhere - hold a `Notifier` and expose waiting on it to the background processor then poll for completion from there (in this case, as in others by checking for completion when handling `get_and_clear_pending_msg_events` calls). After the last few commits of setup, here we finally switch to waking the background processor directly when we detect async gossip validation completion, allowing us to drop the circular references in `P2PGossipSync`/`GossipVerifier` entirely. Fixes #3369
1 parent 91a16c5 commit ad78799

3 files changed

Lines changed: 61 additions & 51 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use lightning::util::persist::{
6464
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
6565
};
6666
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
67+
use lightning::util::wakers::Future;
6768
#[cfg(feature = "std")]
6869
use lightning::util::wakers::Sleeper;
6970
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -235,6 +236,14 @@ where
235236
GossipSync::None => None,
236237
}
237238
}
239+
240+
fn validation_completion_future(&self) -> Option<Future> {
241+
match self {
242+
GossipSync::P2P(gossip_sync) => Some(gossip_sync.validation_completion_future()),
243+
GossipSync::Rapid(_) => None,
244+
GossipSync::None => None,
245+
}
246+
}
238247
}
239248

240249
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
@@ -520,12 +529,14 @@ pub(crate) mod futures_util {
520529
C: Future<Output = ()> + Unpin,
521530
D: Future<Output = ()> + Unpin,
522531
E: Future<Output = ()> + Unpin,
532+
F: Future<Output = ()> + Unpin,
523533
> {
524534
pub a: A,
525535
pub b: B,
526536
pub c: C,
527537
pub d: D,
528538
pub e: E,
539+
pub f: F,
529540
}
530541

531542
pub(crate) enum SelectorOutput {
@@ -534,6 +545,7 @@ pub(crate) mod futures_util {
534545
C,
535546
D,
536547
E,
548+
F,
537549
}
538550

539551
impl<
@@ -542,7 +554,8 @@ pub(crate) mod futures_util {
542554
C: Future<Output = ()> + Unpin,
543555
D: Future<Output = ()> + Unpin,
544556
E: Future<Output = ()> + Unpin,
545-
> Future for Selector<A, B, C, D, E>
557+
F: Future<Output = ()> + Unpin,
558+
> Future for Selector<A, B, C, D, E, F>
546559
{
547560
type Output = SelectorOutput;
548561
fn poll(
@@ -580,6 +593,12 @@ pub(crate) mod futures_util {
580593
},
581594
Poll::Pending => {},
582595
}
596+
match Pin::new(&mut self.f).poll(ctx) {
597+
Poll::Ready(()) => {
598+
return Poll::Ready(SelectorOutput::F);
599+
},
600+
Poll::Pending => {},
601+
}
583602
Poll::Pending
584603
}
585604
}
@@ -606,6 +625,12 @@ pub(crate) mod futures_util {
606625
}
607626
}
608627

628+
impl<F: Future<Output = ()> + Unpin> From<Option<F>> for OptionalSelector<F> {
629+
fn from(optional_future: Option<F>) -> Self {
630+
Self { optional_future }
631+
}
632+
}
633+
609634
// If we want to poll a future without an async context to figure out if it has completed or
610635
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
611636
// but sadly there's a good bit of boilerplate here.
@@ -1058,18 +1083,13 @@ where
10581083
if mobile_interruptable_platform {
10591084
await_start = Some(sleeper(Duration::from_secs(1)));
10601085
}
1061-
let om_fut = if let Some(om) = onion_messenger.as_ref() {
1062-
let fut = om.get_om().get_update_future();
1063-
OptionalSelector { optional_future: Some(fut) }
1064-
} else {
1065-
OptionalSelector { optional_future: None }
1066-
};
1067-
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
1068-
let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
1069-
OptionalSelector { optional_future: Some(fut) }
1070-
} else {
1071-
OptionalSelector { optional_future: None }
1072-
};
1086+
let om_fut: OptionalSelector<_> =
1087+
onion_messenger.as_ref().map(|om| om.get_om().get_update_future()).into();
1088+
let lm_fut: OptionalSelector<_> = liquidity_manager
1089+
.as_ref()
1090+
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future())
1091+
.into();
1092+
let gv_fut: OptionalSelector<_> = gossip_sync.validation_completion_future().into();
10731093
let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
10741094
let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
10751095
(true, true) => batch_delay.get().min(Duration::from_millis(100)),
@@ -1083,9 +1103,14 @@ where
10831103
c: chain_monitor.get_update_future(),
10841104
d: om_fut,
10851105
e: lm_fut,
1106+
f: gv_fut,
10861107
};
10871108
match fut.await {
1088-
SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {},
1109+
SelectorOutput::B
1110+
| SelectorOutput::C
1111+
| SelectorOutput::D
1112+
| SelectorOutput::E
1113+
| SelectorOutput::F => {},
10891114
SelectorOutput::A(exit) => {
10901115
if exit {
10911116
break;
@@ -1639,11 +1664,12 @@ impl BackgroundProcessor {
16391664
let lm_fut = liquidity_manager
16401665
.as_ref()
16411666
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future());
1667+
let gv_fut = gossip_sync.validation_completion_future();
16421668
let always_futures = [
16431669
channel_manager.get_cm().get_event_or_persistence_needed_future(),
16441670
chain_monitor.get_update_future(),
16451671
];
1646-
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut);
1672+
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut);
16471673
let sleeper = Sleeper::from_futures(futures);
16481674

16491675
let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {

lightning-block-sync/src/gossip.rs

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ use bitcoin::constants::ChainHash;
99
use bitcoin::hash_types::BlockHash;
1010
use bitcoin::transaction::{OutPoint, TxOut};
1111

12-
use lightning::ln::peer_handler::APeerManager;
13-
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1412
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
15-
use lightning::util::logger::Logger;
1613
use lightning::util::native_async::FutureSpawner;
1714
use lightning::util::wakers::Notifier;
1815

@@ -128,46 +125,28 @@ impl<
128125
/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors
129126
/// available on both Bitcoin Core and your LDK application for each request to hold its own
130127
/// connection.
131-
pub struct GossipVerifier<
132-
S: FutureSpawner,
133-
Blocks: Deref + Send + Sync + 'static + Clone,
134-
L: Deref + Send + Sync + 'static,
135-
> where
128+
pub struct GossipVerifier<S: FutureSpawner, Blocks: Deref + Send + Sync + 'static + Clone>
129+
where
136130
Blocks::Target: UtxoSource,
137-
L::Target: Logger,
138131
{
139132
source: Blocks,
140-
peer_manager_wake: Arc<dyn Fn() + Send + Sync>,
141-
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
142133
spawn: S,
143134
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
144135
}
145136

146137
const BLOCK_CACHE_SIZE: usize = 5;
147138

148-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync>
149-
GossipVerifier<S, Blocks, L>
139+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> GossipVerifier<S, Blocks>
150140
where
151141
Blocks::Target: UtxoSource,
152-
L::Target: Logger,
153142
{
154-
/// Constructs a new [`GossipVerifier`].
143+
/// Constructs a new [`GossipVerifier`] for use in a [`P2PGossipSync`].
155144
///
156-
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
157-
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
158-
pub fn new<APM: Deref + Send + Sync + Clone + 'static>(
159-
source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
160-
peer_manager: APM,
161-
) -> Self
162-
where
163-
APM::Target: APeerManager,
164-
{
165-
let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events());
145+
/// [`P2PGossipSync`]: lightning::routing::gossip::P2PGossipSync
146+
pub fn new(source: Blocks, spawn: S) -> Self {
166147
Self {
167148
source,
168149
spawn,
169-
gossiper,
170-
peer_manager_wake,
171150
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
172151
}
173152
}
@@ -256,35 +235,28 @@ where
256235
}
257236
}
258237

259-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> Deref
260-
for GossipVerifier<S, Blocks, L>
238+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> Deref for GossipVerifier<S, Blocks>
261239
where
262240
Blocks::Target: UtxoSource,
263-
L::Target: Logger,
264241
{
265242
type Target = Self;
266243
fn deref(&self) -> &Self {
267244
self
268245
}
269246
}
270247

271-
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> UtxoLookup
272-
for GossipVerifier<S, Blocks, L>
248+
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> UtxoLookup for GossipVerifier<S, Blocks>
273249
where
274250
Blocks::Target: UtxoSource,
275-
L::Target: Logger,
276251
{
277252
fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc<Notifier>) -> UtxoResult {
278253
let res = UtxoFuture::new(notifier);
279254
let fut = res.clone();
280255
let source = self.source.clone();
281-
let gossiper = Arc::clone(&self.gossiper);
282256
let block_cache = Arc::clone(&self.block_cache);
283-
let pmw = Arc::clone(&self.peer_manager_wake);
284257
self.spawn.spawn(async move {
285258
let res = Self::retrieve_utxo(source, block_cache, scid).await;
286259
fut.resolve(res);
287-
(pmw)();
288260
});
289261
UtxoResult::Async(res)
290262
}

lightning/src/routing/gossip.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::util::indexed_map::{
4343
use crate::util::logger::{Level, Logger};
4444
use crate::util::scid_utils::{block_from_scid, scid_from_parts, MAX_SCID_BLOCK};
4545
use crate::util::ser::{MaybeReadable, Readable, ReadableArgs, RequiredWrapper, Writeable, Writer};
46+
use crate::util::wakers::Future;
4647

4748
use crate::io;
4849
use crate::io_extras::{copy, sink};
@@ -367,6 +368,17 @@ where
367368
&self.network_graph
368369
}
369370

371+
/// Gets a [`Future`] which will resolve the next time an async validation of gossip data
372+
/// completes.
373+
///
374+
/// If the [`UtxoLookup`] provided in [`P2PGossipSync::new`] does not return
375+
/// [`UtxoResult::Async`] values, the returned [`Future`] will never resolve
376+
///
377+
/// [`UtxoResult::Async`]: crate::routing::utxo::UtxoResult::Async
378+
pub fn validation_completion_future(&self) -> Future {
379+
self.network_graph.pending_checks.completion_notifier.get_future()
380+
}
381+
370382
/// Returns true when a full routing table sync should be performed with a peer.
371383
fn should_request_full_sync(&self) -> bool {
372384
const FULL_SYNCS_TO_REQUEST: usize = 5;

0 commit comments

Comments
 (0)