diff --git a/crates/chain/src/indexer/keychain_txout.rs b/crates/chain/src/indexer/keychain_txout.rs index bdea4b82d..bcb4937cd 100644 --- a/crates/chain/src/indexer/keychain_txout.rs +++ b/crates/chain/src/indexer/keychain_txout.rs @@ -9,12 +9,14 @@ use crate::{ spk_txout::SpkTxOutIndex, DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator, }; -use alloc::{borrow::ToOwned, vec::Vec}; +use alloc::{borrow::ToOwned, sync::Arc, vec::Vec}; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; use core::{ fmt::Debug, ops::{Bound, RangeBounds}, }; +#[cfg(feature = "std")] +use std::thread; use crate::Merge; @@ -445,6 +447,8 @@ impl KeychainTxOutIndex { } /// Syncs the state of the inner spk index after changes to a keychain + /// Note: Uses multithreading to parallelize the derivation of script pubkeys when processing + /// more than 1000 indices (only when the std feature is enabled) fn replenish_inner_index(&mut self, did: DescriptorId, keychain: &K, lookahead: u32) { let descriptor = self.descriptors.get(&did).expect("invariant"); let next_store_index = self @@ -454,13 +458,92 @@ impl KeychainTxOutIndex { .last() .map_or(0, |((_, index), _)| *index + 1); let next_reveal_index = self.last_revealed.get(&did).map_or(0, |v| *v + 1); - for (new_index, new_spk) in - SpkIterator::new_with_range(descriptor, next_store_index..next_reveal_index + lookahead) + let end_index = next_reveal_index + lookahead; + + if next_store_index >= end_index { + return; + } + + let total_indices = end_index - next_store_index; + + // Helper function to generate script pubkeys in a single-threaded manner + let generate_spks = |descriptor: &Descriptor, + start: u32, + end: u32| + -> Vec<(u32, ScriptBuf)> { + let mut results = Vec::with_capacity((end - start) as usize); + for (new_index, new_spk) in SpkIterator::new_with_range(descriptor, start..end) { + results.push((new_index, new_spk)); + } + results + }; + + let process_results = |this: &mut Self, results: Vec<(u32, ScriptBuf)>| { + for (new_index, new_spk) in results { + let _inserted = this + .inner + .insert_spk((keychain.clone(), new_index), new_spk); + debug_assert!(_inserted, "replenish lookahead: must not have existing spk: keychain={:?}, lookahead={}, next_store_index={}, next_reveal_index={}", keychain, lookahead, next_store_index, next_reveal_index); + } + }; + + #[cfg(not(feature = "std"))] { - let _inserted = self - .inner - .insert_spk((keychain.clone(), new_index), new_spk); - debug_assert!(_inserted, "replenish lookahead: must not have existing spk: keychain={:?}, lookahead={}, next_store_index={}, next_reveal_index={}", keychain, lookahead, next_store_index, next_reveal_index); + let results = generate_spks(descriptor, next_store_index, end_index); + process_results(self, results); + return; + } + + #[cfg(feature = "std")] + { + if total_indices < 1000 { + let results = generate_spks(descriptor, next_store_index, end_index); + process_results(self, results); + return; + } + + let num_cpus = thread::available_parallelism() + .map(|p| p.get()) + .unwrap_or(1); + let chunk_size = (total_indices + num_cpus as u32 - 1) / num_cpus as u32; + let descriptor = Arc::new(descriptor.clone()); + let mut handles = Vec::with_capacity(num_cpus); + + for i in 0..num_cpus { + let start = next_store_index + (i as u32 * chunk_size); + if start >= end_index { + break; + } + + let end = (start + chunk_size).min(end_index); + let descriptor_clone = Arc::clone(&descriptor); + + let handle = thread::spawn(move || { + let mut thread_results = Vec::with_capacity((end - start) as usize); + + for (new_index, new_spk) in + SpkIterator::new_with_range(&*descriptor_clone, start..end) + { + thread_results.push((new_index, new_spk)); + } + + thread_results + }); + + handles.push(handle); + } + + let mut all_results = Vec::new(); + for handle in handles { + if let Ok(thread_results) = handle.join() { + all_results.extend(thread_results); + } + } + + // Sort results by index to ensure they're processed in order + all_results.sort_by_key(|(index, _)| *index); + + process_results(self, all_results); } } diff --git a/crates/chain/tests/test_keychain_txout_index.rs b/crates/chain/tests/test_keychain_txout_index.rs index 8b299b896..8a3eb28c4 100644 --- a/crates/chain/tests/test_keychain_txout_index.rs +++ b/crates/chain/tests/test_keychain_txout_index.rs @@ -542,10 +542,10 @@ fn lookahead_to_target() { }, TestCase { lookahead: 13, - external_last_revealed: Some(100), - internal_last_revealed: Some(21), - internal_target: Some(120), - external_target: Some(130), + external_last_revealed: Some(1100), + internal_last_revealed: Some(1200), + internal_target: Some(1110), + external_target: Some(1120), }, ];