Skip to content

Commit 0234d9e

Browse files
Merge pull request #22 from moneydevkit/fix/parallel-esplora-tx-sync
Parallelize Esplora tx_sync confirmed/unconfirmed lookups
2 parents f56f47f + 076890c commit 0234d9e

3 files changed

Lines changed: 321 additions & 7 deletions

File tree

lightning-transaction-sync/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ electrum-client = { version = "0.24.0", optional = true, default-features = fals
4141

4242
[dev-dependencies]
4343
lightning = { version = "0.2.0", path = "../lightning", default-features = false, features = ["std", "_test_utils"] }
44-
tokio = { version = "1.35.0", features = ["macros"] }
44+
tokio = { version = "1.35.0", features = ["macros", "rt", "rt-multi-thread", "net", "io-util", "time"] }
4545

4646
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
4747
electrsd = { version = "0.35.0", default-features = false, features = ["legacy"] }

lightning-transaction-sync/src/esplora.rs

Lines changed: 155 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,21 @@ use esplora_client::Builder;
2626
use core::ops::Deref;
2727
use std::collections::HashSet;
2828

29+
/// Maximum number of concurrent in-flight Esplora requests issued while syncing
30+
/// confirmed/unconfirmed transactions (async client only).
31+
///
32+
/// The Esplora chain sync re-confirms every watched transaction/output on each
33+
/// pass, which is one or more HTTP round-trips each. Against a remote Esplora
34+
/// these run sequentially in the stock client, so sync wall-time scales with
35+
/// `watched_set * round_trip_latency` and easily exceeds an LDK wallet-sync
36+
/// timeout on wallets with real channel history. We fan these out with a bounded
37+
/// concurrency instead. The bound is deliberately small: when many nodes sync
38+
/// against a shared, rate-limited endpoint the effective request rate is the
39+
/// fleet aggregate, so a low per-node concurrency keeps us under the server's
40+
/// per-client limit while still removing the strictly-serial latency floor.
41+
#[cfg(feature = "async-interface")]
42+
const ESPLORA_SYNC_CONCURRENCY: usize = 4;
43+
2944
/// Synchronizes LDK with a given [`Esplora`] server.
3045
///
3146
/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
@@ -298,19 +313,114 @@ where
298313

299314
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
300315

316+
// Phase A: resolve the confirmation status of each directly-watched
317+
// transaction. `watched_transactions` is a set (unique txids), so a tx
318+
// resolved here is never a duplicate; the async path fans the lookups
319+
// out with bounded concurrency and merges the results.
320+
#[cfg(feature = "async-interface")]
321+
{
322+
use futures::stream::{self, StreamExt};
323+
let results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
324+
stream::iter(sync_state.watched_transactions.iter().copied())
325+
.map(|txid| async move { self.get_confirmed_tx(txid, None, None).await })
326+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
327+
.collect()
328+
.await;
329+
for r in results {
330+
if let Some(confirmed_tx) = r? {
331+
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
332+
confirmed_txs.push(confirmed_tx);
333+
}
334+
}
335+
}
336+
}
337+
#[cfg(not(feature = "async-interface"))]
301338
for txid in &sync_state.watched_transactions {
302339
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
303340
continue;
304341
}
305-
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
342+
if let Some(confirmed_tx) = self.get_confirmed_tx(*txid, None, None)? {
306343
confirmed_txs.push(confirmed_tx);
307344
}
308345
}
309346

347+
// Phase B: for each watched output, fetch its spend status and, if spent,
348+
// resolve the spending transaction. Phase A is fully merged into
349+
// `confirmed_txs` first, so the check below sees every Phase A result; spends
350+
// first resolved in Phase B are only de-duplicated at the B3 merge.
351+
#[cfg(feature = "async-interface")]
352+
{
353+
use futures::stream::{self, StreamExt};
354+
355+
// B1: fan out the output-status lookups.
356+
let outpoints: Vec<_> =
357+
sync_state.watched_outputs.values().map(|o| o.outpoint).collect();
358+
let status_results: Vec<Result<Option<esplora_client::OutputStatus>, InternalError>> =
359+
stream::iter(outpoints.into_iter())
360+
.map(|outpoint| async move {
361+
self.client
362+
.get_output_status(&outpoint.txid, outpoint.index as u64)
363+
.await
364+
.map_err(InternalError::from)
365+
})
366+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
367+
.collect()
368+
.await;
369+
370+
// B2: post-process sequentially, preserving every consistency check
371+
// the sequential version performed, and build the to-fetch list.
372+
let phase_a_txids: HashSet<Txid> =
373+
confirmed_txs.iter().map(|ctx| ctx.txid).collect();
374+
let mut to_fetch: Vec<(Txid, Option<BlockHash>, Option<u32>)> = Vec::new();
375+
// `transpose` drops outputs with no status while still surfacing any
376+
// lookup error through the `?` below.
377+
for status_res in status_results.into_iter().filter_map(Result::transpose) {
378+
let output_status = status_res?;
379+
let (Some(spending_txid), Some(spending_tx_status)) =
380+
(output_status.txid, output_status.status)
381+
else {
382+
continue;
383+
};
384+
385+
if phase_a_txids.contains(&spending_txid) {
386+
// Phase A already resolved this spend as confirmed; the server
387+
// flipping it back to unconfirmed is an inconsistency.
388+
if !spending_tx_status.confirmed {
389+
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
390+
return Err(InternalError::Inconsistency);
391+
}
392+
continue;
393+
}
394+
395+
to_fetch.push((
396+
spending_txid,
397+
spending_tx_status.block_hash,
398+
spending_tx_status.block_height,
399+
));
400+
}
401+
402+
// B3: fan out the dependent confirmed-tx lookups.
403+
let dep_results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
404+
stream::iter(to_fetch.into_iter())
405+
.map(|(txid, bh, height)| async move {
406+
self.get_confirmed_tx(txid, bh, height).await
407+
})
408+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
409+
.collect()
410+
.await;
411+
for r in dep_results {
412+
if let Some(confirmed_tx) = r? {
413+
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
414+
confirmed_txs.push(confirmed_tx);
415+
}
416+
}
417+
}
418+
}
419+
#[cfg(not(feature = "async-interface"))]
310420
for (_, output) in &sync_state.watched_outputs {
311-
if let Some(output_status) = maybe_await!(self
421+
if let Some(output_status) = self
312422
.client
313-
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
423+
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)?
314424
{
315425
if let Some(spending_txid) = output_status.txid {
316426
if let Some(spending_tx_status) = output_status.status {
@@ -324,11 +434,11 @@ where
324434
}
325435
}
326436

327-
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
437+
if let Some(confirmed_tx) = self.get_confirmed_tx(
328438
spending_txid,
329439
spending_tx_status.block_hash,
330440
spending_tx_status.block_height,
331-
))? {
441+
)? {
332442
confirmed_txs.push(confirmed_tx);
333443
}
334444
}
@@ -436,9 +546,48 @@ where
436546

437547
let mut unconfirmed_txs = Vec::new();
438548

549+
// The async path fans the per-block status checks out with bounded
550+
// concurrency. The `None` block hash is a hard invariant violation
551+
// (pre-0.0.113 channel), so we screen for it before fanning out rather
552+
// than panicking from inside a concurrent task.
553+
#[cfg(feature = "async-interface")]
554+
{
555+
use futures::stream::{self, StreamExt};
556+
let mut items: Vec<(Txid, BlockHash)> = Vec::with_capacity(relevant_txids.len());
557+
for (txid, _conf_height, block_hash_opt) in relevant_txids {
558+
if let Some(block_hash) = block_hash_opt {
559+
items.push((txid, block_hash));
560+
} else {
561+
log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
562+
panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
563+
}
564+
}
565+
let results: Vec<(Txid, Result<esplora_client::BlockStatus, InternalError>)> =
566+
stream::iter(items.into_iter())
567+
.map(|(txid, block_hash)| async move {
568+
let r = self
569+
.client
570+
.get_block_status(&block_hash)
571+
.await
572+
.map_err(InternalError::from);
573+
(txid, r)
574+
})
575+
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
576+
.collect()
577+
.await;
578+
for (txid, status_res) in results {
579+
let block_status = status_res?;
580+
if block_status.in_best_chain {
581+
// Skip if the block in question is still confirmed.
582+
continue;
583+
}
584+
unconfirmed_txs.push(txid);
585+
}
586+
}
587+
#[cfg(not(feature = "async-interface"))]
439588
for (txid, _conf_height, block_hash_opt) in relevant_txids {
440589
if let Some(block_hash) = block_hash_opt {
441-
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
590+
let block_status = self.client.get_block_status(&block_hash)?;
442591
if block_status.in_best_chain {
443592
// Skip if the block in question is still confirmed.
444593
continue;
Lines changed: 165 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,165 @@
1+
#![cfg(all(not(target_os = "windows"), feature = "esplora-async"))]
2+
3+
//! Self-contained before/after timing test for the parallelized Esplora tx_sync.
4+
//!
5+
//! Stands up a minimal mock Esplora HTTP server that injects a fixed latency on
6+
//! the per-transaction `/merkleblock-proof` lookups (the fan-out the confirmed
7+
//! sync parallelizes) and returns 404 for them, which esplora-client maps to
8+
//! `Ok(None)` -> no confirmations, so the run is deterministic and needs no
9+
//! bitcoind/electrs.
10+
//!
11+
//! With `N` watched transactions and `DELAY` per merkle lookup:
12+
//! - strictly-serial sync ~= N * DELAY
13+
//! - parallel sync (buffer_unordered(C)) ~= ceil(N / C) * DELAY
14+
//!
15+
//! The test asserts the wall-time is well under the serial estimate, so it
16+
//! PASSES on the parallel implementation and would FAIL on the old serial one.
17+
//! That is the "before/after": run it on the base commit (serial) to see it
18+
//! blow the bound, and on this branch (parallel) to see it pass.
19+
//!
20+
//! Correctness of actual confirmation handling / ordering is covered by the
21+
//! electrs-backed `test_esplora_syncs` integration test (run in CI).
22+
23+
use lightning::chain::transaction::TransactionData;
24+
use lightning::chain::{Confirm, Filter};
25+
use lightning::util::test_utils::TestLogger;
26+
use lightning_transaction_sync::EsploraSyncClient;
27+
28+
use bitcoin::block::Header;
29+
use bitcoin::consensus::encode::serialize_hex;
30+
use bitcoin::constants::genesis_block;
31+
use bitcoin::hashes::Hash;
32+
use bitcoin::network::Network;
33+
use bitcoin::{BlockHash, ScriptBuf, Txid};
34+
35+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
36+
use tokio::net::TcpListener;
37+
38+
use std::time::{Duration, Instant};
39+
40+
// Number of watched transactions and the per-merkle-lookup latency injected by
41+
// the mock. Chosen so serial (~N*DELAY = 4s) and parallel (~1s) are clearly
42+
// separable with margin for CI jitter.
43+
const N_TXS: usize = 40;
44+
const DELAY_MS: u64 = 100;
45+
46+
// Minimal `Confirm` that reports nothing relevant, so `get_unconfirmed_transactions`
47+
// has no work and the run is dominated by the confirmed-tx merkle fan-out.
48+
struct NoopConfirmable;
49+
impl Confirm for NoopConfirmable {
50+
fn transactions_confirmed(&self, _h: &Header, _txdata: &TransactionData, _height: u32) {}
51+
fn transaction_unconfirmed(&self, _txid: &Txid) {}
52+
fn best_block_updated(&self, _h: &Header, _height: u32) {}
53+
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
54+
Vec::new()
55+
}
56+
}
57+
58+
// Spawn a mock Esplora server on an ephemeral port; returns the port.
59+
async fn spawn_mock_esplora() -> u16 {
60+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
61+
let port = listener.local_addr().unwrap().port();
62+
63+
// Precompute valid canned responses from the real genesis block.
64+
let genesis = genesis_block(Network::Bitcoin);
65+
let tip_hash_hex = genesis.block_hash().to_string(); // 64 hex chars
66+
let header_hex = serialize_hex(&genesis.header); // 80-byte header, 160 hex chars
67+
68+
tokio::spawn(async move {
69+
loop {
70+
let (mut sock, _) = match listener.accept().await {
71+
Ok(x) => x,
72+
Err(_) => continue,
73+
};
74+
let tip = tip_hash_hex.clone();
75+
let hdr = header_hex.clone();
76+
tokio::spawn(async move {
77+
// Read until end of request headers.
78+
let mut buf = Vec::new();
79+
let mut tmp = [0u8; 1024];
80+
loop {
81+
match sock.read(&mut tmp).await {
82+
Ok(0) => break,
83+
Ok(n) => {
84+
buf.extend_from_slice(&tmp[..n]);
85+
if buf.windows(4).any(|w| w == b"\r\n\r\n") {
86+
break;
87+
}
88+
},
89+
Err(_) => return,
90+
}
91+
}
92+
let req = String::from_utf8_lossy(&buf);
93+
let path = req
94+
.lines()
95+
.next()
96+
.and_then(|l| l.split_whitespace().nth(1))
97+
.unwrap_or("/")
98+
.to_string();
99+
100+
let (status, body): (&str, String) = if path == "/blocks/tip/hash" {
101+
("200 OK", tip.clone())
102+
} else if path.ends_with("/header") {
103+
("200 OK", hdr.clone())
104+
} else if path.ends_with("/status") {
105+
("200 OK", "{\"in_best_chain\":true,\"height\":100,\"next_best\":null}".into())
106+
} else if path.contains("/merkleblock-proof") {
107+
// The fan-out under test: inject latency, then 404 -> Ok(None).
108+
tokio::time::sleep(Duration::from_millis(DELAY_MS)).await;
109+
("404 Not Found", String::new())
110+
} else {
111+
("404 Not Found", String::new())
112+
};
113+
114+
let resp = format!(
115+
"HTTP/1.1 {}\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
116+
status,
117+
body.len(),
118+
body
119+
);
120+
let _ = sock.write_all(resp.as_bytes()).await;
121+
let _ = sock.shutdown().await;
122+
});
123+
}
124+
});
125+
126+
port
127+
}
128+
129+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
130+
async fn parallel_tx_sync_beats_serial_bound() {
131+
let port = spawn_mock_esplora().await;
132+
let url = format!("http://127.0.0.1:{}", port);
133+
let mut logger = TestLogger::new();
134+
let tx_sync = EsploraSyncClient::new(url, &mut logger);
135+
136+
// Register N distinct watched transactions; each resolves to a 404 merkle
137+
// lookup (Ok(None)), so no confirmations are produced.
138+
let script = ScriptBuf::new();
139+
for i in 0..N_TXS {
140+
let txid = Txid::from_byte_array([(i as u8).wrapping_add(1); 32]);
141+
tx_sync.register_tx(&txid, script.as_script());
142+
}
143+
144+
let confirmable = NoopConfirmable;
145+
let confirmables: Vec<&NoopConfirmable> = vec![&confirmable];
146+
147+
let start = Instant::now();
148+
tx_sync.sync(confirmables).await.expect("sync should complete");
149+
let elapsed = start.elapsed();
150+
println!(
151+
"[parallel-timing] N={} delay={}ms -> sync took {}ms (serial estimate ~{}ms)",
152+
N_TXS,
153+
DELAY_MS,
154+
elapsed.as_millis(),
155+
(N_TXS as u64) * DELAY_MS
156+
);
157+
158+
let serial_estimate_ms = (N_TXS as u64) * DELAY_MS;
159+
assert!(
160+
elapsed.as_millis() < (serial_estimate_ms / 2) as u128,
161+
"sync took {}ms; expected well under serial estimate {}ms -- parallel fan-out not happening?",
162+
elapsed.as_millis(),
163+
serial_estimate_ms
164+
);
165+
}

0 commit comments

Comments
 (0)