Skip to content

Commit 9489f3f

Browse files
Parallelize Esplora tx_sync confirmed/unconfirmed lookups
EsploraSyncClient::sync re-confirms every watched transaction and output on each pass, one or more HTTP round-trips each, issued strictly serially. Against a remote Esplora the sync wall-time scales with watched_set * round_trip_latency and exceeds a wallet-sync timeout on any wallet with real channel history. Fan out the per-tx / per-output lookups in get_confirmed_transactions (both the watched_transactions phase and the watched_outputs spend-status phase) and the per-block checks in get_unconfirmed_transactions with a bounded buffer_unordered(4), async client only; the blocking client keeps the serial path. Ordering is preserved (results are still sorted by block height then in-block position before being handed to Confirm), and every consistency check the serial version performed is preserved in a sequential post-processing pass. Concurrency is deliberately low: against a shared, rate-limited Esplora the effective request rate is the fleet aggregate, so a small per-node bound stays under the server's per-client limit while removing the serial latency floor. Adds a self-contained timing test (mock Esplora with injected per-request latency, no bitcoind/electrs): N=40 watched txs at 100ms/lookup sync in ~1.0s parallel vs ~4.1s serial. Confirmation/ordering correctness remains covered by the electrs-backed test_esplora_syncs integration test.
1 parent f56f47f commit 9489f3f

3 files changed

Lines changed: 316 additions & 7 deletions

File tree

lightning-transaction-sync/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ electrum-client = { version = "0.24.0", optional = true, default-features = fals
4141

4242
[dev-dependencies]
4343
lightning = { version = "0.2.0", path = "../lightning", default-features = false, features = ["std", "_test_utils"] }
44-
tokio = { version = "1.35.0", features = ["macros"] }
44+
tokio = { version = "1.35.0", features = ["macros", "rt", "rt-multi-thread", "net", "io-util", "time"] }
4545

4646
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
4747
electrsd = { version = "0.35.0", default-features = false, features = ["legacy"] }

lightning-transaction-sync/src/esplora.rs

Lines changed: 150 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ use esplora_client::Builder;
2626
use core::ops::Deref;
2727
use std::collections::HashSet;
2828

29+
/// Maximum number of concurrent in-flight Esplora requests issued while syncing
30+
/// confirmed/unconfirmed transactions (async client only).
31+
///
32+
/// The Esplora chain sync re-confirms every watched transaction/output on each
33+
/// pass, which is one or more HTTP round-trips each. Against a remote Esplora
34+
/// these run one at a time when issued serially (as the blocking client still does), so sync wall-time scales with
35+
/// `watched_set * round_trip_latency` and easily exceeds an LDK wallet-sync
36+
/// timeout on wallets with real channel history. We fan these out with a bounded
37+
/// concurrency instead. The bound is deliberately small: when many nodes sync
38+
/// against a shared, rate-limited endpoint the effective request rate is the
39+
/// fleet aggregate, so a low per-node concurrency keeps us under the server's
40+
/// per-client limit while still removing the strictly-serial latency floor.
///
/// Used as the `buffer_unordered` limit of each fan-out stage in this file; every
/// stage is collected before the next one starts.
41+
#[cfg(feature = "async-interface")]
42+
const ESPLORA_SYNC_CONCURRENCY: usize = 4;
43+
2944
/// Synchronizes LDK with a given [`Esplora`] server.
3045
///
3146
/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
@@ -298,19 +313,109 @@ where
298313

299314
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
300315

316+
// Phase A: resolve the confirmation status of each directly-watched
317+
// transaction. `watched_transactions` is a set (unique txids), so a tx
318+
// resolved here is never a duplicate; the async path fans the lookups
319+
// out with bounded concurrency and merges the results.
320+
#[cfg(feature = "async-interface")]
321+
{
322+
use futures::stream::{self, StreamExt};
323+
let results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
324+
stream::iter(sync_state.watched_transactions.iter().copied())
325+
.map(|txid| async move { self.get_confirmed_tx(txid, None, None).await })
326+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
327+
.collect()
328+
.await;
329+
for r in results {
330+
if let Some(confirmed_tx) = r? {
331+
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
332+
confirmed_txs.push(confirmed_tx);
333+
}
334+
}
335+
}
336+
}
337+
#[cfg(not(feature = "async-interface"))]
301338
for txid in &sync_state.watched_transactions {
302339
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
303340
continue;
304341
}
305-
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
342+
if let Some(confirmed_tx) = self.get_confirmed_tx(*txid, None, None)? {
306343
confirmed_txs.push(confirmed_tx);
307344
}
308345
}
309346

347+
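// Phase B: for each watched output, fetch its spend status and, if it was
348+
// spent, resolve the spending transaction. Phase A is fully merged into
349+
// `confirmed_txs` before this runs, so the consistency check below sees all of Phase A's results. Unlike the serial loop, it does not see spending
350+
// transactions resolved earlier within Phase B itself, so a transaction that spends several watched outputs may be queued more than once; the final merge drops the duplicates.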
351+
#[cfg(feature = "async-interface")]
352+
{
353+
use futures::stream::{self, StreamExt};
354+
355+
// B1: fan out the output-status lookups.
356+
let outpoints: Vec<_> =
357+
sync_state.watched_outputs.values().map(|o| o.outpoint).collect();
358+
let status_results: Vec<Result<Option<esplora_client::OutputStatus>, InternalError>> =
359+
stream::iter(outpoints.into_iter())
360+
.map(|outpoint| async move {
361+
self.client
362+
.get_output_status(&outpoint.txid, outpoint.index as u64)
363+
.await
364+
.map_err(InternalError::from)
365+
})
366+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
367+
.collect()
368+
.await;
369+
370+
// B2: post-process sequentially, running the previously-confirmed-but-now-unconfirmed
371+
// check against the Phase A results, and build the to-fetch list.
372+
let mut to_fetch: Vec<(Txid, Option<BlockHash>, Option<u32>)> = Vec::new();
373+
for status_res in status_results {
374+
let output_status = match status_res? {
375+
Some(s) => s,
376+
None => continue,
377+
};
378+
if let Some(spending_txid) = output_status.txid {
379+
if let Some(spending_tx_status) = output_status.status {
380+
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
381+
if spending_tx_status.confirmed {
382+
continue;
383+
} else {
384+
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
385+
return Err(InternalError::Inconsistency);
386+
}
387+
}
388+
to_fetch.push((
389+
spending_txid,
390+
spending_tx_status.block_hash,
391+
spending_tx_status.block_height,
392+
));
393+
}
394+
}
395+
}
396+
397+
// B3: fan out the dependent confirmed-tx lookups.
398+
let dep_results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
399+
stream::iter(to_fetch.into_iter())
400+
.map(|(txid, bh, height)| async move {
401+
self.get_confirmed_tx(txid, bh, height).await
402+
})
403+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
404+
.collect()
405+
.await;
406+
for r in dep_results {
407+
if let Some(confirmed_tx) = r? {
408+
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
409+
confirmed_txs.push(confirmed_tx);
410+
}
411+
}
412+
}
413+
}
414+
#[cfg(not(feature = "async-interface"))]
310415
for (_, output) in &sync_state.watched_outputs {
311-
if let Some(output_status) = maybe_await!(self
416+
if let Some(output_status) = self
312417
.client
313-
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
418+
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)?
314419
{
315420
if let Some(spending_txid) = output_status.txid {
316421
if let Some(spending_tx_status) = output_status.status {
@@ -324,11 +429,11 @@ where
324429
}
325430
}
326431

327-
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
432+
if let Some(confirmed_tx) = self.get_confirmed_tx(
328433
spending_txid,
329434
spending_tx_status.block_hash,
330435
spending_tx_status.block_height,
331-
))? {
436+
)? {
332437
confirmed_txs.push(confirmed_tx);
333438
}
334439
}
@@ -436,9 +541,48 @@ where
436541

437542
let mut unconfirmed_txs = Vec::new();
438543

544+
// The async path fans the per-block status checks out with bounded
545+
// concurrency. The `None` block hash is a hard invariant violation
546+
// (pre-0.0.113 channel), so we screen for it before fanning out rather
547+
// than panicking from inside a concurrent task.
548+
#[cfg(feature = "async-interface")]
549+
{
550+
use futures::stream::{self, StreamExt};
551+
let mut items: Vec<(Txid, BlockHash)> = Vec::with_capacity(relevant_txids.len());
552+
for (txid, _conf_height, block_hash_opt) in relevant_txids {
553+
if let Some(block_hash) = block_hash_opt {
554+
items.push((txid, block_hash));
555+
} else {
556+
log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
557+
panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
558+
}
559+
}
560+
let results: Vec<(Txid, Result<esplora_client::BlockStatus, InternalError>)> =
561+
stream::iter(items.into_iter())
562+
.map(|(txid, block_hash)| async move {
563+
let r = self
564+
.client
565+
.get_block_status(&block_hash)
566+
.await
567+
.map_err(InternalError::from);
568+
(txid, r)
569+
})
570+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
571+
.collect()
572+
.await;
573+
for (txid, status_res) in results {
574+
let block_status = status_res?;
575+
if block_status.in_best_chain {
576+
// Skip if the block in question is still confirmed.
577+
continue;
578+
}
579+
unconfirmed_txs.push(txid);
580+
}
581+
}
582+
#[cfg(not(feature = "async-interface"))]
439583
for (txid, _conf_height, block_hash_opt) in relevant_txids {
440584
if let Some(block_hash) = block_hash_opt {
441-
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
585+
let block_status = self.client.get_block_status(&block_hash)?;
442586
if block_status.in_best_chain {
443587
// Skip if the block in question is still confirmed.
444588
continue;
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#![cfg(all(not(target_os = "windows"), feature = "esplora-async"))]
2+
3+
//! Self-contained before/after timing test for the parallelized Esplora tx_sync.
4+
//!
5+
//! Stands up a minimal mock Esplora HTTP server that injects a fixed latency on
6+
//! the per-transaction `/merkleblock-proof` lookups (the fan-out the confirmed
7+
//! sync parallelizes) and returns 404 for them, which esplora-client maps to
8+
//! `Ok(None)` -> no confirmations, so the run is deterministic and needs no
9+
//! bitcoind/electrs.
10+
//!
11+
//! With `N` watched transactions and `DELAY` per merkle lookup:
12+
//! - strictly-serial sync ~= N * DELAY
13+
//! - parallel sync (buffer_unordered(C)) ~= ceil(N / C) * DELAY
14+
//!
15+
//! The test asserts the wall-time is well under the serial estimate, so it
16+
//! PASSES on the parallel implementation and would FAIL on the old serial one.
17+
//! That is the "before/after": run it on the base commit (serial) to see it
18+
//! blow the bound, and on this branch (parallel) to see it pass.
19+
//!
20+
//! Correctness of actual confirmation handling / ordering is covered by the
21+
//! electrs-backed `test_esplora_syncs` integration test (run in CI).
22+
23+
use lightning::chain::transaction::TransactionData;
24+
use lightning::chain::{Confirm, Filter};
25+
use lightning::util::test_utils::TestLogger;
26+
use lightning_transaction_sync::EsploraSyncClient;
27+
28+
use bitcoin::block::Header;
29+
use bitcoin::consensus::encode::serialize_hex;
30+
use bitcoin::constants::genesis_block;
31+
use bitcoin::hashes::Hash;
32+
use bitcoin::network::Network;
33+
use bitcoin::{BlockHash, ScriptBuf, Txid};
34+
35+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
36+
use tokio::net::TcpListener;
37+
38+
use std::time::{Duration, Instant};
39+
40+
// Number of watched transactions and the per-merkle-lookup latency injected by
41+
// the mock. Chosen so serial (~N*DELAY = 4s) and parallel (~1s) are clearly
42+
// separable with margin for CI jitter.
// The test passes when the sync finishes in under half the serial estimate
// (N_TXS * DELAY_MS / 2, i.e. 2000ms with these values).
// NOTE(review): the test builds each txid from `i as u8`, so N_TXS must stay
// at or below 256 for the watched txids to remain distinct.
43+
const N_TXS: usize = 40;
44+
// Latency, in milliseconds, the mock sleeps before answering a `/merkleblock-proof` request.
const DELAY_MS: u64 = 100;
45+
46+
// Minimal `Confirm` that reports nothing relevant, so `get_unconfirmed_transactions`
47+
// has no work and the run is dominated by the confirmed-tx merkle fan-out.
48+
struct NoopConfirmable;
49+
impl Confirm for NoopConfirmable {
50+
// The three notification callbacks below have empty bodies: the test only measures
// wall-time and never inspects what was confirmed or unconfirmed.
fn transactions_confirmed(&self, _h: &Header, _txdata: &TransactionData, _height: u32) {}
51+
fn transaction_unconfirmed(&self, _txid: &Txid) {}
52+
fn best_block_updated(&self, _h: &Header, _height: u32) {}
53+
// Always returns an empty list, i.e. this confirmable tracks no transactions.
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
54+
Vec::new()
55+
}
56+
}
57+
58+
// Spawn a mock Esplora server on an ephemeral port; returns the port.
//
// The server handles one request per TCP connection and always answers with
// `connection: close`. Routing is by request path only (method and body are ignored):
//   - `/blocks/tip/hash`            -> 200, mainnet genesis block hash
//   - path ending in `/header`      -> 200, hex-serialized mainnet genesis header
//   - path ending in `/status`      -> 200, fixed JSON with `in_best_chain: true`
//   - path containing `/merkleblock-proof` -> sleeps DELAY_MS, then 404
//   - anything else                 -> 404 immediately
//
// Both the accept loop and each per-connection handler are spawned without keeping
// their join handles, so they keep running until the runtime shuts down.
// Panics if binding the listener or reading its local address fails.
59+
async fn spawn_mock_esplora() -> u16 {
60+
// Port 0 asks the OS to pick a free port; it is read back from the bound listener.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
61+
let port = listener.local_addr().unwrap().port();
62+
63+
// Precompute valid canned responses from the real genesis block.
64+
let genesis = genesis_block(Network::Bitcoin);
65+
let tip_hash_hex = genesis.block_hash().to_string(); // 64 hex chars
66+
let header_hex = serialize_hex(&genesis.header); // 80-byte header, 160 hex chars
67+
68+
// Accept loop: runs in its own task so this function can return the port right away.
tokio::spawn(async move {
69+
loop {
70+
let (mut sock, _) = match listener.accept().await {
71+
Ok(x) => x,
72+
// NOTE(review): accept errors are retried immediately with no backoff; a
// persistent error would make this loop spin.
Err(_) => continue,
73+
};
74+
// Each connection task gets its own copy of the canned bodies.
let tip = tip_hash_hex.clone();
75+
let hdr = header_hex.clone();
76+
tokio::spawn(async move {
77+
// Read until end of request headers.
78+
let mut buf = Vec::new();
79+
let mut tmp = [0u8; 1024];
80+
loop {
81+
match sock.read(&mut tmp).await {
82+
// EOF before the header terminator: fall through and answer with whatever was read.
Ok(0) => break,
83+
Ok(n) => {
84+
buf.extend_from_slice(&tmp[..n]);
85+
if buf.windows(4).any(|w| w == b"\r\n\r\n") {
86+
break;
87+
}
88+
},
89+
// Read error: drop the connection without sending a response.
Err(_) => return,
90+
}
91+
}
92+
// Extract the request target: second whitespace-separated token of the first
// line, defaulting to "/" when the request line is missing or malformed.
let req = String::from_utf8_lossy(&buf);
93+
let path = req
94+
.lines()
95+
.next()
96+
.and_then(|l| l.split_whitespace().nth(1))
97+
.unwrap_or("/")
98+
.to_string();
99+
100+
// Route on the path; only the merkle-proof branch injects latency.
let (status, body): (&str, String) = if path == "/blocks/tip/hash" {
101+
("200 OK", tip.clone())
102+
} else if path.ends_with("/header") {
103+
("200 OK", hdr.clone())
104+
} else if path.ends_with("/status") {
105+
("200 OK", "{\"in_best_chain\":true,\"height\":100,\"next_best\":null}".into())
106+
} else if path.contains("/merkleblock-proof") {
107+
// The fan-out under test: inject latency, then 404 -> Ok(None).
108+
tokio::time::sleep(Duration::from_millis(DELAY_MS)).await;
109+
("404 Not Found", String::new())
110+
} else {
111+
("404 Not Found", String::new())
112+
};
113+
114+
let resp = format!(
115+
"HTTP/1.1 {}\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
116+
status,
117+
body.len(),
118+
body
119+
);
120+
// Write/shutdown failures are ignored on purpose: the client may already have gone away.
let _ = sock.write_all(resp.as_bytes()).await;
121+
let _ = sock.shutdown().await;
122+
});
123+
}
124+
});
125+
126+
port
127+
}
128+
129+
// Timing test: registers N_TXS watched txids against the mock server, runs a single
// `sync`, and asserts the wall-time is below half the serial estimate
// (N_TXS * DELAY_MS). The mock server tasks run on the same multi-threaded runtime
// as the client under test.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
130+
async fn parallel_tx_sync_beats_serial_bound() {
131+
let port = spawn_mock_esplora().await;
132+
let url = format!("http://127.0.0.1:{}", port);
133+
let mut logger = TestLogger::new();
134+
let tx_sync = EsploraSyncClient::new(url, &mut logger);
135+
136+
// Register N distinct watched transactions; each resolves to a 404 merkle
137+
// lookup (Ok(None)), so no confirmations are produced.
138+
let script = ScriptBuf::new();
139+
for i in 0..N_TXS {
140+
// Every byte of the txid is `i + 1` truncated to u8.
// NOTE(review): the txids are therefore only distinct while N_TXS <= 256.
let txid = Txid::from_byte_array([(i as u8).wrapping_add(1); 32]);
141+
tx_sync.register_tx(&txid, script.as_script());
142+
}
143+
144+
let confirmable = NoopConfirmable;
145+
let confirmables: Vec<&NoopConfirmable> = vec![&confirmable];
146+
147+
// Only the `sync` call itself is timed; server start-up and registration are excluded.
let start = Instant::now();
148+
tx_sync.sync(confirmables).await.expect("sync should complete");
149+
let elapsed = start.elapsed();
150+
println!(
151+
"[parallel-timing] N={} delay={}ms -> sync took {}ms (serial estimate ~{}ms)",
152+
N_TXS,
153+
DELAY_MS,
154+
elapsed.as_millis(),
155+
(N_TXS as u64) * DELAY_MS
156+
);
157+
158+
// Pass threshold is half the serial estimate (2000ms with N_TXS = 40, DELAY_MS = 100).
let serial_estimate_ms = (N_TXS as u64) * DELAY_MS;
159+
assert!(
160+
elapsed.as_millis() < (serial_estimate_ms / 2) as u128,
161+
"sync took {}ms; expected well under serial estimate {}ms -- parallel fan-out not happening?",
162+
elapsed.as_millis(),
163+
serial_estimate_ms
164+
);
165+
}

0 commit comments

Comments
 (0)