Skip to content

Commit d574431

Browse files
committed
feat(electrum): batched Headers and script_get_history
1 parent d547730 commit d574431

2 files changed

Lines changed: 93 additions & 73 deletions

File tree

crates/electrum/benches/test_sync.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,15 @@ where
5656

5757
pub fn test_sync_performance(c: &mut Criterion) {
5858
let env = TestEnv::new().unwrap();
59-
let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap();
59+
60+
// Test local server:
61+
// let electrum_client =
62+
// electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap();
63+
64+
// Test remote server:
65+
let electrum_client =
66+
electrum_client::Client::new("ssl://electrum.blockstream.info:50002").unwrap();
67+
6068
let client = BdkElectrumClient::new(electrum_client);
6169

6270
const NUM_BLOCKS: usize = 200;

crates/electrum/src/bdk_electrum_client.rs

Lines changed: 84 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -70,33 +70,6 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
7070
Ok(tx)
7171
}
7272

73-
/// Fetch block header of given `height`.
74-
///
75-
/// If it hits the cache it will return the cached version and avoid making the request.
76-
fn fetch_header(&self, height: u32) -> Result<Header, Error> {
77-
let block_header_cache = self.block_header_cache.lock().unwrap();
78-
79-
if let Some(header) = block_header_cache.get(&height) {
80-
return Ok(*header);
81-
}
82-
83-
drop(block_header_cache);
84-
85-
self.update_header(height)
86-
}
87-
88-
/// Update a block header at given `height`. Returns the updated header.
89-
fn update_header(&self, height: u32) -> Result<Header, Error> {
90-
let header = self.inner.block_header(height as usize)?;
91-
92-
self.block_header_cache
93-
.lock()
94-
.unwrap()
95-
.insert(height, header);
96-
97-
Ok(header)
98-
}
99-
10073
/// Broadcasts a transaction to the network.
10174
///
10275
/// This is a re-export of [`ElectrumApi::transaction_broadcast`].
@@ -359,27 +332,29 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
359332
outpoints: impl IntoIterator<Item = OutPoint>,
360333
pending_anchors: &mut Vec<(Txid, usize)>,
361334
) -> Result<(), Error> {
362-
for outpoint in outpoints {
363-
let op_txid = outpoint.txid;
364-
let op_tx = self.fetch_tx(op_txid)?;
365-
let op_txout = match op_tx.output.get(outpoint.vout as usize) {
366-
Some(txout) => txout,
367-
None => continue,
368-
};
369-
debug_assert_eq!(op_tx.compute_txid(), op_txid);
335+
let ops_txs = outpoints
336+
.into_iter()
337+
.filter_map(|op| self.fetch_tx(op.txid).ok().map(|tx| (op, tx)))
338+
.collect::<Vec<_>>();
339+
340+
let scripts = ops_txs
341+
.iter()
342+
.map(|(op, tx)| tx.output[op.vout as usize].script_pubkey.as_script());
370343

371-
// attempt to find the following transactions (alongside their chain positions), and
372-
// add to our sparsechain `update`:
344+
let spk_histories = self.inner.batch_script_get_history(scripts)?;
345+
346+
for ((outpoint, tx), spk_history) in ops_txs.into_iter().zip(spk_histories) {
373347
let mut has_residing = false; // tx in which the outpoint resides
374348
let mut has_spending = false; // tx that spends the outpoint
375-
for res in self.inner.script_get_history(&op_txout.script_pubkey)? {
349+
350+
for res in spk_history {
376351
if has_residing && has_spending {
377352
break;
378353
}
379354

380-
if !has_residing && res.tx_hash == op_txid {
355+
if !has_residing && res.tx_hash == outpoint.txid {
381356
has_residing = true;
382-
tx_update.txs.push(Arc::clone(&op_tx));
357+
tx_update.txs.push(Arc::clone(&tx));
383358
match res.height.try_into() {
384359
// Returned heights 0 & -1 are reserved for unconfirmed txs.
385360
Ok(height) if height > 0 => {
@@ -391,7 +366,7 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
391366
}
392367
}
393368

394-
if !has_spending && res.tx_hash != op_txid {
369+
if !has_spending && res.tx_hash != outpoint.txid {
395370
let res_tx = self.fetch_tx(res.tx_hash)?;
396371
// we exclude txs/anchors that do not spend our specified outpoint(s)
397372
has_spending = res_tx
@@ -426,34 +401,45 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
426401
txids: impl IntoIterator<Item = Txid>,
427402
pending_anchors: &mut Vec<(Txid, usize)>,
428403
) -> Result<(), Error> {
404+
let mut txs = Vec::new();
405+
let mut scripts = Vec::new();
429406
for txid in txids {
430-
let tx = match self.fetch_tx(txid) {
431-
Ok(tx) => tx,
432-
Err(electrum_client::Error::Protocol(_)) => continue,
433-
Err(other_err) => return Err(other_err),
434-
};
407+
match self.fetch_tx(txid) {
408+
Ok(tx) => {
409+
let spk = tx
410+
.output
411+
.first()
412+
.map(|txo| &txo.script_pubkey)
413+
.expect("tx must have an output")
414+
.clone();
415+
txs.push(tx);
416+
scripts.push(spk);
417+
}
418+
Err(electrum_client::Error::Protocol(_)) => {
419+
continue;
420+
}
421+
Err(e) => return Err(e),
422+
}
423+
}
435424

436-
let spk = tx
437-
.output
438-
.first()
439-
.map(|txo| &txo.script_pubkey)
440-
.expect("tx must have an output");
425+
// because of restrictions of the Electrum API, we have to use the `script_get_history`
426+
// call to get confirmation status of our transaction
427+
let spk_histories = self
428+
.inner
429+
.batch_script_get_history(scripts.iter().map(|spk| spk.as_script()))?;
441430

442-
// because of restrictions of the Electrum API, we have to use the `script_get_history`
443-
// call to get confirmation status of our transaction
444-
if let Some(r) = self
445-
.inner
446-
.script_get_history(spk)?
431+
for (tx, spk_history) in txs.into_iter().zip(spk_histories) {
432+
if let Some(res) = spk_history
447433
.into_iter()
448-
.find(|r| r.tx_hash == txid)
434+
.find(|res| res.tx_hash == tx.compute_txid())
449435
{
450-
match r.height.try_into() {
436+
match res.height.try_into() {
451437
// Returned heights 0 & -1 are reserved for unconfirmed txs.
452438
Ok(height) if height > 0 => {
453-
pending_anchors.push((txid, height));
439+
pending_anchors.push((tx.compute_txid(), height));
454440
}
455441
_ => {
456-
tx_update.seen_ats.insert((r.tx_hash, start_time));
442+
tx_update.seen_ats.insert((res.tx_hash, start_time));
457443
}
458444
}
459445
}
@@ -472,19 +458,37 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
472458
let mut results = Vec::with_capacity(txs_with_heights.len());
473459
let mut to_fetch = Vec::new();
474460

475-
// Build a map for height to block hash conversions. This is for obtaining block hash data
476-
// with minimum `fetch_header` calls.
477-
let mut height_to_hash: HashMap<u32, BlockHash> = HashMap::new();
478-
for &(_, height) in txs_with_heights {
479-
let h = height as u32;
480-
if !height_to_hash.contains_key(&h) {
481-
// Try to obtain hash from the header cache, or fetch the header if absent.
482-
let hash = self.fetch_header(h)?.block_hash();
483-
height_to_hash.insert(h, hash);
461+
// Figure out which block heights we need headers for.
462+
let mut needed_heights: Vec<u32> =
463+
txs_with_heights.iter().map(|&(_, h)| h as u32).collect();
464+
needed_heights.sort_unstable();
465+
needed_heights.dedup();
466+
467+
// Filter out any heights already in our cache.
468+
let mut cache = self.block_header_cache.lock().unwrap();
469+
let missing_heights: Vec<u32> = needed_heights
470+
.into_iter()
471+
.filter(|h| !cache.contains_key(h))
472+
.collect();
473+
474+
// Fetch all missing headers in one RPC call.
475+
if !missing_heights.is_empty() {
476+
let headers = self.inner.batch_block_header(missing_heights.clone())?;
477+
for (h, hdr) in missing_heights.into_iter().zip(headers.into_iter()) {
478+
cache.insert(h, hdr);
484479
}
485480
}
486481

487-
// Check cache.
482+
let height_to_hash: HashMap<u32, BlockHash> = txs_with_heights
483+
.iter()
484+
.map(|&(_, h)| {
485+
let h = h as u32;
486+
(h, cache[&h].block_hash())
487+
})
488+
.collect();
489+
drop(cache);
490+
491+
// Check our anchor cache and queue up any proofs we still need.
488492
{
489493
let anchor_cache = self.anchor_cache.lock().unwrap();
490494
for &(txid, height) in txs_with_heights {
@@ -505,14 +509,22 @@ impl<E: ElectrumApi> BdkElectrumClient<E> {
505509
let proof = self.inner.transaction_get_merkle(&txid, height)?;
506510

507511
// Validate against header, retrying once on stale header.
508-
let mut header = self.fetch_header(height as u32)?;
512+
let mut header = {
513+
let cache = self.block_header_cache.lock().unwrap();
514+
cache[&(height as u32)]
515+
};
509516
let mut valid = electrum_client::utils::validate_merkle_proof(
510517
&txid,
511518
&header.merkle_root,
512519
&proof,
513520
);
514521
if !valid {
515-
header = self.update_header(height as u32)?;
522+
let new_header = self.inner.block_header(height)?;
523+
self.block_header_cache
524+
.lock()
525+
.unwrap()
526+
.insert(height as u32, new_header);
527+
header = new_header;
516528
valid = electrum_client::utils::validate_merkle_proof(
517529
&txid,
518530
&header.merkle_root,

0 commit comments

Comments
 (0)