diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 5be79233..b1b21391 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -290,12 +290,7 @@ fn parse_blocks(blob: Vec, magic: u32) -> Result> { cursor.set_position(end); } - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(0) // CPU-bound - .thread_name(|i| format!("parse-blocks-{}", i)) - .build() - .unwrap(); - Ok(pool.install(|| { + Ok(super::THREAD_POOL.install(|| { slices .into_par_iter() .map(|(slice, size)| (deserialize(slice).expect("failed to parse Block"), size)) diff --git a/src/new_index/mod.rs b/src/new_index/mod.rs index bc16a1b5..ef9bcb0e 100644 --- a/src/new_index/mod.rs +++ b/src/new_index/mod.rs @@ -5,6 +5,16 @@ pub mod precache; mod query; pub mod schema; +use std::sync::LazyLock; + +pub(crate) static THREAD_POOL: LazyLock = LazyLock::new(|| { + rayon::ThreadPoolBuilder::new() + .num_threads(0) // 0 = use number of logical CPUs + .thread_name(|i| format!("electrs-worker-{}", i)) + .build() + .expect("failed to create global rayon thread pool") +}); + pub use self::db::{DBRow, DB}; pub use self::fetch::{BlockEntry, FetchFrom}; pub use self::mempool::Mempool; diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index e98a68ed..b40b31df 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -997,6 +997,12 @@ impl ChainQuery { let mut processed_items = 0; let mut lastblock = None; + // If we need to iterate over 500 history entries to + // get one utxo then your address is too active and should be + // throttled. + // TODO: Think of better way to throttle. + let tx_history_limit = limit * 500; + for (history, blockid) in history_iter { processed_items += 1; lastblock = Some(blockid.hash); @@ -1017,6 +1023,9 @@ impl ChainQuery { if utxos.len() > limit { bail!(ErrorKind::TooManyUtxos(limit)) } + if processed_items > tx_history_limit { + bail!(ErrorKind::TooManyTxs(tx_history_limit)) + } } Ok((utxos, lastblock, processed_items)) @@ -1447,24 +1456,7 @@ fn lookup_txos( outpoints: &BTreeSet, allow_missing: bool, ) -> HashMap { - let mut loop_count = 10; - let pool = loop { - match rayon::ThreadPoolBuilder::new() - .num_threads(16) // we need to saturate SSD IOPS - .thread_name(|i| format!("lookup-txo-{}", i)) - .build() - { - Ok(pool) => break pool, - Err(e) => { - if loop_count == 0 { - panic!("schema::lookup_txos failed to create a ThreadPool: {}", e); - } - std::thread::sleep(std::time::Duration::from_millis(50)); - loop_count -= 1; - } - } - }; - pool.install(|| { + super::THREAD_POOL.install(|| { // Should match lookup_txos_sequential outpoints .par_iter()