Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/new_index/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,7 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<SizedBlock>> {
cursor.set_position(end);
}

let pool = rayon::ThreadPoolBuilder::new()
.num_threads(0) // CPU-bound
.thread_name(|i| format!("parse-blocks-{}", i))
.build()
.unwrap();
Ok(pool.install(|| {
Ok(super::THREAD_POOL.install(|| {
slices
.into_par_iter()
.map(|(slice, size)| (deserialize(slice).expect("failed to parse Block"), size))
Expand Down
10 changes: 10 additions & 0 deletions src/new_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ pub mod precache;
mod query;
pub mod schema;

use std::sync::LazyLock;

pub(crate) static THREAD_POOL: LazyLock<rayon::ThreadPool> = LazyLock::new(|| {
rayon::ThreadPoolBuilder::new()
.num_threads(0) // 0 = use number of logical CPUs
.thread_name(|i| format!("electrs-worker-{}", i))
.build()
.expect("failed to create global rayon thread pool")
});

pub use self::db::{DBRow, DB};
pub use self::fetch::{BlockEntry, FetchFrom};
pub use self::mempool::Mempool;
Expand Down
28 changes: 10 additions & 18 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,12 @@ impl ChainQuery {
let mut processed_items = 0;
let mut lastblock = None;

// If we need to iterate over 500 history entries to
// get one utxo then your address is too active and should be
// throttled.
// TODO: Think of better way to throttle.
let tx_history_limit = limit * 500;

for (history, blockid) in history_iter {
processed_items += 1;
lastblock = Some(blockid.hash);
Expand All @@ -1017,6 +1023,9 @@ impl ChainQuery {
if utxos.len() > limit {
bail!(ErrorKind::TooManyUtxos(limit))
}
if processed_items > tx_history_limit {
bail!(ErrorKind::TooManyTxs(tx_history_limit))
}
}

Ok((utxos, lastblock, processed_items))
Expand Down Expand Up @@ -1447,24 +1456,7 @@ fn lookup_txos(
outpoints: &BTreeSet<OutPoint>,
allow_missing: bool,
) -> HashMap<OutPoint, TxOut> {
let mut loop_count = 10;
let pool = loop {
match rayon::ThreadPoolBuilder::new()
.num_threads(16) // we need to saturate SSD IOPS
.thread_name(|i| format!("lookup-txo-{}", i))
.build()
{
Ok(pool) => break pool,
Err(e) => {
if loop_count == 0 {
panic!("schema::lookup_txos failed to create a ThreadPool: {}", e);
}
std::thread::sleep(std::time::Duration::from_millis(50));
loop_count -= 1;
}
}
};
pool.install(|| {
super::THREAD_POOL.install(|| {
// Should match lookup_txos_sequential
outpoints
.par_iter()
Expand Down
Loading