diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 9df9fe0f2..2f5990c9d 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -347,7 +347,7 @@ impl FFISyncEventCallbacks { } => { if let Some(cb) = self.on_blocks_needed { let ffi_blocks: Vec = blocks - .iter() + .keys() .map(|key| FFIBlockNeeded { height: key.height(), hash: *key.hash().as_byte_array(), @@ -361,15 +361,17 @@ impl FFISyncEventCallbacks { height, new_addresses, confirmed_txids, + .. } => { if let Some(cb) = self.on_block_processed { let hash_bytes = block_hash.as_byte_array(); let txid_bytes: Vec<[u8; 32]> = confirmed_txids.iter().map(|txid| *txid.as_byte_array()).collect(); + let total_new_addresses: usize = new_addresses.values().map(|v| v.len()).sum(); cb( *height, hash_bytes as *const [u8; 32], - new_addresses.len() as u32, + total_new_addresses as u32, txid_bytes.as_ptr(), txid_bytes.len() as u32, self.user_data, @@ -836,3 +838,85 @@ impl FFIWalletEventCallbacks { } } } + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::hashes::Hash; + use dashcore::{Address, BlockHash, Network, Txid}; + use key_wallet_manager::{FilterMatchKey, WalletId}; + use std::collections::{BTreeMap, BTreeSet}; + use std::sync::atomic::{AtomicU32, Ordering}; + + /// `BlocksNeeded` dispatch must pass exactly one entry per + /// `FilterMatchKey` to the FFI callback (i.e. iterate keys, not + /// inflated by the per-block wallet attribution). + #[test] + fn test_blocks_needed_dispatch_passes_unique_keys_count() { + static COUNT: AtomicU32 = AtomicU32::new(u32::MAX); + extern "C" fn cb(_blocks: *const FFIBlockNeeded, count: u32, _user: *mut c_void) { + COUNT.store(count, Ordering::SeqCst); + } + + let callbacks = FFISyncEventCallbacks { + on_blocks_needed: Some(cb), + ..FFISyncEventCallbacks::default() + }; + + let mut blocks: BTreeMap> = BTreeMap::new(); + // Two distinct blocks, each attributed to two wallets. 
The dispatch + // must report 2 (unique keys), not 4. + blocks.insert( + FilterMatchKey::new(10, BlockHash::from_byte_array([1u8; 32])), + BTreeSet::from([[1u8; 32], [2u8; 32]]), + ); + blocks.insert( + FilterMatchKey::new(20, BlockHash::from_byte_array([2u8; 32])), + BTreeSet::from([[1u8; 32], [2u8; 32]]), + ); + + callbacks.dispatch(&SyncEvent::BlocksNeeded { + blocks, + }); + assert_eq!(COUNT.load(Ordering::SeqCst), 2); + } + + /// `BlockProcessed` dispatch must report the total address count + /// summed across all per-wallet entries in the `new_addresses` map. + #[test] + fn test_block_processed_dispatch_sums_per_wallet_addresses() { + static NEW_ADDR_COUNT: AtomicU32 = AtomicU32::new(u32::MAX); + extern "C" fn cb( + _height: u32, + _hash: *const [u8; 32], + new_address_count: u32, + _txids: *const [u8; 32], + _txid_count: u32, + _user: *mut c_void, + ) { + NEW_ADDR_COUNT.store(new_address_count, Ordering::SeqCst); + } + + let callbacks = FFISyncEventCallbacks { + on_block_processed: Some(cb), + ..FFISyncEventCallbacks::default() + }; + + let addr_a = Address::dummy(Network::Regtest, 1); + let addr_b = Address::dummy(Network::Regtest, 2); + let addr_c = Address::dummy(Network::Regtest, 3); + let mut new_addresses: BTreeMap> = BTreeMap::new(); + // Wallet 1 contributes 2 new addresses, wallet 2 contributes 1. Total = 3. 
+ new_addresses.insert([1u8; 32], vec![addr_a, addr_b]); + new_addresses.insert([2u8; 32], vec![addr_c]); + + callbacks.dispatch(&SyncEvent::BlockProcessed { + block_hash: BlockHash::from_byte_array([7u8; 32]), + height: 100, + wallets: BTreeSet::new(), + new_addresses, + confirmed_txids: vec![Txid::from_byte_array([9u8; 32])], + }); + assert_eq!(NEW_ADDR_COUNT.load(Ordering::SeqCst), 3); + } +} diff --git a/dash-spv/src/sync/blocks/manager.rs b/dash-spv/src/sync/blocks/manager.rs index 45ffed61b..6f69e27dc 100644 --- a/dash-spv/src/sync/blocks/manager.rs +++ b/dash-spv/src/sync/blocks/manager.rs @@ -79,15 +79,17 @@ impl BlocksManager 0 { tracing::info!( "Found {} relevant transactions ({} new, {} existing) {} at height {}, new addresses: {}", @@ -96,7 +98,7 @@ impl BlocksManager BlocksManager = result.relevant_txids().cloned().collect(); // Collect new addresses for gap limit rescanning - let new_addresses: Vec<_> = result.new_addresses.into_iter().collect(); - if !new_addresses.is_empty() { + let new_addresses = result.new_addresses; + if new_addresses_total > 0 { tracing::debug!( - "Block {} generated {} new addresses for gap limit maintenance", + "Block {} generated {} new addresses for gap limit maintenance across {} wallets", height, + new_addresses_total, new_addresses.len() ); } @@ -124,6 +127,7 @@ impl BlocksManager; @@ -215,8 +219,8 @@ mod tests { let requests = network.request_sender(); let block_hash = dashcore::BlockHash::dummy(0); - let mut blocks = BTreeSet::new(); - blocks.insert(FilterMatchKey::new(100, block_hash)); + let mut blocks = BTreeMap::new(); + blocks.insert(FilterMatchKey::new(100, block_hash), BTreeSet::from([MOCK_WALLET_ID])); let event = SyncEvent::BlocksNeeded { blocks, }; @@ -227,4 +231,100 @@ mod tests { assert_eq!(manager.state(), SyncState::Syncing); assert!(events.is_empty()); } + + /// `process_buffered_blocks` must call `process_block_for_wallets` with + /// the exact wallet set carried in the pipeline so already-synced + /// 
wallets are not touched by routing logic. + #[tokio::test] + async fn test_process_buffered_blocks_routes_wallet_set() { + use dashcore::block::Header; + use dashcore::{Block, TxMerkleNode}; + use dashcore_hashes::Hash; + + let mut manager = create_test_manager().await; + manager.progress.set_state(SyncState::Syncing); + + let header = Header { + version: dashcore::blockdata::block::Version::from_consensus(1), + prev_blockhash: dashcore::BlockHash::all_zeros(), + merkle_root: TxMerkleNode::all_zeros(), + time: 0, + bits: dashcore::CompactTarget::from_consensus(0), + nonce: 0, + }; + let block = Block { + header, + txdata: vec![], + }; + manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([MOCK_WALLET_ID])); + + let events = manager.process_buffered_blocks().await.unwrap(); + assert!(matches!(events.first(), Some(SyncEvent::BlockProcessed { .. }))); + + // MOCK_WALLET_ID was in the routed set, so MockWallet recorded the + // block. (MockWallet::process_block_for_wallets returns early when + // its id is absent.) + let processed = manager.wallet.read().await.processed_blocks(); + let processed = processed.lock().await; + assert_eq!(processed.len(), 1); + assert_eq!(processed[0].1, 100); + } + + /// A wallet that is NOT in the pipeline's interested set must not be + /// routed the block. Two wallets are registered, but only `wallet_in` + /// appears in the routed set; the other wallet's processed log must + /// stay empty for that block. 
+ #[tokio::test] + async fn test_process_buffered_blocks_excludes_uninterested_wallet() { + use dashcore::block::Header; + use dashcore::{Block, TxMerkleNode}; + use dashcore_hashes::Hash; + use key_wallet_manager::test_utils::{MockWalletState, MultiMockWallet}; + use key_wallet_manager::WalletId; + + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let multi = MultiMockWallet::new(); + let wallet_in: WalletId = [0xAA; 32]; + let wallet_out: WalletId = [0xBB; 32]; + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet(wallet_in, MockWalletState::default()); + w.insert_wallet(wallet_out, MockWalletState::default()); + } + let mut manager: BlocksManager< + PersistentBlockHeaderStorage, + PersistentBlockStorage, + MultiMockWallet, + > = BlocksManager::new(multi.clone(), storage.block_headers(), storage.blocks()).await; + manager.progress.set_state(SyncState::Syncing); + + let header = Header { + version: dashcore::blockdata::block::Version::from_consensus(1), + prev_blockhash: dashcore::BlockHash::all_zeros(), + merkle_root: TxMerkleNode::all_zeros(), + time: 0, + bits: dashcore::CompactTarget::from_consensus(0), + nonce: 0, + }; + let block = Block { + header, + txdata: vec![], + }; + // Only wallet_in is in the routed set. + manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([wallet_in])); + + let _ = manager.process_buffered_blocks().await.unwrap(); + + let processed = multi.read().await.processed(); + let processed = processed.lock().await; + // Exactly one entry, for wallet_in only. 
+ assert_eq!(processed.len(), 1); + assert_eq!(processed[0].0, wallet_in); + assert_eq!(processed[0].2, 100); + assert!( + !processed.iter().any(|(id, _, _)| *id == wallet_out), + "wallet_out was not in the routed set, must not be processed" + ); + } } diff --git a/dash-spv/src/sync/blocks/pipeline.rs b/dash-spv/src/sync/blocks/pipeline.rs index 2338841e8..fc6475ab7 100644 --- a/dash-spv/src/sync/blocks/pipeline.rs +++ b/dash-spv/src/sync/blocks/pipeline.rs @@ -11,7 +11,7 @@ use crate::network::RequestSender; use crate::sync::download_coordinator::{DownloadConfig, DownloadCoordinator}; use dashcore::blockdata::block::Block; use dashcore::BlockHash; -use key_wallet_manager::FilterMatchKey; +use key_wallet_manager::{FilterMatchKey, WalletId}; /// Maximum number of concurrent block downloads. const MAX_CONCURRENT_BLOCK_DOWNLOADS: usize = 20; @@ -36,6 +36,9 @@ pub(super) struct BlocksPipeline { downloaded: BTreeMap, /// Map hash -> height for looking up height when block arrives. hash_to_height: HashMap, + /// Per-block interested wallets, populated when the block is queued. + /// Only those wallets get the block processed. + hash_to_wallets: HashMap>, } impl std::fmt::Debug for BlocksPipeline { @@ -66,17 +69,25 @@ impl BlocksPipeline { pending_heights: BTreeSet::new(), downloaded: BTreeMap::new(), hash_to_height: HashMap::new(), + hash_to_wallets: HashMap::new(), } } - /// Queue blocks with their heights for download. - /// - /// This is the preferred method as it enables height-ordered processing. - pub(super) fn queue(&mut self, blocks: impl IntoIterator) { - for key in blocks { - self.coordinator.enqueue([*key.hash()]); - self.pending_heights.insert(key.height()); - self.hash_to_height.insert(*key.hash(), key.height()); + /// Queue blocks with their heights and per-block interested wallet sets. 
+ pub(super) fn queue( + &mut self, + blocks: impl IntoIterator)>, + ) { + for (key, wallets) in blocks { + let hash = *key.hash(); + let already_tracked = + self.hash_to_height.contains_key(&hash) || self.hash_to_wallets.contains_key(&hash); + if !already_tracked { + self.coordinator.enqueue([hash]); + self.pending_heights.insert(key.height()); + self.hash_to_height.insert(hash, key.height()); + } + self.hash_to_wallets.entry(hash).or_default().extend(wallets); } } @@ -141,12 +152,13 @@ impl BlocksPipeline { true } - /// Take the next block that's safe to process in height order. + /// Take the next block that's safe to process in height order, along with + /// the wallet set whose filters matched this block. /// /// Returns None if: /// - No downloaded blocks available, or /// - Waiting for a lower-height block still pending - pub(super) fn take_next_ordered_block(&mut self) -> Option<(Block, u32)> { + pub(super) fn take_next_ordered_block(&mut self) -> Option<(Block, u32, BTreeSet)> { let lowest_downloaded = *self.downloaded.keys().next()?; // Check if any pending blocks have lower heights @@ -156,15 +168,22 @@ impl BlocksPipeline { } } - // Safe to return this block let block = self.downloaded.remove(&lowest_downloaded).unwrap(); - Some((block, lowest_downloaded)) + let wallets = self.hash_to_wallets.remove(&block.block_hash()).unwrap_or_default(); + Some((block, lowest_downloaded, wallets)) } /// Add a block that was loaded from storage (skip download). /// /// Used when blocks are already persisted from a previous sync. 
- pub(super) fn add_from_storage(&mut self, block: Block, height: u32) { + pub(super) fn add_from_storage( + &mut self, + block: Block, + height: u32, + wallets: BTreeSet, + ) { + let hash = block.block_hash(); + self.hash_to_wallets.entry(hash).or_default().extend(wallets); self.downloaded.insert(height, block); } @@ -212,7 +231,7 @@ mod tests { fn test_queue_block() { let mut pipeline = BlocksPipeline::new(); let block = make_test_block(1); - pipeline.queue([FilterMatchKey::new(100, block.block_hash())]); + pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]); assert_eq!(pipeline.coordinator.pending_count(), 1); assert!(!pipeline.is_complete()); @@ -226,9 +245,9 @@ mod tests { let block2 = make_test_block(2); let block3 = make_test_block(3); pipeline.queue([ - FilterMatchKey::new(100, block1.block_hash()), - FilterMatchKey::new(101, block2.block_hash()), - FilterMatchKey::new(102, block3.block_hash()), + (FilterMatchKey::new(100, block1.block_hash()), BTreeSet::new()), + (FilterMatchKey::new(101, block2.block_hash()), BTreeSet::new()), + (FilterMatchKey::new(102, block3.block_hash()), BTreeSet::new()), ]); assert_eq!(pipeline.coordinator.pending_count(), 3); @@ -245,7 +264,7 @@ mod tests { let hash = block.block_hash(); // Queue with height tracking - pipeline.queue([FilterMatchKey::new(100, block.block_hash())]); + pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]); // Simulate sending via coordinator let hashes = pipeline.coordinator.take_pending(1); @@ -276,7 +295,7 @@ mod tests { // Queue more blocks than max concurrent for i in 0..=MAX_CONCURRENT_BLOCK_DOWNLOADS { let block = make_test_block(i as u8); - pipeline.queue([FilterMatchKey::new(i as u32, block.block_hash())]); + pipeline.queue([(FilterMatchKey::new(i as u32, block.block_hash()), BTreeSet::new())]); } // Take and mark as downloading up to limit @@ -301,6 +320,7 @@ mod tests { pending_heights: BTreeSet::new(), downloaded: BTreeMap::new(), 
hash_to_height: HashMap::new(), + hash_to_wallets: HashMap::new(), }; // Use coordinator directly to set up in-flight state @@ -328,7 +348,7 @@ mod tests { // Use add_from_storage to test ordering logic without network // Add block 2 first (out of order) - pipeline.add_from_storage(block2.clone(), 101); + pipeline.add_from_storage(block2.clone(), 101, BTreeSet::new()); // Also track height 100 as pending to simulate waiting pipeline.pending_heights.insert(100); @@ -337,15 +357,15 @@ mod tests { // Add block 1 pipeline.pending_heights.remove(&100); - pipeline.add_from_storage(block1.clone(), 100); + pipeline.add_from_storage(block1.clone(), 100, BTreeSet::new()); // Now block 1 is ready (lowest height) - let (block, height) = pipeline.take_next_ordered_block().unwrap(); + let (block, height, _) = pipeline.take_next_ordered_block().unwrap(); assert_eq!(height, 100); assert_eq!(block.block_hash(), hash1); // Block 2 is now ready - let (block, height) = pipeline.take_next_ordered_block().unwrap(); + let (block, height, _) = pipeline.take_next_ordered_block().unwrap(); assert_eq!(height, 101); assert_eq!(block.block_hash(), hash2); @@ -360,7 +380,7 @@ mod tests { // Add block at height 101, but height 100 is still pending pipeline.pending_heights.insert(100); - pipeline.add_from_storage(block2.clone(), 101); + pipeline.add_from_storage(block2.clone(), 101, BTreeSet::new()); // Cannot take block 2 - block at height 100 is still pending assert!(pipeline.take_next_ordered_block().is_none()); @@ -369,7 +389,7 @@ mod tests { pipeline.pending_heights.remove(&100); // Now block 2 is ready - let (_, height) = pipeline.take_next_ordered_block().unwrap(); + let (_, height, _) = pipeline.take_next_ordered_block().unwrap(); assert_eq!(height, 101); } @@ -379,11 +399,11 @@ mod tests { let block = make_test_block(1); let hash = block.block_hash(); - pipeline.add_from_storage(block.clone(), 100); + pipeline.add_from_storage(block.clone(), 100, BTreeSet::new()); 
assert_eq!(pipeline.downloaded.len(), 1); - let (taken_block, height) = pipeline.take_next_ordered_block().unwrap(); + let (taken_block, height, _) = pipeline.take_next_ordered_block().unwrap(); assert_eq!(height, 100); assert_eq!(taken_block.block_hash(), hash); } @@ -395,7 +415,7 @@ mod tests { // Adding to downloaded makes it incomplete let block = make_test_block(1); - pipeline.add_from_storage(block, 100); + pipeline.add_from_storage(block, 100, BTreeSet::new()); assert!(!pipeline.is_complete()); // Take the block @@ -416,13 +436,147 @@ mod tests { assert!(pipeline.is_complete()); } + #[test] + fn test_queue_propagates_wallet_set_through_take_next() { + // A block queued with a non-empty wallet set must yield that exact + // wallet set when taken in height order via `take_next_ordered_block`. + let mut pipeline = BlocksPipeline::new(); + let block = make_test_block(1); + let hash = block.block_hash(); + let wallets: BTreeSet = BTreeSet::from([[1u8; 32], [2u8; 32]]); + + pipeline.queue([(FilterMatchKey::new(100, hash), wallets.clone())]); + + // Drive the block through receive_block to land it in `downloaded`. + let hashes = pipeline.coordinator.take_pending(1); + pipeline.coordinator.mark_sent(&hashes); + assert!(pipeline.receive_block(&block)); + + let (taken_block, height, taken_wallets) = pipeline.take_next_ordered_block().unwrap(); + assert_eq!(taken_block.block_hash(), hash); + assert_eq!(height, 100); + assert_eq!(taken_wallets, wallets); + } + + #[test] + fn test_queue_merges_wallet_sets_for_repeat_hashes() { + // Queueing the same block hash twice with different wallet sets must + // produce the union when the block is later taken from the pipeline, + // and must not double-count it in the coordinator's pending state. 
+ let mut pipeline = BlocksPipeline::new(); + let block = make_test_block(1); + let hash = block.block_hash(); + let wallets_a: BTreeSet = BTreeSet::from([[1u8; 32]]); + let wallets_b: BTreeSet = BTreeSet::from([[2u8; 32], [3u8; 32]]); + + pipeline.queue([(FilterMatchKey::new(100, hash), wallets_a.clone())]); + assert_eq!(pipeline.coordinator.pending_count(), 1); + pipeline.queue([(FilterMatchKey::new(100, hash), wallets_b.clone())]); + // Re-queueing must not double the coordinator's pending count. + assert_eq!(pipeline.coordinator.pending_count(), 1); + + // Land the block in `downloaded` to retrieve it. + let hashes = pipeline.coordinator.take_pending(1); + assert_eq!(hashes.len(), 1); + pipeline.coordinator.mark_sent(&hashes); + assert!(pipeline.receive_block(&block)); + + let (_, _, taken_wallets) = pipeline.take_next_ordered_block().unwrap(); + let mut expected = wallets_a; + expected.extend(wallets_b); + assert_eq!(taken_wallets, expected); + } + + #[test] + fn test_queue_does_not_re_enqueue_in_flight_hash() { + // A late-arriving wallet match for a block already in flight must + // merge the wallet id without re-enqueueing the hash. Re-enqueueing + // would cause a duplicate request and corrupt the coordinator's + // pending/in-flight state. + let mut pipeline = BlocksPipeline::new(); + let block = make_test_block(1); + let hash = block.block_hash(); + let wallets_a: BTreeSet = BTreeSet::from([[1u8; 32]]); + let wallets_b: BTreeSet = BTreeSet::from([[2u8; 32]]); + + pipeline.queue([(FilterMatchKey::new(100, hash), wallets_a.clone())]); + // Move the hash to in-flight. + let hashes = pipeline.coordinator.take_pending(1); + pipeline.coordinator.mark_sent(&hashes); + assert_eq!(pipeline.coordinator.pending_count(), 0); + assert_eq!(pipeline.coordinator.active_count(), 1); + + // A second queue call for the same hash must not push it back to + // pending while it is in flight. 
+ pipeline.queue([(FilterMatchKey::new(100, hash), wallets_b.clone())]); + assert_eq!(pipeline.coordinator.pending_count(), 0); + assert_eq!(pipeline.coordinator.active_count(), 1); + + // Late wallet ids are still merged for when the block arrives. + assert!(pipeline.receive_block(&block)); + let (_, _, taken_wallets) = pipeline.take_next_ordered_block().unwrap(); + let mut expected = wallets_a; + expected.extend(wallets_b); + assert_eq!(taken_wallets, expected); + } + + #[test] + fn test_queue_does_not_re_enqueue_downloaded_hash() { + // A late-arriving wallet match for a block already received and sitting + // in `downloaded` (but not yet consumed by `take_next_ordered_block`) + // must merge the wallet id without re-enqueueing the hash. + let mut pipeline = BlocksPipeline::new(); + let block = make_test_block(1); + let hash = block.block_hash(); + let wallets_a: BTreeSet = BTreeSet::from([[1u8; 32]]); + let wallets_b: BTreeSet = BTreeSet::from([[2u8; 32]]); + + pipeline.queue([(FilterMatchKey::new(100, hash), wallets_a.clone())]); + let hashes = pipeline.coordinator.take_pending(1); + pipeline.coordinator.mark_sent(&hashes); + assert!(pipeline.receive_block(&block)); + assert_eq!(pipeline.downloaded.len(), 1); + assert_eq!(pipeline.coordinator.pending_count(), 0); + assert_eq!(pipeline.coordinator.active_count(), 0); + + // Late-arriving match for the same hash must not re-enqueue. + pipeline.queue([(FilterMatchKey::new(100, hash), wallets_b.clone())]); + assert_eq!(pipeline.coordinator.pending_count(), 0); + assert_eq!(pipeline.coordinator.active_count(), 0); + assert_eq!(pipeline.downloaded.len(), 1); + + // Late wallet ids are still merged for when the block is taken. 
+ let (_, _, taken_wallets) = pipeline.take_next_ordered_block().unwrap(); + let mut expected = wallets_a; + expected.extend(wallets_b); + assert_eq!(taken_wallets, expected); + } + + #[test] + fn test_add_from_storage_merges_wallet_sets() { + // The `add_from_storage` path must merge wallet sets for repeat + // additions of the same block hash, matching `queue`'s semantics. + let mut pipeline = BlocksPipeline::new(); + let block = make_test_block(1); + let wallets_a: BTreeSet = BTreeSet::from([[1u8; 32]]); + let wallets_b: BTreeSet = BTreeSet::from([[2u8; 32]]); + + pipeline.add_from_storage(block.clone(), 100, wallets_a.clone()); + pipeline.add_from_storage(block.clone(), 100, wallets_b.clone()); + + let (_, _, taken_wallets) = pipeline.take_next_ordered_block().unwrap(); + let mut expected = wallets_a; + expected.extend(wallets_b); + assert_eq!(taken_wallets, expected); + } + #[test] fn test_receive_block_duplicate() { let mut pipeline = BlocksPipeline::new(); let block = make_test_block(1); // Queue and mark as sent via coordinator - pipeline.queue([FilterMatchKey::new(100, block.block_hash())]); + pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]); let hashes = pipeline.coordinator.take_pending(1); pipeline.coordinator.mark_sent(&hashes); diff --git a/dash-spv/src/sync/blocks/sync_manager.rs b/dash-spv/src/sync/blocks/sync_manager.rs index c4566a7b1..5b02650b8 100644 --- a/dash-spv/src/sync/blocks/sync_manager.rs +++ b/dash-spv/src/sync/blocks/sync_manager.rs @@ -10,7 +10,8 @@ use crate::types::HashedBlock; use crate::SyncError; use async_trait::async_trait; use dashcore::network::message::NetworkMessage; -use key_wallet_manager::WalletInterface; +use key_wallet_manager::{FilterMatchKey, WalletId, WalletInterface}; +use std::collections::BTreeSet; #[async_trait] impl SyncManager @@ -115,10 +116,10 @@ impl SyncM tracing::debug!("Blocks needed: {} blocks", blocks.len()); - let mut to_download = Vec::new(); + let mut to_download: 
Vec<(FilterMatchKey, BTreeSet)> = Vec::new(); let block_storage = self.block_storage.read().await; - for key in blocks { + for (key, wallets) in blocks { // Check if block is already stored (from previous sync) if let Ok(Some(hashed_block)) = block_storage.load_block(key.height()).await { if hashed_block.hash() != key.hash() { @@ -135,13 +136,17 @@ impl SyncM ))); } // Block loaded from storage, add to pipeline for processing - self.pipeline.add_from_storage(hashed_block.block().clone(), key.height()); + self.pipeline.add_from_storage( + hashed_block.block().clone(), + key.height(), + wallets.clone(), + ); self.progress.add_from_storage(1); continue; } - // Block not in storage, queue for download with height - to_download.push(key.clone()); + // Block not in storage, queue for download with height + wallets + to_download.push((key.clone(), wallets.clone())); } drop(block_storage); diff --git a/dash-spv/src/sync/events.rs b/dash-spv/src/sync/events.rs index a5ed9734d..86fe6ec76 100644 --- a/dash-spv/src/sync/events.rs +++ b/dash-spv/src/sync/events.rs @@ -2,8 +2,8 @@ use crate::sync::ManagerIdentifier; use dashcore::ephemerealdata::chain_lock::ChainLock; use dashcore::ephemerealdata::instant_lock::InstantLock; use dashcore::{Address, BlockHash, Txid}; -use key_wallet_manager::FilterMatchKey; -use std::collections::BTreeSet; +use key_wallet_manager::{FilterMatchKey, WalletId}; +use std::collections::{BTreeMap, BTreeSet}; /// Events that managers can emit and subscribe to. /// @@ -80,11 +80,15 @@ pub enum SyncEvent { /// Filters matched the wallet, blocks need downloading. /// + /// Each block is tagged with the wallets whose addresses matched its filter, + /// so the block is processed only for those wallets. 
+ /// /// Emitted by: `FiltersManager` /// Consumed by: `BlocksManager` BlocksNeeded { - /// Blocks to download (sorted by height) - blocks: BTreeSet, + /// Blocks to download (height-ordered by `FilterMatchKey`), each + /// associated with the wallet ids that need it. + blocks: BTreeMap>, }, /// Block downloaded and processed through wallet. @@ -97,8 +101,11 @@ pub enum SyncEvent { block_hash: BlockHash, /// Height of the processed block height: u32, - /// New addresses discovered from wallet gap limit maintenance - new_addresses: Vec
, + /// Wallets the block was actually processed for. + wallets: BTreeSet, + /// New addresses discovered from wallet gap limit maintenance, attributed + /// to the wallet that produced them. + new_addresses: BTreeMap>, /// Transaction IDs confirmed in this block that are relevant to the wallet confirmed_txids: Vec, }, @@ -213,7 +220,8 @@ impl SyncEvent { new_addresses, .. } => { - format!("BlockProcessed(height={}, new_addrs={})", height, new_addresses.len()) + let total: usize = new_addresses.values().map(|v| v.len()).sum(); + format!("BlockProcessed(height={}, new_addrs={})", height, total) } SyncEvent::MasternodeStateUpdated { height, diff --git a/dash-spv/src/sync/filters/batch.rs b/dash-spv/src/sync/filters/batch.rs index 75ed22025..02b80be12 100644 --- a/dash-spv/src/sync/filters/batch.rs +++ b/dash-spv/src/sync/filters/batch.rs @@ -1,7 +1,7 @@ use dashcore::bip158::BlockFilter; use dashcore::Address; -use key_wallet_manager::FilterMatchKey; -use std::collections::{HashMap, HashSet}; +use key_wallet_manager::{FilterMatchKey, WalletId}; +use std::collections::{BTreeSet, HashMap, HashSet}; /// A completed batch of compact block filters ready for verification. /// @@ -24,8 +24,14 @@ pub(super) struct FiltersBatch { pending_blocks: u32, /// Whether rescan has been completed for this batch. rescan_complete: bool, - /// Addresses discovered during block processing that need rescan. - collected_addresses: HashSet
, + /// Wallets that were behind for this batch's height range at scan time and + /// therefore need their `synced_height` advanced when the batch commits. + /// Already-synced wallets must not be touched. + scanned_wallets: BTreeSet, + /// Addresses discovered during block processing that still need rescan, + /// attributed per wallet so we can rerun matching only against the wallet + /// that produced each new address. + collected_addresses: HashMap>, } impl FiltersBatch { @@ -43,7 +49,8 @@ impl FiltersBatch { scanned: false, pending_blocks: 0, rescan_complete: false, - collected_addresses: HashSet::new(), + scanned_wallets: BTreeSet::new(), + collected_addresses: HashMap::new(), } } /// Start height of this batch (inclusive). @@ -100,13 +107,26 @@ impl FiltersBatch { self.rescan_complete = true; } /// Add addresses discovered during block processing for later rescan. - pub(super) fn add_addresses(&mut self, addresses: impl IntoIterator) { - self.collected_addresses.extend(addresses); - } - /// Take collected addresses for rescan, leaving the set empty. - pub(super) fn take_collected_addresses(&mut self) -> HashSet
{ + pub(super) fn add_addresses_for_wallet( + &mut self, + wallet_id: WalletId, + addresses: impl IntoIterator, + ) { + self.collected_addresses.entry(wallet_id).or_default().extend(addresses); + } + /// Take collected per-wallet addresses for rescan, leaving the map empty. + pub(super) fn take_collected_addresses(&mut self) -> HashMap> { std::mem::take(&mut self.collected_addresses) } + /// Record the set of wallets that were behind for this batch at scan time. + pub(super) fn set_scanned_wallets(&mut self, wallets: BTreeSet) { + self.scanned_wallets = wallets; + } + /// Wallets that were behind at scan time and must have their synced_height + /// advanced when this batch commits. + pub(super) fn scanned_wallets(&self) -> &BTreeSet { + &self.scanned_wallets + } } impl PartialEq for FiltersBatch { diff --git a/dash-spv/src/sync/filters/block_match_tracker.rs b/dash-spv/src/sync/filters/block_match_tracker.rs new file mode 100644 index 000000000..6f9301491 --- /dev/null +++ b/dash-spv/src/sync/filters/block_match_tracker.rs @@ -0,0 +1,268 @@ +//! Per-block tracking state used by `FiltersManager` while filter matches +//! flow through the block download and apply pipeline. +//! +//! Owns two related maps: +//! +//! - `blocks_remaining`: in-flight matched blocks awaiting `BlockProcessed`, +//! keyed by block hash. The associated `(height, batch_start)` lets the +//! `BlockProcessed` handler decrement the right batch's `pending_blocks`. +//! - `processed_blocks_per_wallet`: which wallets have already had each +//! processed block applied to their state, keyed by height (so commit-time +//! pruning is one `split_off` call) then by hash. Lets a runtime-added +//! wallet still receive a block that was previously processed for another +//! wallet only: the gate is per-wallet, not global. +//! +//! These two maps are coupled: every call site that consults one consults the +//! other, and the lifecycle (track on filter match, record on +//! 
`BlockProcessed`, prune on commit, clear on reset) is shared. Splitting +//! them out keeps `FiltersManager` focused on batch orchestration. + +use std::collections::{BTreeMap, BTreeSet, HashMap}; + +use dashcore::BlockHash; +use key_wallet_manager::{FilterMatchKey, WalletId}; + +/// Result of recording a filter match for a block against a candidate wallet +/// set. The wallet set carried by `NewlyTracked` and `InFlight` is the +/// residual after subtracting wallets that have already had this block +/// processed. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(super) enum BlockTrackResult { + /// Block was newly tracked for the residual wallets. Caller should emit a + /// `BlocksNeeded` event with this set and account for the block in the + /// batch's `pending_blocks` count. + NewlyTracked { + wallets: BTreeSet, + }, + /// Block is already in flight. Caller should still emit a `BlocksNeeded` + /// event with the residual wallets so the `BlocksPipeline` merges them + /// into the pending wallet set, but must NOT increment the batch's + /// `pending_blocks` count (already counted on first match). + InFlight { + wallets: BTreeSet, + }, + /// All candidate wallets already have this block applied. Caller skips it. + AlreadyProcessed, +} + +/// Per-block tracking state for matched blocks flowing through the filter → +/// block → wallet pipeline. See module-level docs for the invariants. +#[derive(Debug, Default)] +pub(super) struct BlockMatchTracker { + /// In-flight matched blocks awaiting `BlockProcessed`. Maps + /// `block_hash → (height, batch_start)` so the `BlockProcessed` handler + /// can decrement the right batch's `pending_blocks` count. + blocks_remaining: BTreeMap, + /// Per-(height, hash) record of which wallets have had this block + /// applied. Bounded by `prune_at_or_below` after every commit, since + /// below `committed_height` a new wallet can only re-enter via the `tick` + /// rescan trigger which calls `clear` outright. 
+ processed_blocks_per_wallet: BTreeMap>>, +} + +impl BlockMatchTracker { + /// Create an empty tracker. + pub(super) fn new() -> Self { + Self::default() + } + + /// Track a filter match for a block against a candidate wallet set, + /// returning only the wallets that still need the block applied. See + /// `BlockTrackResult` for per-case caller responsibilities. + pub(super) fn track( + &mut self, + key: &FilterMatchKey, + batch_start: u32, + candidate_wallets: BTreeSet, + ) -> BlockTrackResult { + let processed = self.already_processed_wallets(key); + let residual: BTreeSet = + candidate_wallets.difference(&processed).copied().collect(); + if residual.is_empty() { + return BlockTrackResult::AlreadyProcessed; + } + if self.blocks_remaining.contains_key(key.hash()) { + return BlockTrackResult::InFlight { + wallets: residual, + }; + } + self.blocks_remaining.insert(*key.hash(), (key.height(), batch_start)); + BlockTrackResult::NewlyTracked { + wallets: residual, + } + } + + /// Record that `wallets` have had the block at `(height, hash)` applied + /// to their state. Idempotent: existing entries merge, never shrink. + pub(super) fn record_processed( + &mut self, + height: u32, + hash: BlockHash, + wallets: &BTreeSet, + ) { + if wallets.is_empty() { + return; + } + self.processed_blocks_per_wallet + .entry(height) + .or_default() + .entry(hash) + .or_default() + .extend(wallets.iter().copied()); + } + + /// Remove the in-flight entry for `hash`, returning its + /// `(height, batch_start)` if it was tracked. + pub(super) fn finish_in_flight(&mut self, hash: &BlockHash) -> Option<(u32, u32)> { + self.blocks_remaining.remove(hash) + } + + /// Drop every per-wallet processing record at or below `height`. Called + /// after `try_commit_batches` advances `committed_height`: below the new + /// committed height a new wallet can only re-enter via the `tick` rescan + /// trigger, which already wipes the map outright via `clear`. 
+ pub(super) fn prune_at_or_below(&mut self, height: u32) { + self.processed_blocks_per_wallet = + self.processed_blocks_per_wallet.split_off(&(height + 1)); + } + + /// True when there is no in-flight or processed-record state. + pub(super) fn is_empty(&self) -> bool { + self.blocks_remaining.is_empty() && self.processed_blocks_per_wallet.is_empty() + } + + /// Drop all in-flight and processed-record state. + pub(super) fn clear(&mut self) { + self.blocks_remaining.clear(); + self.processed_blocks_per_wallet.clear(); + } + + /// Wallets that have already had this block applied to their state. + fn already_processed_wallets(&self, key: &FilterMatchKey) -> BTreeSet { + self.processed_blocks_per_wallet + .get(&key.height()) + .and_then(|m| m.get(key.hash())) + .cloned() + .unwrap_or_default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn hash_n(n: u8) -> BlockHash { + dashcore::block::Header::dummy(n as u32).block_hash() + } + + /// `track` walks through the full state machine: NewlyTracked on first + /// match, InFlight on re-match while the block is awaiting processing, + /// NewlyTracked again for a residual wallet after the first wallet's + /// processing is recorded, and AlreadyProcessed once every candidate is + /// covered. + #[test] + fn track_state_machine() { + let mut tracker = BlockMatchTracker::new(); + let hash = hash_n(0); + let key = FilterMatchKey::new(100, hash); + let wallet_a: WalletId = [0xA1; 32]; + let wallet_b: WalletId = [0xB2; 32]; + + // First match for {A}: nothing tracked yet, helper records the block. + assert_eq!( + tracker.track(&key, 0, BTreeSet::from([wallet_a])), + BlockTrackResult::NewlyTracked { + wallets: BTreeSet::from([wallet_a]) + } + ); + assert_eq!(tracker.finish_in_flight(&hash), Some((100, 0))); + // Put it back in flight to continue the scenario. + assert!(matches!( + tracker.track(&key, 0, BTreeSet::from([wallet_a])), + BlockTrackResult::NewlyTracked { .. 
} + )); + + // Re-match for {A} while still in flight: residual is {A}, InFlight. + assert_eq!( + tracker.track(&key, 0, BTreeSet::from([wallet_a])), + BlockTrackResult::InFlight { + wallets: BTreeSet::from([wallet_a]) + } + ); + + // Block is delivered and processed for {A}. + assert!(tracker.finish_in_flight(&hash).is_some()); + tracker.record_processed(100, hash, &BTreeSet::from([wallet_a])); + + // Late-added B's filter matches the same block: residual is {B} and + // it gets re-queued via NewlyTracked. + assert_eq!( + tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])), + BlockTrackResult::NewlyTracked { + wallets: BTreeSet::from([wallet_b]) + } + ); + + // After B is processed, both wallets are covered: AlreadyProcessed. + assert!(tracker.finish_in_flight(&hash).is_some()); + tracker.record_processed(100, hash, &BTreeSet::from([wallet_b])); + assert_eq!( + tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])), + BlockTrackResult::AlreadyProcessed + ); + } + + /// `prune_at_or_below` drops every entry at or below the given height + /// while retaining strictly higher entries. Idempotent under repeated + /// calls with the same threshold. + #[test] + fn prune_at_or_below_drops_low_entries() { + let mut tracker = BlockMatchTracker::new(); + let wallet: WalletId = [0xFA; 32]; + let h_low = hash_n(1); + let h_mid = hash_n(2); + let h_high = hash_n(3); + + tracker.record_processed(2500, h_low, &BTreeSet::from([wallet])); + tracker.record_processed(4999, h_mid, &BTreeSet::from([wallet])); + tracker.record_processed(7500, h_high, &BTreeSet::from([wallet])); + + tracker.prune_at_or_below(4999); + + // Entries at or below 4999 are gone, the 7500 entry survives. 
+ let key_low = FilterMatchKey::new(2500, h_low); + let key_mid = FilterMatchKey::new(4999, h_mid); + let key_high = FilterMatchKey::new(7500, h_high); + assert!(tracker.already_processed_wallets(&key_low).is_empty()); + assert!(tracker.already_processed_wallets(&key_mid).is_empty()); + assert!(tracker.already_processed_wallets(&key_high).contains(&wallet)); + + // Repeat call is a no-op. + tracker.prune_at_or_below(4999); + assert!(tracker.already_processed_wallets(&key_high).contains(&wallet)); + } + + /// `is_empty` and `clear` cover both maps together: populating either + /// flips `is_empty`, and `clear` returns to the initial state. + #[test] + fn is_empty_and_clear_cover_both_maps() { + let mut tracker = BlockMatchTracker::new(); + let wallet: WalletId = [0xCC; 32]; + let hash = hash_n(0); + let key = FilterMatchKey::new(100, hash); + + assert!(tracker.is_empty()); + + // Only blocks_remaining populated. + tracker.track(&key, 0, BTreeSet::from([wallet])); + assert!(!tracker.is_empty()); + tracker.clear(); + assert!(tracker.is_empty()); + + // Only processed_blocks_per_wallet populated. + tracker.record_processed(100, hash, &BTreeSet::from([wallet])); + assert!(!tracker.is_empty()); + tracker.clear(); + assert!(tracker.is_empty()); + } +} diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index d800d0f95..76ea261ce 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -4,13 +4,14 @@ //! and matches against wallet to identify blocks for download. //! Emits FiltersStored, FiltersSyncComplete and BlocksNeeded events. 
-use std::collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use dashcore::bip158::BlockFilter; -use dashcore::{Address, BlockHash}; +use dashcore::Address; use super::batch::FiltersBatch; +use super::block_match_tracker::{BlockMatchTracker, BlockTrackResult}; use super::pipeline::FiltersPipeline; use crate::error::SyncResult; use crate::network::RequestSender; @@ -22,7 +23,7 @@ use crate::validation::{FilterValidationInput, FilterValidator, Validator}; use crate::sync::progress::ProgressPercentage; use dashcore::hash_types::FilterHeader; use key_wallet_manager::WalletInterface; -use key_wallet_manager::{check_compact_filters_for_addresses, FilterMatchKey}; +use key_wallet_manager::{check_compact_filters_for_addresses, FilterMatchKey, WalletId}; use tokio::sync::RwLock; /// Batch size for processing filters. @@ -66,11 +67,10 @@ pub struct FiltersManager< pub(super) active_batches: BTreeMap, /// Current block height being processed (for progress tracking). processing_height: u32, - /// Blocks remaining that need to be processed. - /// Maps block_hash -> (height, batch_start) for batch association. - pub(super) blocks_remaining: BTreeMap, - /// Block hashes that have been matched and queued for download. - pub(super) filters_matched: HashSet, + /// Per-block tracking state for matched blocks: in-flight blocks awaiting + /// `BlockProcessed` and the per-wallet record of which wallets already + /// have a given processed block applied. 
+ pub(super) tracker: BlockMatchTracker, } impl @@ -114,16 +114,14 @@ impl bool { self.active_batches.is_empty() - && self.blocks_remaining.is_empty() - && self.filters_matched.is_empty() + && self.tracker.is_empty() && self.pending_batches.is_empty() && self.filter_pipeline.is_idle() } @@ -453,16 +451,16 @@ impl = self @@ -473,7 +471,7 @@ impl self.progress.committed_height() { self.progress.update_committed_height(end); - self.wallet.write().await.update_synced_height(end); + let scanned_wallets = batch.scanned_wallets().clone(); + if !scanned_wallets.is_empty() { + let mut wallet = self.wallet.write().await; + for wallet_id in &scanned_wallets { + wallet.update_wallet_synced_height(wallet_id, end); + } + } } + // Drop processed-wallet records for the committed range. Below the + // new committed_height a new wallet can only get here via the + // `tick` rescan trigger, which already wipes the map via + // `clear_in_flight_state`, so older entries can never be consulted. + self.tracker.prune_at_or_below(end); self.processing_height = end + 1; tracing::info!( @@ -581,22 +592,24 @@ impl, + new_addresses: &HashMap>, ) -> SyncResult> { if new_addresses.is_empty() { return Ok(vec![]); } - let Some(batch) = self.active_batches.get_mut(&batch_start) else { + let Some(batch) = self.active_batches.get(&batch_start) else { return Ok(vec![]); }; tracing::info!( - "Rescan filters ({}-{}) for {} new addresses", + "Rescan filters ({}-{}) for new addresses across {} wallets", batch.start_height(), batch.end_height(), new_addresses.len() @@ -605,37 +618,63 @@ impl = { + let wallet = self.wallet.read().await; + new_addresses.keys().map(|id| (*id, wallet.wallet_synced_height(id))).collect() + }; + + let mut block_to_wallets: BTreeMap> = BTreeMap::new(); + for (wallet_id, addresses) in new_addresses { + if addresses.is_empty() { + continue; + } + let addresses_vec: Vec<_> = addresses.iter().cloned().collect(); + let min_synced = synced_heights.get(wallet_id).copied().unwrap_or(0); + 
let matches = + check_compact_filters_for_addresses(batch_filters, addresses_vec, min_synced); + for key in matches { + block_to_wallets.entry(key).or_default().insert(*wallet_id); + } + } - // Match filters against new addresses only - let addresses_vec: Vec<_> = new_addresses.into_iter().collect(); - let matches = check_compact_filters_for_addresses(batch.filters(), addresses_vec); let mut events = Vec::new(); - let mut blocks_needed = BTreeSet::new(); + let mut blocks_needed: BTreeMap> = BTreeMap::new(); let mut new_blocks_count = 0; - if !matches.is_empty() { - self.progress.add_matched(matches.len() as u32); + if !block_to_wallets.is_empty() { + self.progress.add_matched(block_to_wallets.len() as u32); } - for key in matches { - // Skip blocks that were already matched (even if already processed) - if self.filters_matched.contains(key.hash()) { - continue; - } - // Queue blocks discovered by rescan for download - if let btree_map::Entry::Vacant(e) = self.blocks_remaining.entry(*key.hash()) { - e.insert((key.height(), batch_start)); - self.filters_matched.insert(*key.hash()); - blocks_needed.insert(key); - new_blocks_count += 1; + for (key, wallets) in block_to_wallets { + match self.tracker.track(&key, batch_start, wallets) { + BlockTrackResult::NewlyTracked { + wallets, + } => { + blocks_needed.insert(key, wallets); + new_blocks_count += 1; + } + BlockTrackResult::InFlight { + wallets, + } => { + // Block already on its way; merge late wallet ids into the + // pipeline's pending wallet set via a fresh BlocksNeeded. + blocks_needed.insert(key, wallets); + } + BlockTrackResult::AlreadyProcessed => {} } } - // Update batch pending_blocks count + // Update batch pending_blocks count for the genuinely new entries only. 
if new_blocks_count > 0 { if let Some(batch) = self.active_batches.get_mut(&batch_start) { batch.set_pending_blocks(batch.pending_blocks() + new_blocks_count); } tracing::info!("Rescan found {} additional blocks", new_blocks_count); + } + if !blocks_needed.is_empty() { events.push(SyncEvent::BlocksNeeded { blocks: blocks_needed, }); @@ -644,68 +683,163 @@ impl SyncResult> { let mut events = Vec::new(); - let Some(batch) = self.active_batches.get_mut(&batch_start) else { - tracing::debug!("scan_batch: batch {} not found", batch_start); - return Ok(events); + let (batch_end, filters_empty) = { + let Some(batch) = self.active_batches.get_mut(&batch_start) else { + tracing::debug!("scan_batch: batch {} not found", batch_start); + return Ok(events); + }; + + tracing::debug!( + "scan_batch: batch {}-{} has {} filters", + batch.start_height(), + batch.end_height(), + batch.filters().len() + ); + + batch.mark_scanned(); + (batch.end_height(), batch.filters().is_empty()) }; - tracing::debug!( - "scan_batch: batch {}-{} has {} filters", - batch.start_height(), - batch.end_height(), - batch.filters().len() - ); + // Snapshot per-wallet state for the wallets behind this batch's range. + // A wallet whose `synced_height >= batch_end` is fully covered and is + // skipped entirely, its addresses never even get tested against these + // filters. + let wallet = self.wallet.read().await; + let behind = wallet.wallets_behind(batch_end); + let mut wallet_states: Vec<(WalletId, u32, Vec
)> = Vec::new(); + for wallet_id in &behind { + let synced = wallet.wallet_synced_height(wallet_id); + let addresses = wallet.monitored_addresses_for(wallet_id); + if !addresses.is_empty() { + wallet_states.push((*wallet_id, synced, addresses)); + } + } + drop(wallet); - batch.mark_scanned(); + // Every behind wallet's coverage advances to `batch_end` once this + // batch commits. That includes wallets without any monitored + // addresses: they have nothing to match against these filters, so the + // batch fully accounts for their range and their `synced_height` must + // advance to keep `wallets_behind` from listing them on every future + // batch. + let scanned_wallets: BTreeSet = behind.clone(); - // Get all filters in the batch - if batch.filters().is_empty() { + if let Some(batch) = self.active_batches.get_mut(&batch_start) { + batch.set_scanned_wallets(scanned_wallets); + } + + if filters_empty { tracing::debug!("scan_batch: batch filters are empty, returning early"); return Ok(events); } - // Match against wallet's current addresses - let wallet = self.wallet.read().await; - let addresses = wallet.monitored_addresses(); - let matches = check_compact_filters_for_addresses(batch.filters(), addresses); - drop(wallet); + if wallet_states.is_empty() { + // No addresses to scan, but `scanned_wallets` was still recorded + // so any zero-address behind wallets advance at commit. + tracing::debug!("scan_batch: no behind wallets with monitored addresses"); + return Ok(events); + } + + // Single-pass union-then-attribute: build the union of all addresses + // across behind wallets, run the filters once, then for each matched + // block re-test per-wallet scripts to attribute the match correctly. + let union_addresses: Vec
= + wallet_states.iter().flat_map(|(_, _, addrs)| addrs.iter().cloned()).collect(); + let min_synced = wallet_states.iter().map(|(_, synced, _)| *synced).min().unwrap_or(0); + + let block_to_wallets = { + let Some(batch) = self.active_batches.get(&batch_start) else { + return Ok(events); + }; + let batch_filters = batch.filters(); + + let matches = + check_compact_filters_for_addresses(batch_filters, union_addresses, min_synced); + let mut block_to_wallets: BTreeMap> = + BTreeMap::new(); + for key in matches { + let Some(filter) = batch_filters.get(&key) else { + tracing::warn!( + "skipping unmatched filter key at height {}: hash {}", + key.height(), + key.hash() + ); + continue; + }; + for (wallet_id, wallet_synced, addresses) in &wallet_states { + if key.height() <= *wallet_synced { + continue; + } + let scripts: Vec> = + addresses.iter().map(|a| a.script_pubkey().to_bytes()).collect(); + let matched = match filter + .match_any(key.hash(), scripts.iter().map(|v| v.as_slice())) + { + Ok(matched) => matched, + Err(e) => { + tracing::warn!( + "filter match_any error during attribution at height {}: {}; treating as non-match", + key.height(), + e + ); + false + } + }; + if matched { + block_to_wallets.entry(key.clone()).or_default().insert(*wallet_id); + } + } + } + block_to_wallets + }; tracing::info!( - "Batch {}-{}: found {} matching blocks", - batch.start_height(), - batch.end_height(), - matches.len() + "Batch {}-{}: found {} matching blocks across {} behind wallets", + batch_start, + batch_end, + block_to_wallets.len(), + wallet_states.len() ); - if matches.is_empty() { + if block_to_wallets.is_empty() { return Ok(events); } - self.progress.add_matched(matches.len() as u32); + self.progress.add_matched(block_to_wallets.len() as u32); - // Filter out already-processed blocks and track the new ones - let mut blocks_needed = BTreeSet::new(); + // Either (re)queue the block via `BlocksNeeded` or skip if every + // candidate wallet already has it processed. 
In-flight blocks still + // re-emit so the BlocksPipeline merges any late-arriving wallet ids. + let mut blocks_needed: BTreeMap> = BTreeMap::new(); let mut new_blocks_count = 0; - for key in matches { - if self.filters_matched.contains(key.hash()) { - continue; - } - if self.blocks_remaining.contains_key(key.hash()) { - continue; + for (key, wallets) in block_to_wallets { + match self.tracker.track(&key, batch_start, wallets) { + BlockTrackResult::NewlyTracked { + wallets, + } => { + blocks_needed.insert(key, wallets); + new_blocks_count += 1; + } + BlockTrackResult::InFlight { + wallets, + } => { + blocks_needed.insert(key, wallets); + } + BlockTrackResult::AlreadyProcessed => {} } - self.blocks_remaining.insert(*key.hash(), (key.height(), batch_start)); - self.filters_matched.insert(*key.hash()); - blocks_needed.insert(key); - new_blocks_count += 1; } - // Update batch pending_blocks count - if let Some(batch) = self.active_batches.get_mut(&batch_start) { - batch.set_pending_blocks(batch.pending_blocks() + new_blocks_count); + // Update batch pending_blocks count for the genuinely new entries only. 
+ if new_blocks_count > 0 { + if let Some(batch) = self.active_batches.get_mut(&batch_start) { + batch.set_pending_blocks(batch.pending_blocks() + new_blocks_count); + } } if !blocks_needed.is_empty() { @@ -773,9 +907,13 @@ mod tests { PersistentFilterHeaderStorage, PersistentFilterStorage, StorageManager, }; use crate::sync::{ManagerIdentifier, SyncManagerProgress}; + use dashcore::bip158::BlockFilter; use dashcore::Header; + use dashcore::{Block, Network, Transaction}; use dashcore_hashes::Hash; - use key_wallet_manager::test_utils::MockWallet; + use key_wallet_manager::test_utils::{ + MockWallet, MockWalletState, MultiMockWallet, MOCK_WALLET_ID, + }; use tokio::sync::mpsc::unbounded_channel; type TestFiltersManager = FiltersManager< @@ -784,6 +922,12 @@ mod tests { PersistentFilterStorage, MockWallet, >; + type MultiTestFiltersManager = FiltersManager< + PersistentBlockHeaderStorage, + PersistentFilterHeaderStorage, + PersistentFilterStorage, + MultiMockWallet, + >; type TestSyncManager = dyn SyncManager; async fn create_test_manager() -> TestFiltersManager { @@ -798,6 +942,30 @@ mod tests { .await } + async fn create_multi_test_manager( + wallet: Arc>, + ) -> MultiTestFiltersManager { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + FiltersManager::new( + wallet, + storage.block_headers(), + storage.filter_headers(), + storage.filters(), + ) + .await + } + + /// Build a real `BlockFilter` for a single-output block paying `address`. 
+ fn filter_for_address( + height: u32, + address: &dashcore::Address, + ) -> (FilterMatchKey, BlockFilter) { + let tx = Transaction::dummy(address, 0..0, &[height as u64]); + let block = Block::dummy(height, vec![tx]); + let filter = BlockFilter::dummy(&block); + (FilterMatchKey::new(height, block.block_hash()), filter) + } + #[tokio::test] async fn test_filters_manager_new() { let manager = create_test_manager().await; @@ -816,7 +984,7 @@ mod tests { // Set wallet committed height via last_processed_height (MockWallet default delegates) let mut wallet = MockWallet::new(); - wallet.update_last_processed_height(50); + wallet.update_wallet_synced_height(&MOCK_WALLET_ID, 50); let wallet = Arc::new(RwLock::new(wallet)); // Pre-populate filter storage with filters at heights 1..=100 @@ -928,6 +1096,618 @@ mod tests { manager.try_commit_batches().await.unwrap(); assert_eq!(manager.active_batches.len(), 0); assert_eq!(manager.progress.committed_height(), 4999); + // No wallets were recorded as scanned for this batch, so the per-wallet + // synced_height stays at its initial value. + assert_eq!(manager.wallet.read().await.wallet_synced_height(&MOCK_WALLET_ID), 0); + } + + #[tokio::test] + async fn test_batch_commit_advances_only_scanned_wallets() { + let mut manager = create_test_manager().await; + manager.set_state(SyncState::Syncing); + + // First batch records MOCK_WALLET_ID as scanned, so its synced_height + // advances to the batch end on commit. 
+ let mut batch1 = FiltersBatch::new(0, 4999, HashMap::new()); + batch1.set_pending_blocks(0); + batch1.mark_scanned(); + batch1.mark_rescan_complete(); + batch1.set_scanned_wallets(BTreeSet::from([MOCK_WALLET_ID])); + manager.active_batches.insert(0, batch1); + + manager.try_commit_batches().await.unwrap(); + assert_eq!(manager.progress.committed_height(), 4999); + assert_eq!(manager.wallet.read().await.wallet_synced_height(&MOCK_WALLET_ID), 4999); + + // Second batch leaves scanned_wallets empty (nothing to scan in this + // range), so the per-wallet synced_height stays put even though the + // committed_height advances. + let mut batch2 = FiltersBatch::new(5000, 9999, HashMap::new()); + batch2.set_pending_blocks(0); + batch2.mark_scanned(); + batch2.mark_rescan_complete(); + manager.active_batches.insert(5000, batch2); + + manager.try_commit_batches().await.unwrap(); + assert_eq!(manager.progress.committed_height(), 9999); + assert_eq!(manager.wallet.read().await.wallet_synced_height(&MOCK_WALLET_ID), 4999); + } + + /// Two wallets in the same batch: only the wallet recorded in + /// `scanned_wallets` advances, the other stays put even after commit. + #[tokio::test] + async fn test_batch_commit_advances_only_recorded_wallet_with_two_wallets() { + let wallet_a: WalletId = [0xAA; 32]; + let wallet_b: WalletId = [0xBB; 32]; + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet(wallet_a, MockWalletState::default()); + w.insert_wallet(wallet_b, MockWalletState::default()); + } + let mut manager = create_multi_test_manager(multi.clone()).await; + manager.set_state(SyncState::Syncing); + + // Batch records only wallet_a as scanned. wallet_b is excluded. 
+ let mut batch = FiltersBatch::new(0, 4999, HashMap::new()); + batch.set_pending_blocks(0); + batch.mark_scanned(); + batch.mark_rescan_complete(); + batch.set_scanned_wallets(BTreeSet::from([wallet_a])); + manager.active_batches.insert(0, batch); + + manager.try_commit_batches().await.unwrap(); + assert_eq!(manager.progress.committed_height(), 4999); + assert_eq!(multi.read().await.wallet_synced_height(&wallet_a), 4999); + assert_eq!(multi.read().await.wallet_synced_height(&wallet_b), 0); + } + + /// `scan_batch` with two wallets at different `synced_height` values: + /// only the wallet whose synced_height is below the matching block's + /// height should be attributed. + #[tokio::test] + async fn test_scan_batch_attributes_per_wallet_height() { + let wallet_low: WalletId = [0x01; 32]; + let wallet_high: WalletId = [0x02; 32]; + let address_low = dashcore::Address::dummy(Network::Regtest, 1); + let address_high = dashcore::Address::dummy(Network::Regtest, 2); + + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + // wallet_low is behind: synced_height=10, will see filters above 10. + w.insert_wallet( + wallet_low, + MockWalletState { + addresses: vec![address_low.clone()], + synced_height: 10, + last_processed_height: 10, + }, + ); + // wallet_high is mostly synced: synced_height=50, only sees > 50. + w.insert_wallet( + wallet_high, + MockWalletState { + addresses: vec![address_high.clone()], + synced_height: 50, + last_processed_height: 50, + }, + ); + } + let mut manager = create_multi_test_manager(multi).await; + manager.set_state(SyncState::Syncing); + + // Build a batch with three filters: at 30 paying wallet_low's address, + // at 60 paying wallet_high's address, at 70 paying wallet_low's address. 
+ let mut filters: HashMap = HashMap::new(); + let (key_30, f_30) = filter_for_address(30, &address_low); + let (key_60, f_60) = filter_for_address(60, &address_high); + let (key_70, f_70) = filter_for_address(70, &address_low); + filters.insert(key_30.clone(), f_30); + filters.insert(key_60.clone(), f_60); + filters.insert(key_70.clone(), f_70); + + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + manager.progress.update_stored_height(99); + + let events = manager.scan_batch(0).await.unwrap(); + + // Find the BlocksNeeded event. + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event"); + + // Block at 30 only attributable to wallet_low (height <= wallet_high.synced) + let attr_30 = blocks.get(&key_30).expect("entry for height 30"); + assert!(attr_30.contains(&wallet_low)); + assert!(!attr_30.contains(&wallet_high)); + + // Block at 60 only attributable to wallet_high (matches its address); + // wallet_low's address does not match so it shouldn't be there either. + let attr_60 = blocks.get(&key_60).expect("entry for height 60"); + assert!(attr_60.contains(&wallet_high)); + assert!(!attr_60.contains(&wallet_low)); + + // Block at 70 only attributable to wallet_low: matches wallet_low's + // address, and wallet_high's address does not match this filter. + let attr_70 = blocks.get(&key_70).expect("entry for height 70"); + assert!(attr_70.contains(&wallet_low)); + assert!(!attr_70.contains(&wallet_high)); + } + + /// `rescan_batch` with multiple wallets in `addresses_by_wallet`: + /// each wallet's new addresses are matched independently and the + /// attribution is correct in the emitted `BlocksNeeded`. 
+ #[tokio::test] + async fn test_rescan_batch_attributes_per_wallet_addresses() { + let wallet_a: WalletId = [0x0A; 32]; + let wallet_b: WalletId = [0x0B; 32]; + let address_a = dashcore::Address::dummy(Network::Regtest, 11); + let address_b = dashcore::Address::dummy(Network::Regtest, 22); + + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet(wallet_a, MockWalletState::default()); + w.insert_wallet(wallet_b, MockWalletState::default()); + } + let mut manager = create_multi_test_manager(multi).await; + manager.set_state(SyncState::Syncing); + + let mut filters: HashMap = HashMap::new(); + let (key_a, f_a) = filter_for_address(15, &address_a); + let (key_b, f_b) = filter_for_address(25, &address_b); + filters.insert(key_a.clone(), f_a); + filters.insert(key_b.clone(), f_b); + + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + + let mut new_addresses: HashMap> = HashMap::new(); + new_addresses.insert(wallet_a, HashSet::from([address_a])); + new_addresses.insert(wallet_b, HashSet::from([address_b])); + + let events = manager.rescan_batch(0, &new_addresses).await.unwrap(); + + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event"); + + let attr_a = blocks.get(&key_a).expect("entry for wallet_a's match"); + assert!(attr_a.contains(&wallet_a)); + assert!(!attr_a.contains(&wallet_b)); + + let attr_b = blocks.get(&key_b).expect("entry for wallet_b's match"); + assert!(attr_b.contains(&wallet_b)); + assert!(!attr_b.contains(&wallet_a)); + } + + /// `rescan_batch` honours each wallet's own `synced_height`: a new + /// address belonging to a wallet that has already advanced past a height + /// must not produce a `BlocksNeeded` for that height, even when the + /// filter for that height matches the new address. 
Two wallets at + /// different heights are exercised so that both the include-above and + /// skip-below paths run. + #[tokio::test] + async fn test_rescan_batch_skips_below_per_wallet_synced_height() { + let wallet_low: WalletId = [0xA1; 32]; + let wallet_high: WalletId = [0xA2; 32]; + let address_low = dashcore::Address::dummy(Network::Regtest, 41); + let address_high = dashcore::Address::dummy(Network::Regtest, 42); + + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet( + wallet_low, + MockWalletState { + addresses: vec![], + synced_height: 20, + last_processed_height: 20, + }, + ); + w.insert_wallet( + wallet_high, + MockWalletState { + addresses: vec![], + synced_height: 60, + last_processed_height: 60, + }, + ); + } + let mut manager = create_multi_test_manager(multi).await; + manager.set_state(SyncState::Syncing); + + // Filters at 30 (matches wallet_low) and 70 (matches wallet_high). + // For wallet_low (synced=20), height 30 is fresh and 70 is also fresh + // since 70 > 20. For wallet_high (synced=60), height 30 is below its + // synced_height so it must be skipped, while 70 is fresh. + let (key_30, f_30) = filter_for_address(30, &address_low); + let (key_70, f_70) = filter_for_address(70, &address_high); + let mut filters: HashMap = HashMap::new(); + filters.insert(key_30.clone(), f_30); + filters.insert(key_70.clone(), f_70); + + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + + // wallet_high also "discovers" address_low to demonstrate that even + // when a new address would match a low height, the per-wallet + // synced_height filter prevents emitting it. 
+ let mut new_addresses: HashMap> = HashMap::new(); + new_addresses.insert(wallet_low, HashSet::from([address_low.clone()])); + new_addresses.insert(wallet_high, HashSet::from([address_low.clone(), address_high])); + + let events = manager.rescan_batch(0, &new_addresses).await.unwrap(); + + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event"); + + // wallet_low must see height 30, wallet_high must NOT (synced=60>30). + let attr_30 = blocks.get(&key_30).expect("entry at height 30 for wallet_low"); + assert!(attr_30.contains(&wallet_low)); + assert!(!attr_30.contains(&wallet_high)); + + // wallet_high must see height 70 since 70 > 60. + let attr_70 = blocks.get(&key_70).expect("entry at height 70 for wallet_high"); + assert!(attr_70.contains(&wallet_high)); + } + + /// `scan_batch` for a behind wallet with no monitored addresses still + /// records the wallet in `scanned_wallets` so its `synced_height` + /// advances at commit. Otherwise zero-address wallets would be listed by + /// `wallets_behind` on every batch forever. + #[tokio::test] + async fn test_scan_batch_advances_zero_address_wallet() { + let wallet_id: WalletId = [0xCC; 32]; + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet(wallet_id, MockWalletState::default()); + } + let mut manager = create_multi_test_manager(multi.clone()).await; + manager.set_state(SyncState::Syncing); + + // Batch with one filter at height 50 (irrelevant: wallet has no addresses). 
+ let mut filters: HashMap = HashMap::new(); + let throwaway_address = dashcore::Address::dummy(Network::Regtest, 99); + let (key, filter) = filter_for_address(50, &throwaway_address); + filters.insert(key, filter); + + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + manager.progress.update_stored_height(99); + + let events = manager.scan_batch(0).await.unwrap(); + assert!(events.is_empty(), "no addresses should mean no BlocksNeeded events"); + + // Mark batch ready so commit can run, then commit. + if let Some(b) = manager.active_batches.get_mut(&0) { + b.set_pending_blocks(0); + b.mark_rescan_complete(); + } + manager.try_commit_batches().await.unwrap(); + + // Wallet had no addresses, but it was behind, so its synced_height + // advances to the batch end after commit. + assert_eq!(multi.read().await.wallet_synced_height(&wallet_id), 99); + } + + /// `scan_batch` after a runtime-added wallet whose address matches a + /// block already in flight must re-emit `BlocksNeeded` so the + /// `BlocksPipeline` merges the new wallet id into the pending set. + #[tokio::test] + async fn test_scan_batch_in_flight_re_emits_for_late_wallet() { + let wallet_id: WalletId = [0xDD; 32]; + let address = dashcore::Address::dummy(Network::Regtest, 7); + + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet( + wallet_id, + MockWalletState { + addresses: vec![address.clone()], + synced_height: 0, + last_processed_height: 0, + }, + ); + } + let mut manager = create_multi_test_manager(multi).await; + manager.set_state(SyncState::Syncing); + + // One matching filter at height 40. 
+        let (key_40, f_40) = filter_for_address(40, &address);
+        let mut filters: HashMap<FilterMatchKey, BlockFilter> = HashMap::new();
+        filters.insert(key_40.clone(), f_40);
+
+        let mut batch = FiltersBatch::new(0, 99, filters);
+        batch.mark_verified();
+        manager.active_batches.insert(0, batch);
+        manager.progress.update_stored_height(99);
+
+        // Pre-seed the tracker so `tracker.track` returns InFlight.
+        manager.tracker.track(&key_40, 0, BTreeSet::from([wallet_id]));
+
+        let events = manager.scan_batch(0).await.unwrap();
+
+        let blocks = events
+            .iter()
+            .find_map(|e| match e {
+                SyncEvent::BlocksNeeded {
+                    blocks,
+                } => Some(blocks),
+                _ => None,
+            })
+            .expect("InFlight path must still emit BlocksNeeded for wallet-set merge");
+        let attribution = blocks.get(&key_40).expect("entry for the in-flight block");
+        assert!(attribution.contains(&wallet_id));
+    }
+
+    /// `scan_batch` `AlreadyProcessed` path: when every candidate wallet has
+    /// already had this block processed, the block is skipped (no
+    /// `BlocksNeeded`).
+    #[tokio::test]
+    async fn test_scan_batch_already_processed_is_skipped() {
+        let wallet_id: WalletId = [0xEE; 32];
+        let address = dashcore::Address::dummy(Network::Regtest, 8);
+
+        let multi = MultiMockWallet::new();
+        let multi = Arc::new(RwLock::new(multi));
+        {
+            let mut w = multi.write().await;
+            w.insert_wallet(
+                wallet_id,
+                MockWalletState {
+                    addresses: vec![address.clone()],
+                    synced_height: 0,
+                    last_processed_height: 0,
+                },
+            );
+        }
+        let mut manager = create_multi_test_manager(multi).await;
+        manager.set_state(SyncState::Syncing);
+
+        let (key_40, f_40) = filter_for_address(40, &address);
+        let mut filters: HashMap<FilterMatchKey, BlockFilter> = HashMap::new();
+        filters.insert(key_40.clone(), f_40);
+
+        let mut batch = FiltersBatch::new(0, 99, filters);
+        batch.mark_verified();
+        manager.active_batches.insert(0, batch);
+        manager.progress.update_stored_height(99);
+
+        // Pre-record processing for the only candidate wallet so the residual
+        // is empty and `tracker.track` returns `AlreadyProcessed`.
+        manager.tracker.record_processed(40, *key_40.hash(), &BTreeSet::from([wallet_id]));
+
+        let events = manager.scan_batch(0).await.unwrap();
+        let has_blocks_needed = events.iter().any(|e| matches!(e, SyncEvent::BlocksNeeded { .. }));
+        assert!(!has_blocks_needed, "AlreadyProcessed must not emit BlocksNeeded");
+    }
+
+    /// `scan_batch` for a wallet added at runtime whose address matches a
+    /// block already processed for another wallet must re-emit `BlocksNeeded`
+    /// with only the late wallet in the attribution set so the block reloads
+    /// from storage and applies for the late wallet without disturbing the
+    /// already-processed one.
+    #[tokio::test]
+    async fn test_scan_batch_late_wallet_recovers_already_processed_block() {
+        let early: WalletId = [0xE1; 32];
+        let late: WalletId = [0xE2; 32];
+        let address = dashcore::Address::dummy(Network::Regtest, 9);
+
+        let multi = MultiMockWallet::new();
+        let multi = Arc::new(RwLock::new(multi));
+        {
+            let mut w = multi.write().await;
+            w.insert_wallet(
+                early,
+                MockWalletState {
+                    addresses: vec![address.clone()],
+                    synced_height: 0,
+                    last_processed_height: 0,
+                },
+            );
+            w.insert_wallet(
+                late,
+                MockWalletState {
+                    addresses: vec![address.clone()],
+                    synced_height: 0,
+                    last_processed_height: 0,
+                },
+            );
+        }
+        let mut manager = create_multi_test_manager(multi).await;
+        manager.set_state(SyncState::Syncing);
+
+        let (key_40, f_40) = filter_for_address(40, &address);
+        let mut filters: HashMap<FilterMatchKey, BlockFilter> = HashMap::new();
+        filters.insert(key_40.clone(), f_40);
+
+        let mut batch = FiltersBatch::new(0, 99, filters);
+        batch.mark_verified();
+        manager.active_batches.insert(0, batch);
+        manager.progress.update_stored_height(99);
+
+        // The early wallet has already had this block applied. The late
+        // wallet has not. Both wallets' addresses match the filter at 40.
+ manager.tracker.record_processed(40, *key_40.hash(), &BTreeSet::from([early])); + + let events = manager.scan_batch(0).await.unwrap(); + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("late wallet must trigger a BlocksNeeded re-emit"); + let attribution = blocks.get(&key_40).expect("entry for the recovered block"); + assert!(attribution.contains(&late), "late wallet must receive the block"); + assert!( + !attribution.contains(&early), + "early wallet was already processed for this block, must be excluded" + ); + } + + /// `try_commit_batches` prunes `processed_blocks_per_wallet` entries at + /// or below the new committed_height, since they cannot be reached again + /// without `clear_in_flight_state` wiping the map outright. + #[tokio::test] + async fn test_commit_prunes_processed_blocks_per_wallet() { + let mut manager = create_test_manager().await; + manager.set_state(SyncState::Syncing); + + let wallet_id: WalletId = [0xFA; 32]; + let hash_in = dashcore::block::Header::dummy(0).block_hash(); + let hash_out = dashcore::block::Header::dummy(1).block_hash(); + let key_in = FilterMatchKey::new(2500, hash_in); + let key_out = FilterMatchKey::new(7500, hash_out); + manager.tracker.record_processed(2500, hash_in, &BTreeSet::from([wallet_id])); + manager.tracker.record_processed(7500, hash_out, &BTreeSet::from([wallet_id])); + + // Batch 0..=4999 is ready to commit; pruning drops the 2500 entry but + // keeps the 7500 entry which sits above the new committed_height. 
+ let mut batch = FiltersBatch::new(0, 4999, HashMap::new()); + batch.set_pending_blocks(0); + batch.mark_scanned(); + batch.mark_rescan_complete(); + manager.active_batches.insert(0, batch); + + manager.try_commit_batches().await.unwrap(); + + assert_eq!(manager.progress.committed_height(), 4999); + // The 2500 record is gone: a fresh `track` for the same wallet + // re-tracks the block instead of returning `AlreadyProcessed`. + assert!(matches!( + manager.tracker.track(&key_in, 0, BTreeSet::from([wallet_id])), + BlockTrackResult::NewlyTracked { .. } + )); + // The 7500 record survives above the committed height. + assert_eq!( + manager.tracker.track(&key_out, 0, BTreeSet::from([wallet_id])), + BlockTrackResult::AlreadyProcessed + ); + } + + /// `tick` rescan with a wallet that has a non-zero `synced_height`: the + /// batch must start at `synced_height + 1`, not at genesis. + #[tokio::test] + async fn test_tick_rescans_from_wallet_synced_height_not_genesis() { + let mut manager = create_test_manager().await; + + // Wallet sits at synced_height=150, manager committed at 300, so + // the wallet falls behind and the rescan trigger fires. + manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 150); + manager.set_state(SyncState::Synced); + manager.progress.update_committed_height(300); + manager.progress.update_stored_height(300); + manager.progress.update_filter_header_tip_height(300); + manager.progress.update_target_height(300); + + // Headers must exist in storage so start_download can resolve them. + let headers = dashcore::block::Header::dummy_batch(0..301); + manager.header_storage.write().await.store_headers(&headers).await.unwrap(); + + let (tx, _rx) = unbounded_channel(); + let _ = manager.tick(&RequestSender::new(tx)).await.unwrap(); + + // Batch must start at 151, not at 0. 
+ assert!(manager.active_batches.contains_key(&151)); + assert!(!manager.active_batches.contains_key(&0)); + } + + /// scan_batch's union-then-attribute pass must not falsely attribute a + /// block to a wallet whose own address does not actually match the + /// filter, even if the union pass picked up the block. + #[tokio::test] + async fn test_scan_batch_attribution_excludes_non_matching_wallet() { + let wallet_a: WalletId = [0xAA; 32]; + let wallet_b: WalletId = [0xBB; 32]; + let address_a = dashcore::Address::dummy(Network::Regtest, 31); + let address_b = dashcore::Address::dummy(Network::Regtest, 32); + + let multi = MultiMockWallet::new(); + let multi = Arc::new(RwLock::new(multi)); + { + let mut w = multi.write().await; + w.insert_wallet( + wallet_a, + MockWalletState { + addresses: vec![address_a.clone()], + synced_height: 0, + last_processed_height: 0, + }, + ); + w.insert_wallet( + wallet_b, + MockWalletState { + addresses: vec![address_b.clone()], + synced_height: 0, + last_processed_height: 0, + }, + ); + } + let mut manager = create_multi_test_manager(multi).await; + manager.set_state(SyncState::Syncing); + + // Filter at height 40 only matches address_a. address_b is in the + // union but does not match this specific filter, so the attribution + // pass must exclude wallet_b. 
+ let (key_40, f_40) = filter_for_address(40, &address_a); + let mut filters: HashMap = HashMap::new(); + filters.insert(key_40.clone(), f_40); + + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + manager.progress.update_stored_height(99); + + let events = manager.scan_batch(0).await.unwrap(); + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event"); + let attribution = blocks.get(&key_40).expect("entry for the matching block"); + assert!(attribution.contains(&wallet_a)); + assert!(!attribution.contains(&wallet_b)); } #[tokio::test] @@ -960,22 +1740,79 @@ mod tests { let mut manager = create_test_manager().await; manager.set_state(SyncState::Syncing); - // Add blocks from different batches + let wallet: WalletId = [1; 32]; let hash1 = dashcore::block::Header::dummy(0).block_hash(); let hash2 = dashcore::block::Header::dummy(1).block_hash(); - manager.blocks_remaining.insert(hash1, (100, 0)); // batch 0 - manager.blocks_remaining.insert(hash2, (5100, 5000)); // batch 5000 + // Track blocks from two different batches. + manager.tracker.track(&FilterMatchKey::new(100, hash1), 0, BTreeSet::from([wallet])); + manager.tracker.track(&FilterMatchKey::new(5100, hash2), 5000, BTreeSet::from([wallet])); - // Verify batch association - assert_eq!(manager.blocks_remaining.get(&hash1), Some(&(100, 0))); - assert_eq!(manager.blocks_remaining.get(&hash2), Some(&(5100, 5000))); + // Each block round-trips its (height, batch_start) on `finish_in_flight`. 
+ assert_eq!(manager.tracker.finish_in_flight(&hash1), Some((100, 0))); + assert_eq!(manager.tracker.finish_in_flight(&hash2), Some((5100, 5000))); + } + + #[tokio::test] + async fn test_track_block_match_per_wallet_residual() { + let mut manager = create_test_manager().await; + let hash = dashcore::block::Header::dummy(0).block_hash(); + let key = FilterMatchKey::new(100, hash); + let wallet_a: WalletId = [0xA1; 32]; + let wallet_b: WalletId = [0xB2; 32]; + + // First match for {A}: nothing tracked yet, helper records the block. + assert_eq!( + manager.tracker.track(&key, 0, BTreeSet::from([wallet_a])), + BlockTrackResult::NewlyTracked { + wallets: BTreeSet::from([wallet_a]) + } + ); + + // Second match for {A} while still in flight: residual is {A} (no + // processing has been recorded yet), so InFlight re-emits to merge + // late-arriving wallet ids into the pipeline's pending set. + assert_eq!( + manager.tracker.track(&key, 0, BTreeSet::from([wallet_a])), + BlockTrackResult::InFlight { + wallets: BTreeSet::from([wallet_a]) + } + ); + + // Block is delivered and processed for {A}. Round-trip the (height, + // batch_start) tuple while removing the in-flight entry, then record + // the processing. + assert_eq!(manager.tracker.finish_in_flight(&hash), Some((100, 0))); + manager.tracker.record_processed(100, hash, &BTreeSet::from([wallet_a])); + + // Late-added wallet B's filter matches the same block. A is already + // processed, B is not — residual is {B} and it gets re-queued via + // NewlyTracked so the block reloads from storage and applies for B + // only. + assert_eq!( + manager.tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])), + BlockTrackResult::NewlyTracked { + wallets: BTreeSet::from([wallet_b]) + } + ); + assert_eq!(manager.tracker.finish_in_flight(&hash), Some((100, 5000))); + + // After B is also processed, a third match including only A and B + // returns AlreadyProcessed since both are covered. 
+ manager.tracker.record_processed(100, hash, &BTreeSet::from([wallet_b])); + assert_eq!( + manager.tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])), + BlockTrackResult::AlreadyProcessed + ); + assert!(manager.tracker.finish_in_flight(&hash).is_none()); } #[tokio::test] async fn test_is_idle() { let mut manager = create_test_manager().await; let hash = dashcore::block::Header::dummy(0).block_hash(); + let key = FilterMatchKey::new(100, hash); + let wallet_id: WalletId = [0xCC; 32]; // Fresh manager is idle assert!(manager.is_idle()); @@ -985,13 +1822,13 @@ mod tests { assert!(!manager.is_idle()); manager.active_batches.clear(); - manager.blocks_remaining.insert(hash, (0, 0)); + manager.tracker.track(&key, 0, BTreeSet::from([wallet_id])); assert!(!manager.is_idle()); - manager.blocks_remaining.clear(); + manager.tracker.clear(); - manager.filters_matched.insert(hash); + manager.tracker.record_processed(100, hash, &BTreeSet::from([wallet_id])); assert!(!manager.is_idle()); - manager.filters_matched.clear(); + manager.tracker.clear(); manager.pending_batches.insert(FiltersBatch::new(0, 999, HashMap::new())); assert!(!manager.is_idle()); @@ -1003,8 +1840,8 @@ mod tests { // Populate all fields, then clear_in_flight_state restores idleness manager.active_batches.insert(0, FiltersBatch::new(0, 999, HashMap::new())); - manager.blocks_remaining.insert(hash, (0, 0)); - manager.filters_matched.insert(hash); + manager.tracker.track(&key, 0, BTreeSet::from([wallet_id])); + manager.tracker.record_processed(100, hash, &BTreeSet::from([wallet_id])); manager.pending_batches.insert(FiltersBatch::new(1000, 1999, HashMap::new())); manager.filter_pipeline.init(2000, 2999); assert!(!manager.is_idle()); @@ -1026,13 +1863,15 @@ mod tests { // Add addresses using test utility let addr1 = dashcore::Address::dummy(Network::Testnet, 1); let addr2 = dashcore::Address::dummy(Network::Testnet, 2); + let wallet_id: WalletId = [7; 32]; - batch.add_addresses([addr1.clone(), 
addr2.clone()]); + batch.add_addresses_for_wallet(wallet_id, [addr1.clone(), addr2.clone()]); let collected = batch.take_collected_addresses(); - assert_eq!(collected.len(), 2); - assert!(collected.contains(&addr1)); - assert!(collected.contains(&addr2)); + let for_wallet = collected.get(&wallet_id).expect("wallet entry"); + assert_eq!(for_wallet.len(), 2); + assert!(for_wallet.contains(&addr1)); + assert!(for_wallet.contains(&addr2)); // After take, should be empty assert!(batch.take_collected_addresses().is_empty()); @@ -1044,7 +1883,7 @@ mod tests { assert_eq!(manager.state(), SyncState::WaitForEvents); // Wallet committed to height 100, so scan_start will be 101 - manager.wallet.write().await.update_last_processed_height(100); + manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 100); // Filter headers only reached 50, so its below scan_start manager.progress.update_filter_header_tip_height(50); // Chain tip higher so the Synced early-return is not taken @@ -1121,7 +1960,7 @@ mod tests { // Simulate restart where everything is already synced but state is WaitForEvents. // committed == stored == filter_header_tip — start_download detects synced state. manager.set_state(SyncState::WaitForEvents); - manager.wallet.write().await.update_last_processed_height(100); + manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 100); manager.progress.update_committed_height(100); manager.progress.update_stored_height(100); manager.progress.update_filter_header_tip_height(100); @@ -1167,4 +2006,177 @@ mod tests { assert_eq!(manager.state(), SyncState::Synced); assert!(events.is_empty()); } + + /// A wallet whose `synced_height` sits below the manager's `committed_height` + /// must trigger a rescan from the wallet's height. This simulates a wallet + /// being added at runtime behind current scan progress. 
+    #[tokio::test]
+    async fn test_tick_rescans_when_wallet_falls_behind_committed() {
+        let mut manager = create_test_manager().await;
+
+        // Set up a single address on the wallet and a real matching filter at
+        // height 50 so scan_batch can emit a `BlocksNeeded` for it on rescan.
+        let address = dashcore::Address::dummy(Network::Regtest, 7);
+        manager.wallet.write().await.set_addresses(vec![address.clone()]);
+
+        // Build matching block + filter at height 50.
+        let tx = Transaction::dummy(&address, 0..0, &[50u64]);
+        let block_at_50 = Block::dummy(50, vec![tx]);
+        let filter_at_50 = BlockFilter::dummy(&block_at_50);
+
+        // Headers must form a contiguous range so the storage segment is
+        // fully populated. Only the height-50 entry needs to be the real
+        // header; the rest are dummies and never get matched against.
+        let mut headers: Vec<dashcore::block::Header> = dashcore::block::Header::dummy_batch(0..201);
+        headers[50] = block_at_50.header;
+        manager.header_storage.write().await.store_headers(&headers).await.unwrap();
+
+        // Persist a filter at every height in 0..=100 so `load_filters` over
+        // the initial batch range succeeds. Non-matching heights get a
+        // throwaway filter, only height 50 gets the address-matching one.
+        let mut filter_store = manager.filter_storage.write().await;
+        let dummy_filter = BlockFilter::new(&[0u8; 32]);
+        for h in 0..=100u32 {
+            if h == 50 {
+                filter_store.store_filter(h, &filter_at_50.content).await.unwrap();
+            } else {
+                filter_store.store_filter(h, &dummy_filter.content).await.unwrap();
+            }
+        }
+        drop(filter_store);
+
+        // Manager believes filters are committed up to 100. Filter headers
+        // and target are pinned at 100 too so start_download immediately
+        // scans the freshly created batch instead of waiting for downloads.
+ manager.set_state(SyncState::Synced); + manager.progress.update_committed_height(100); + manager.progress.update_stored_height(100); + manager.progress.update_filter_header_tip_height(100); + manager.progress.update_target_height(100); + + // Pre-populate in-flight state so we can verify clear_in_flight_state runs. + manager.active_batches.insert(101, FiltersBatch::new(101, 200, HashMap::new())); + let stale_hash = dashcore::block::Header::dummy(0).block_hash(); + let stale_key = FilterMatchKey::new(150, stale_hash); + manager.tracker.record_processed(150, stale_hash, &BTreeSet::from([MOCK_WALLET_ID])); + manager.filter_pipeline.init(101, 200); + + // MockWallet defaults to synced_height=0, so wallets_behind(100) = {MOCK_WALLET_ID}. + assert_eq!(manager.wallet.read().await.synced_height(), 0); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + // Sanity: the pre-populated stale processed record is present, so + // `track` for the same wallet would short-circuit to AlreadyProcessed. + assert_eq!( + manager.tracker.track(&stale_key, 0, BTreeSet::from([MOCK_WALLET_ID])), + BlockTrackResult::AlreadyProcessed + ); + // Undo the side effect of the probing `track` so the original + // processed record is the only state present going into `tick`. + manager.tracker.clear(); + manager.tracker.record_processed(150, stale_hash, &BTreeSet::from([MOCK_WALLET_ID])); + + let events = manager.tick(&requests).await.unwrap(); + + // Old in-flight state was cleared and a fresh batch was created at scan_start=0. + assert!(!manager.active_batches.contains_key(&101)); + assert!(manager.active_batches.contains_key(&0)); + // The stale pre-populated record was wiped by `clear_in_flight_state`: + // a fresh `track` for the same wallet now returns `NewlyTracked`. + assert!(matches!( + manager.tracker.track(&stale_key, 0, BTreeSet::from([MOCK_WALLET_ID])), + BlockTrackResult::NewlyTracked { .. 
} + )); + + // start_download set committed_height to scan_start - 1 = 0. + assert_eq!(manager.progress.committed_height(), 0); + assert_eq!(manager.state(), SyncState::Syncing); + + // Verify a `BlocksNeeded` event was emitted that includes MOCK_WALLET_ID + // for the matching block at height 50. + let blocks_needed = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event from rescan"); + let key_50 = FilterMatchKey::new(50, block_at_50.block_hash()); + let attribution = blocks_needed.get(&key_50).expect("entry for matching block 50"); + assert!(attribution.contains(&MOCK_WALLET_ID)); + } + + /// When every managed wallet is at or beyond `committed_height`, the rescan + /// trigger must not fire even though the aggregate `synced_height` could + /// otherwise look stale. + #[tokio::test] + async fn test_tick_does_not_rescan_when_no_wallets_behind() { + let mut manager = create_test_manager().await; + + // Wallet at synced_height=200, manager committed at 100 → no wallets behind. + manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 200); + + manager.set_state(SyncState::Synced); + manager.progress.update_committed_height(100); + manager.progress.update_stored_height(100); + manager.progress.update_filter_header_tip_height(200); + manager.progress.update_target_height(200); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + let events = manager.tick(&requests).await.unwrap(); + + assert!(events.is_empty()); + assert_eq!(manager.progress.committed_height(), 100); + assert_eq!(manager.state(), SyncState::Synced); + assert!(manager.active_batches.is_empty()); + } + + /// `committed_height = 0` on a fresh manager must not falsely trip the + /// rescan trigger. `wallets_behind(0)` returns an empty set since heights + /// are unsigned, so no wallet can be strictly less than 0. 
+ #[tokio::test] + async fn test_tick_does_not_rescan_at_genesis_committed() { + let mut manager = create_test_manager().await; + // Default state: committed_height=0, wallet synced_height=0, state=WaitForEvents. + assert_eq!(manager.progress.committed_height(), 0); + assert_eq!(manager.state(), SyncState::WaitForEvents); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + let events = manager.tick(&requests).await.unwrap(); + + assert!(events.is_empty()); + assert!(manager.is_idle()); + assert_eq!(manager.state(), SyncState::WaitForEvents); + } + + /// The rescan trigger only fires in `Syncing | Synced | WaitForEvents`. + /// `WaitingForConnections` must be skipped since we're not actively syncing. + #[tokio::test] + async fn test_tick_does_not_rescan_in_waiting_for_connections() { + let mut manager = create_test_manager().await; + manager.set_state(SyncState::WaitingForConnections); + manager.progress.update_committed_height(100); + + // Wallet behind committed — would normally trip the trigger. + assert!(!manager.wallet.read().await.wallets_behind(100).is_empty()); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + let events = manager.tick(&requests).await.unwrap(); + + assert!(events.is_empty()); + // committed_height not lowered, no batches created. 
+ assert_eq!(manager.progress.committed_height(), 100); + assert_eq!(manager.state(), SyncState::WaitingForConnections); + assert!(manager.active_batches.is_empty()); + } } diff --git a/dash-spv/src/sync/filters/mod.rs b/dash-spv/src/sync/filters/mod.rs index a930e87da..e65bb1b30 100644 --- a/dash-spv/src/sync/filters/mod.rs +++ b/dash-spv/src/sync/filters/mod.rs @@ -1,5 +1,6 @@ mod batch; mod batch_tracker; +mod block_match_tracker; mod manager; mod pipeline; mod progress; diff --git a/dash-spv/src/sync/filters/sync_manager.rs b/dash-spv/src/sync/filters/sync_manager.rs index 45b341465..dadf490e8 100644 --- a/dash-spv/src/sync/filters/sync_manager.rs +++ b/dash-spv/src/sync/filters/sync_manager.rs @@ -41,8 +41,7 @@ impl< fn clear_in_flight_state(&mut self) { self.active_batches.clear(); - self.blocks_remaining.clear(); - self.filters_matched.clear(); + self.tracker.clear(); self.pending_batches.clear(); self.filter_pipeline = FiltersPipeline::new(); } @@ -156,12 +155,17 @@ impl< SyncEvent::BlockProcessed { block_hash, height, + wallets, new_addresses, .. } => { + // Record per-wallet processing so a future scan can give a + // late-added wallet its own pass at this block via the + // `tracker.track` residual. + self.tracker.record_processed(*height, *block_hash, wallets); + // Check if this block is part of our tracked blocks - if let Some((_, batch_start)) = self.blocks_remaining.remove(block_hash) { - // Decrement this batch's pending_blocks count + if let Some((_, batch_start)) = self.tracker.finish_in_flight(block_hash) { if let Some(batch) = self.active_batches.get_mut(&batch_start) { batch.decrement_pending_blocks(); tracing::debug!( @@ -173,16 +177,16 @@ impl< ); } - // Collect new addresses in the batch for deferred rescan at commit time. - // This batches rescans for efficiency and ensures all blocks from - // a BlocksNeeded event are processed before triggering new rescans. 
- if !new_addresses.is_empty() { + // Collect per-wallet new addresses for deferred rescan at commit time. + for (wallet_id, addrs) in new_addresses { + if addrs.is_empty() { + continue; + } if let Some(batch) = self.active_batches.get_mut(&batch_start) { - batch.add_addresses(new_addresses.iter().cloned()); + batch.add_addresses_for_wallet(*wallet_id, addrs.iter().cloned()); } } - // Try to commit/scan/create batches return self.try_process_batch().await; } } @@ -194,6 +198,30 @@ impl< } async fn tick(&mut self, requests: &RequestSender) -> SyncResult> { + // Detect a wallet that was added behind our scan progress and rescan + // from its `synced_height`. Reset committed_height to the lowest + // synced_height across the stale wallets only, so already-synced + // wallets are not re-scanned from scratch. + if matches!(self.state(), SyncState::Syncing | SyncState::Synced | SyncState::WaitForEvents) + { + let committed = self.progress.committed_height(); + let wallet_read = self.wallet.read().await; + let behind = wallet_read.wallets_behind(committed); + let stale_min_synced = + behind.iter().map(|id| wallet_read.wallet_synced_height(id)).min(); + drop(wallet_read); + if let Some(stale_min_synced) = stale_min_synced { + tracing::info!( + "Wallet synced_height {} fell below filter committed_height {}, restarting scan", + stale_min_synced, + committed + ); + self.clear_in_flight_state(); + self.progress.update_committed_height(stale_min_synced); + return self.start_download(requests).await; + } + } + // TODO: Get rid of the send pending in here? Or decouple it from the header storage? 
// Run tick when Syncing OR when Synced with pending work (new blocks arriving) let has_pending_work = !self.active_batches.is_empty(); diff --git a/dash-spv/src/sync/mempool/sync_manager.rs b/dash-spv/src/sync/mempool/sync_manager.rs index 806905224..aef25554f 100644 --- a/dash-spv/src/sync/mempool/sync_manager.rs +++ b/dash-spv/src/sync/mempool/sync_manager.rs @@ -194,6 +194,7 @@ mod tests { use crate::test_utils::test_socket_address; use dashcore::hashes::Hash; use key_wallet_manager::test_utils::MockWallet; + use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; @@ -388,7 +389,8 @@ mod tests { let event = SyncEvent::BlockProcessed { block_hash: dashcore::BlockHash::all_zeros(), height: 1001, - new_addresses: vec![], + wallets: BTreeSet::new(), + new_addresses: BTreeMap::new(), confirmed_txids: txids.clone(), }; let events = manager.handle_sync_event(&event, &requests).await.unwrap(); @@ -573,7 +575,8 @@ mod tests { let event = SyncEvent::BlockProcessed { block_hash: dashcore::BlockHash::all_zeros(), height: 1001, - new_addresses: vec![], + wallets: BTreeSet::new(), + new_addresses: BTreeMap::new(), confirmed_txids: vec![dashcore::Txid::all_zeros()], }; manager.handle_sync_event(&event, &requests).await.unwrap(); @@ -599,7 +602,8 @@ mod tests { let event = SyncEvent::BlockProcessed { block_hash: dashcore::BlockHash::all_zeros(), height: 1001, - new_addresses: vec![], + wallets: BTreeSet::new(), + new_addresses: BTreeMap::new(), confirmed_txids: vec![], }; manager.handle_sync_event(&event, &requests).await.unwrap(); diff --git a/dash-spv/tests/dashd_sync/helpers.rs b/dash-spv/tests/dashd_sync/helpers.rs index cbd7abec6..c318224ce 100644 --- a/dash-spv/tests/dashd_sync/helpers.rs +++ b/dash-spv/tests/dashd_sync/helpers.rs @@ -98,7 +98,7 @@ pub(super) fn is_progress_event(event: &SyncEvent) -> bool { SyncEvent::BlockProcessed { new_addresses, .. 
- } => !new_addresses.is_empty(), + } => new_addresses.values().any(|v| !v.is_empty()), _ => false, } } diff --git a/key-wallet-ffi/src/wallet_manager_tests.rs b/key-wallet-ffi/src/wallet_manager_tests.rs index 3d062021a..9cb6ed66f 100644 --- a/key-wallet-ffi/src/wallet_manager_tests.rs +++ b/key-wallet-ffi/src/wallet_manager_tests.rs @@ -6,7 +6,7 @@ mod tests { use crate::error::{FFIError, FFIErrorCode}; use crate::{wallet, wallet_manager}; use dash_network::ffi::FFINetwork; - use key_wallet_manager::WalletInterface; + use key_wallet_manager::{WalletId, WalletInterface}; use std::ffi::{CStr, CString}; use std::ptr; use std::slice; @@ -442,13 +442,14 @@ mod tests { let height = unsafe { wallet_manager::wallet_manager_current_height(manager, error) }; assert_eq!(height, 0); - // Updating last-processed height without wallets is a no-op + // Updating last-processed height for an unknown wallet is a no-op. + let unknown_wallet: WalletId = [0xff; 32]; let new_height = 12345; unsafe { let manager_ref = &*manager; manager_ref.runtime.block_on(async { let mut manager_guard = manager_ref.manager.write().await; - manager_guard.update_last_processed_height(new_height); + manager_guard.update_wallet_last_processed_height(&unknown_wallet, new_height); }); } diff --git a/key-wallet-manager/examples/wallet_creation.rs b/key-wallet-manager/examples/wallet_creation.rs index d11abe13b..fd82448b8 100644 --- a/key-wallet-manager/examples/wallet_creation.rs +++ b/key-wallet-manager/examples/wallet_creation.rs @@ -144,8 +144,11 @@ fn main() { println!(" Current last-processed height (Testnet): {:?}", manager.last_processed_height()); - // Update last-processed height across all managed wallets - manager.update_last_processed_height(850_000); + // Advance every wallet's last-processed height through the per-wallet API. 
+ let wallet_ids: Vec<_> = manager.list_wallets().into_iter().copied().collect(); + for wallet_id in &wallet_ids { + manager.update_wallet_last_processed_height(wallet_id, 850_000); + } println!(" Updated last-processed height to: {:?}", manager.last_processed_height()); println!("\n=== Summary ==="); diff --git a/key-wallet-manager/src/event_tests.rs b/key-wallet-manager/src/event_tests.rs index 1cf278130..fdbe91a20 100644 --- a/key-wallet-manager/src/event_tests.rs +++ b/key-wallet-manager/src/event_tests.rs @@ -15,6 +15,7 @@ use dashcore::{ }; use key_wallet::account::StandardAccountType; use key_wallet::AccountType; +use std::collections::BTreeSet; fn make_block(txdata: Vec, seed: u8, time: u32) -> Block { Block { @@ -226,7 +227,8 @@ async fn test_late_instant_send_lock_after_block_confirmation_emits_event() { // Confirm the transaction in a block first. let block = make_block(vec![tx.clone()], 0xe3, 4000); - manager.process_block(&block, 300).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&block, 300, &wallets).await; let mut rx = manager.subscribe_events(); let lock = InstantLock { @@ -274,7 +276,8 @@ async fn test_block_with_new_tx_emits_inserted_record() { let tx = create_tx_paying_to(&addr, 0xcc); let block = make_block(vec![tx.clone()], 0xcc, 1000); - let result = manager.process_block(&block, 100).await; + let wallets = BTreeSet::from([wallet_id]); + let result = manager.process_block_for_wallets(&block, 100, &wallets).await; assert_eq!(result.new_txids.len(), 1); let events = drain_events(&mut rx); @@ -321,7 +324,8 @@ async fn test_block_confirming_known_mempool_tx_emits_updated_record() { let mut rx = manager.subscribe_events(); let block = make_block(vec![tx.clone()], 0xdd, 2000); - manager.process_block(&block, 200).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&block, 200, &wallets).await; let events = drain_events(&mut rx); assert_eq!(events.len(), 1, "one 
BlockProcessed expected, got {:?}", events); @@ -409,7 +413,8 @@ async fn test_block_with_index_less_account_tx_carries_account_type() { let mut rx = manager.subscribe_events(); let block = make_block(vec![tx.clone()], 0xee, 9999); - manager.process_block(&block, 9000).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&block, 9000, &wallets).await; let events = drain_events(&mut rx); let block_event = events @@ -442,11 +447,12 @@ async fn test_block_with_index_less_account_tx_carries_account_type() { #[tokio::test] async fn test_empty_block_for_idle_wallet_emits_nothing() { - let (mut manager, _wallet_id, _addr) = setup_manager_with_wallet(); + let (mut manager, wallet_id, _addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); let block = make_block(Vec::new(), 0x55, 3000); - manager.process_block(&block, 50).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&block, 50, &wallets).await; assert_no_events(&mut rx); } @@ -460,13 +466,14 @@ async fn test_block_processed_carries_matured_coinbase_record() { let coinbase_tx = make_coinbase_paying_to(&addr, 5_000_000_000); let coinbase_height = 100; let coinbase_block = make_block(vec![coinbase_tx.clone()], 0xc0, 4000); - manager.process_block(&coinbase_block, coinbase_height).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&coinbase_block, coinbase_height, &wallets).await; // Advance to maturity height. With coinbase_height = 100, maturity is at // height 200. Processing block 200 must surface the matured record. 
let mut rx = manager.subscribe_events(); let mature_block = make_block(Vec::new(), 0xc1, 5000); - manager.process_block(&mature_block, coinbase_height + 100).await; + manager.process_block_for_wallets(&mature_block, coinbase_height + 100, &wallets).await; let events = drain_events(&mut rx); let block_event = events @@ -501,11 +508,11 @@ async fn test_block_processed_carries_matured_coinbase_record() { // --------------------------------------------------------------------------- #[tokio::test] -async fn test_update_synced_height_emits_event_per_wallet() { +async fn test_update_wallet_synced_height_emits_event_per_wallet() { let (mut manager, wallet_id, _addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); - manager.update_synced_height(1000); + manager.update_wallet_synced_height(&wallet_id, 1000); let synced_events: Vec<_> = drain_events(&mut rx) .into_iter() @@ -521,15 +528,15 @@ async fn test_update_synced_height_emits_event_per_wallet() { } #[tokio::test] -async fn test_update_synced_height_does_not_re_emit_when_unchanged() { - let (mut manager, _wallet_id, _addr) = setup_manager_with_wallet(); +async fn test_update_wallet_synced_height_does_not_re_emit_when_unchanged() { + let (mut manager, wallet_id, _addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); - manager.update_synced_height(2000); + manager.update_wallet_synced_height(&wallet_id, 2000); drain_events(&mut rx); // Re-calling with the same height must not emit another SyncHeightAdvanced - manager.update_synced_height(2000); + manager.update_wallet_synced_height(&wallet_id, 2000); let events = drain_events(&mut rx); assert!( !events.iter().any(|e| matches!(e, WalletEvent::SyncHeightAdvanced { .. 
})), @@ -538,7 +545,7 @@ async fn test_update_synced_height_does_not_re_emit_when_unchanged() { ); // Going backwards also must not emit - manager.update_synced_height(1500); + manager.update_wallet_synced_height(&wallet_id, 1500); let events = drain_events(&mut rx); assert!( !events.iter().any(|e| matches!(e, WalletEvent::SyncHeightAdvanced { .. })), diff --git a/key-wallet-manager/src/lib.rs b/key-wallet-manager/src/lib.rs index d581340ae..4e9ed5056 100644 --- a/key-wallet-manager/src/lib.rs +++ b/key-wallet-manager/src/lib.rs @@ -34,7 +34,7 @@ use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoIn use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet::{AccountType, Address, ExtendedPrivKey, Mnemonic, Network, Wallet}; use key_wallet::{ExtendedPubKey, WalletCoreBalance}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::str::FromStr; use tokio::sync::broadcast; @@ -73,8 +73,9 @@ pub struct CheckTransactionsResult { pub affected_wallets: Vec, /// Set to false if the transaction was already stored and is being re-processed (e.g., during rescan) pub is_new_transaction: bool, - /// New addresses generated during gap limit maintenance - pub new_addresses: Vec
, + /// New addresses generated during gap limit maintenance, attributed to the + /// wallet that produced them. + pub new_addresses: BTreeMap>, /// Total value received across all wallets pub total_received: u64, /// Total value sent across all wallets @@ -88,6 +89,13 @@ pub struct CheckTransactionsResult { pub per_wallet_updated_records: BTreeMap>, } +impl CheckTransactionsResult { + /// Iterate over every newly generated address regardless of wallet attribution. + pub(crate) fn all_new_addresses(&self) -> impl Iterator { + self.new_addresses.values().flatten() + } +} + /// High-level wallet manager that manages multiple wallets /// /// Each wallet can contain multiple accounts following BIP44 standard. @@ -460,16 +468,33 @@ impl WalletManager { update_state_if_found: bool, update_balance: bool, ) -> CheckTransactionsResult { - let mut result = CheckTransactionsResult::default(); + let wallet_ids: BTreeSet = self.wallets.keys().cloned().collect(); + self.check_transaction_in_wallets( + tx, + context, + &wallet_ids, + update_state_if_found, + update_balance, + ) + .await + } - // We need to iterate carefully since we're mutating - let wallet_ids: Vec = self.wallets.keys().cloned().collect(); + /// Check a transaction against the given subset of wallets and update their states if relevant. 
+ pub(crate) async fn check_transaction_in_wallets( + &mut self, + tx: &Transaction, + context: TransactionContext, + wallet_ids: &BTreeSet, + update_state_if_found: bool, + update_balance: bool, + ) -> CheckTransactionsResult { + let mut result = CheckTransactionsResult::default(); for wallet_id in wallet_ids { // Get mutable references to both wallet and wallet_info // We need to use split borrowing to get around Rust's borrow checker - let wallet_opt = self.wallets.get_mut(&wallet_id); - let wallet_info_opt = self.wallet_infos.get_mut(&wallet_id); + let wallet_opt = self.wallets.get_mut(wallet_id); + let wallet_info_opt = self.wallet_infos.get_mut(wallet_id); if let (Some(wallet), Some(wallet_info)) = (wallet_opt, wallet_info_opt) { let check_result = wallet_info @@ -482,15 +507,12 @@ impl WalletManager { ) .await; - // If the transaction is relevant if check_result.is_relevant { - result.affected_wallets.push(wallet_id); - // If any wallet reports this as new, mark result as new + result.affected_wallets.push(*wallet_id); if check_result.is_new_transaction { result.is_new_transaction = true; } - // Aggregate totals and involved addresses across wallets result.total_received = result.total_received.saturating_add(check_result.total_received); result.total_sent = result.total_sent.saturating_add(check_result.total_sent); @@ -503,20 +525,26 @@ impl WalletManager { if !check_result.new_records.is_empty() { result .per_wallet_new_records - .entry(wallet_id) + .entry(*wallet_id) .or_default() .extend(check_result.new_records); } if !check_result.updated_records.is_empty() { result .per_wallet_updated_records - .entry(wallet_id) + .entry(*wallet_id) .or_default() .extend(check_result.updated_records); } } - result.new_addresses.extend(check_result.new_addresses); + if !check_result.new_addresses.is_empty() { + result + .new_addresses + .entry(*wallet_id) + .or_default() + .extend(check_result.new_addresses); + } } } diff --git a/key-wallet-manager/src/matching.rs 
b/key-wallet-manager/src/matching.rs index 0b61289f8..acc73865a 100644 --- a/key-wallet-manager/src/matching.rs +++ b/key-wallet-manager/src/matching.rs @@ -27,18 +27,33 @@ impl FilterMatchKey { } /// Check compact filters for addresses and return the keys that matched. +/// +/// Entries with `key.height() <= min_height` are skipped. Pass `0` to test +/// every filter in the input. pub fn check_compact_filters_for_addresses( input: &HashMap, addresses: Vec
, + min_height: CoreBlockHeight, ) -> BTreeSet { let script_pubkey_bytes: Vec> = addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect(); let match_filter = |(key, filter): (&FilterMatchKey, &BlockFilter)| { - filter - .match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice())) - .unwrap_or(false) - .then_some(key.clone()) + if key.height() <= min_height { + return None; + } + match filter.match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice())) { + Ok(true) => Some(key.clone()), + Ok(false) => None, + Err(e) => { + tracing::warn!( + "filter match_any error at height {}: {}; treating as non-match", + key.height(), + e + ); + None + } + } }; #[cfg(feature = "parallel-filters")] @@ -60,7 +75,7 @@ mod tests { #[test] fn test_empty_input_returns_empty() { - let result = check_compact_filters_for_addresses(&HashMap::new(), vec![]); + let result = check_compact_filters_for_addresses(&HashMap::new(), vec![], 0); assert!(result.is_empty()); } @@ -75,7 +90,7 @@ mod tests { let mut input = HashMap::new(); input.insert(key.clone(), filter); - let output = check_compact_filters_for_addresses(&input, vec![]); + let output = check_compact_filters_for_addresses(&input, vec![], 0); assert!(!output.contains(&key)); } @@ -90,7 +105,7 @@ mod tests { let mut input = HashMap::new(); input.insert(key.clone(), filter); - let output = check_compact_filters_for_addresses(&input, vec![address]); + let output = check_compact_filters_for_addresses(&input, vec![address], 0); assert!(output.contains(&key)); } @@ -107,7 +122,7 @@ mod tests { let mut input = HashMap::new(); input.insert(key.clone(), filter); - let output = check_compact_filters_for_addresses(&input, vec![address]); + let output = check_compact_filters_for_addresses(&input, vec![address], 0); assert!(!output.contains(&key)); } @@ -137,7 +152,7 @@ mod tests { input.insert(key_2.clone(), filter_2); input.insert(key_3.clone(), filter_3); - let output = 
check_compact_filters_for_addresses(&input, vec![address_1, address_2]); + let output = check_compact_filters_for_addresses(&input, vec![address_1, address_2], 0); assert_eq!(output.len(), 2); assert!(output.contains(&key_1)); assert!(output.contains(&key_2)); @@ -160,7 +175,7 @@ mod tests { input.insert(key, filter); } - let output = check_compact_filters_for_addresses(&input, vec![address]); + let output = check_compact_filters_for_addresses(&input, vec![address], 0); // Verify output is sorted by height (ascending) let heights_out: Vec = output.iter().map(|k| k.height()).collect(); diff --git a/key-wallet-manager/src/process_block.rs b/key-wallet-manager/src/process_block.rs index af83b99f4..9c313e177 100644 --- a/key-wallet-manager/src/process_block.rs +++ b/key-wallet-manager/src/process_block.rs @@ -8,17 +8,21 @@ use dashcore::{Address, Block, Transaction}; use key_wallet::managed_account::transaction_record::TransactionRecord; use key_wallet::transaction_checking::{BlockInfo, TransactionContext}; use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use tokio::sync::broadcast; #[async_trait] impl WalletInterface for WalletManager { - async fn process_block( + async fn process_block_for_wallets( &mut self, block: &Block, height: CoreBlockHeight, + wallets: &BTreeSet, ) -> BlockProcessingResult { let mut result = BlockProcessingResult::default(); + if wallets.is_empty() { + return result; + } let info = BlockInfo::new(height, block.block_hash(), block.header.time); let mut per_wallet_inserted: BTreeMap> = BTreeMap::new(); @@ -26,9 +30,8 @@ impl WalletInterface for WalletM for tx in &block.txdata { let context = TransactionContext::InBlock(info); - let check_result = - self.check_transaction_in_all_wallets(tx, context, true, false).await; + self.check_transaction_in_wallets(tx, context, wallets, true, false).await; if 
!check_result.affected_wallets.is_empty() { if check_result.is_new_transaction { @@ -38,8 +41,9 @@ impl WalletInterface for WalletM } } - result.new_addresses.extend(check_result.new_addresses); - + for (wallet_id, addrs) in check_result.new_addresses { + result.new_addresses.entry(wallet_id).or_default().extend(addrs); + } for (wallet_id, records) in check_result.per_wallet_new_records { per_wallet_inserted.entry(wallet_id).or_default().extend(records); } @@ -48,7 +52,7 @@ impl WalletInterface for WalletM } } - self.finalize_block_advance(height, per_wallet_inserted, per_wallet_updated); + self.finalize_block_advance(height, wallets, per_wallet_inserted, per_wallet_updated); result } @@ -65,7 +69,8 @@ impl WalletInterface for WalletM } None => TransactionContext::Mempool, }; - let check_result = self.check_transaction_in_all_wallets(tx, context, true, false).await; + let mut check_result = + self.check_transaction_in_all_wallets(tx, context, true, false).await; let is_relevant = !check_result.affected_wallets.is_empty(); let net_amount = if is_relevant { @@ -82,7 +87,11 @@ impl WalletInterface for WalletM } } - for (wallet_id, records) in check_result.per_wallet_new_records { + let per_wallet_new_records = std::mem::take(&mut check_result.per_wallet_new_records); + let per_wallet_updated_records = + std::mem::take(&mut check_result.per_wallet_updated_records); + + for (wallet_id, records) in per_wallet_new_records { let Some(info) = self.wallet_infos.get(&wallet_id) else { continue; }; @@ -98,7 +107,7 @@ impl WalletInterface for WalletM } if let Some(lock) = instant_lock { - for (wallet_id, records) in check_result.per_wallet_updated_records { + for (wallet_id, records) in per_wallet_updated_records { if records.is_empty() { continue; } @@ -118,12 +127,13 @@ impl WalletInterface for WalletM } } + let new_addresses: Vec
= check_result.all_new_addresses().cloned().collect(); MempoolTransactionResult { is_relevant, net_amount, is_outgoing: net_amount < 0, addresses: check_result.involved_addresses, - new_addresses: check_result.new_addresses, + new_addresses, } } @@ -131,6 +141,10 @@ impl WalletInterface for WalletM self.monitored_addresses() } + fn monitored_addresses_for(&self, wallet_id: &WalletId) -> Vec
{ + self.wallet_infos.get(wallet_id).map(|info| info.monitored_addresses()).unwrap_or_default() + } + fn watched_outpoints(&self) -> Vec { self.watched_outpoints() } @@ -147,19 +161,31 @@ impl WalletInterface for WalletM self.wallet_infos.values().map(|info| info.last_processed_height()).max().unwrap_or(0) } - fn update_last_processed_height(&mut self, height: CoreBlockHeight) { - self.finalize_block_advance(height, BTreeMap::new(), BTreeMap::new()); - } - fn synced_height(&self) -> CoreBlockHeight { self.wallet_infos.values().map(|info| info.synced_height()).min().unwrap_or(0) } - fn update_synced_height(&mut self, height: CoreBlockHeight) { - for (wallet_id, info) in self.wallet_infos.iter_mut() { - let advanced = height > info.synced_height(); - info.update_synced_height(height); - if advanced { + fn wallets_behind(&self, height: CoreBlockHeight) -> BTreeSet { + self.wallet_infos + .iter() + .filter_map(|(id, info)| { + if info.synced_height() < height { + Some(*id) + } else { + None + } + }) + .collect() + } + + fn wallet_synced_height(&self, wallet_id: &WalletId) -> CoreBlockHeight { + self.wallet_infos.get(wallet_id).map(|info| info.synced_height()).unwrap_or(0) + } + + fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: CoreBlockHeight) { + if let Some(info) = self.wallet_infos.get_mut(wallet_id) { + if height > info.synced_height() { + info.update_synced_height(height); let _ = self.event_sender.send(WalletEvent::SyncHeightAdvanced { wallet_id: *wallet_id, height, @@ -168,6 +194,15 @@ impl WalletInterface for WalletM } } + fn update_wallet_last_processed_height( + &mut self, + wallet_id: &WalletId, + height: CoreBlockHeight, + ) { + let wallets = BTreeSet::from([*wallet_id]); + self.finalize_block_advance(height, &wallets, BTreeMap::new(), BTreeMap::new()); + } + fn subscribe_events(&self) -> broadcast::Receiver { self.event_sender.subscribe() } @@ -231,42 +266,67 @@ impl WalletInterface for WalletM } impl WalletManager { - /// Advance 
every wallet's last-processed height to `height`, collect the - /// matured-coinbase window `(prior, height]` per wallet, and emit a - /// `BlockProcessed` event for each wallet whose balance changed or whose - /// `inserted`/`updated`/`matured` lists are non-empty. Snapshots are taken - /// before the advance so events carry the post-advance balance. + /// For each wallet in `wallets`: advance `last_processed_height` to + /// `height` (monotonically — never backwards), refresh the cached balance, + /// collect matured-coinbase records over the window `(prior, height]`, and + /// emit a `BlockProcessed` event whose balance reflects the post-advance + /// state. A wallet whose `last_processed_height` is already at or above + /// `height` keeps its height but still gets a balance refresh, so rescan + /// passes that hit blocks below the wallet's checkpoint surface UTXO + /// changes without dragging the height backwards. fn finalize_block_advance( &mut self, height: CoreBlockHeight, + wallets: &BTreeSet, mut per_wallet_inserted: BTreeMap>, mut per_wallet_updated: BTreeMap>, ) { + if wallets.is_empty() { + return; + } + let snapshot = self.snapshot_balances(); - let prior_heights: BTreeMap = self - .wallet_infos + let prior_heights: BTreeMap = wallets .iter() - .map(|(id, info)| (*id, info.last_processed_height())) + .filter_map(|id| { + self.wallet_infos.get(id).map(|info| (*id, info.last_processed_height())) + }) .collect(); // Collect matured coinbase records before advancing the height so the - // (old, new] window is well-defined per wallet. + // (old, new] window is well-defined per wallet. Wallets whose height + // is already at or past `height` contribute no matured records on this + // pass (their matured window is empty). 
let mut per_wallet_matured: BTreeMap> = BTreeMap::new(); - for (wallet_id, info) in &self.wallet_infos { + for wallet_id in wallets { + let Some(info) = self.wallet_infos.get(wallet_id) else { + continue; + }; let old_height = prior_heights.get(wallet_id).copied().unwrap_or(0); - let matured = info.matured_coinbase_records(old_height, height); - if !matured.is_empty() { - per_wallet_matured.insert(*wallet_id, matured); + if height > old_height { + let matured = info.matured_coinbase_records(old_height, height); + if !matured.is_empty() { + per_wallet_matured.insert(*wallet_id, matured); + } } } // Advance heights and refresh balances. Event emission happens below // so each wallet's event carries the post-advance balance. - for info in self.wallet_infos.values_mut() { - info.update_last_processed_height(height); + for wallet_id in wallets { + if let Some(info) = self.wallet_infos.get_mut(wallet_id) { + if height > info.last_processed_height() { + info.update_last_processed_height(height); + } else { + info.update_balance(); + } + } } - for (wallet_id, info) in &self.wallet_infos { + for wallet_id in wallets { + let Some(info) = self.wallet_infos.get(wallet_id) else { + continue; + }; let new_balance = info.balance(); let inserted = per_wallet_inserted.remove(wallet_id).unwrap_or_default(); let updated = per_wallet_updated.remove(wallet_id).unwrap_or_default(); @@ -300,10 +360,11 @@ mod tests { BlockHash, Network, OutPoint, ScriptBuf, TxIn, TxMerkleNode, TxOut, Txid, Witness, }; use key_wallet::account::StandardAccountType; + use key_wallet::mnemonic::Language; use key_wallet::wallet::initialization::WalletAccountCreationOptions; use key_wallet::wallet::managed_wallet_info::transaction_building::AccountTypePreference; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; - use key_wallet::AccountType; + use key_wallet::{AccountType, Mnemonic}; fn make_block(txdata: Vec) -> Block { Block { @@ -322,15 +383,9 @@ mod tests { #[tokio::test] async fn 
test_last_processed_height() { let mut manager: WalletManager = WalletManager::new(Network::Testnet); - // Initial state - assert_eq!(manager.last_processed_height(), 0); - // Updating last-processed height without wallets is a no-op - manager.update_last_processed_height(1000); - assert_eq!(manager.last_processed_height(), 0); - // Still a no-op without wallets - manager.update_last_processed_height(5000); assert_eq!(manager.last_processed_height(), 0); - manager.update_last_processed_height(10); + let unknown: WalletId = [0xff; 32]; + manager.update_wallet_last_processed_height(&unknown, 1000); assert_eq!(manager.last_processed_height(), 0); } @@ -386,12 +441,13 @@ mod tests { #[tokio::test] async fn test_process_block_emits_block_processed() { - let (mut manager, _wallet_id, addr) = setup_manager_with_wallet(); + let (mut manager, wallet_id, addr) = setup_manager_with_wallet(); let tx = create_tx_paying_to(&addr, 0xcc); let block = make_block(vec![tx.clone()]); let mut rx = manager.subscribe_events(); - manager.process_block(&block, 100).await; + let wallets = BTreeSet::from([wallet_id]); + manager.process_block_for_wallets(&block, 100, &wallets).await; let mut found = false; while let Ok(event) = rx.try_recv() { @@ -414,11 +470,11 @@ mod tests { } #[tokio::test] - async fn test_update_synced_height_emits_sync_height_advanced() { + async fn test_update_wallet_synced_height_emits_sync_height_advanced() { let (mut manager, wallet_id, _addr) = setup_manager_with_wallet(); let mut rx = manager.subscribe_events(); - manager.update_synced_height(500); + manager.update_wallet_synced_height(&wallet_id, 500); let mut found = false; while let Ok(event) = rx.try_recv() { @@ -432,7 +488,39 @@ mod tests { found = true; } } - assert!(found, "should emit SyncHeightAdvanced on update_synced_height"); + assert!(found, "should emit SyncHeightAdvanced on update_wallet_synced_height"); + } + + #[tokio::test] + async fn test_process_block_for_wallets_only_touches_listed() { + let 
(mut manager, wallet_id1, _) = setup_manager_with_wallet(); + let mnemonic2 = Mnemonic::generate(12, Language::English).unwrap(); + let wallet_id2 = manager + .create_wallet_from_mnemonic( + &mnemonic2.to_string(), + "", + 0, + WalletAccountCreationOptions::Default, + ) + .unwrap(); + + let block = make_block(vec![]); + + let only_w1 = BTreeSet::from([wallet_id1]); + manager.process_block_for_wallets(&block, 200, &only_w1).await; + assert_eq!(manager.get_wallet_info(&wallet_id1).unwrap().last_processed_height(), 200); + assert_eq!(manager.get_wallet_info(&wallet_id2).unwrap().last_processed_height(), 0); + + let only_w2 = BTreeSet::from([wallet_id2]); + manager.process_block_for_wallets(&block, 300, &only_w2).await; + assert_eq!(manager.get_wallet_info(&wallet_id1).unwrap().last_processed_height(), 200); + assert_eq!(manager.get_wallet_info(&wallet_id2).unwrap().last_processed_height(), 300); + + // Empty wallet set is a no-op even though the height is past both wallets. + let none = BTreeSet::new(); + manager.process_block_for_wallets(&block, 1000, &none).await; + assert_eq!(manager.get_wallet_info(&wallet_id1).unwrap().last_processed_height(), 200); + assert_eq!(manager.get_wallet_info(&wallet_id2).unwrap().last_processed_height(), 300); } #[tokio::test] @@ -518,9 +606,13 @@ mod tests { assert_eq!(manager.monitor_revision(), expected_rev, "after get_change_address"); } - // update_last_processed_height does NOT bump - manager.update_last_processed_height(1000); - assert_eq!(manager.monitor_revision(), expected_rev, "after update_last_processed_height"); + // `update_wallet_last_processed_height` does not bump the monitor revision. 
+ manager.update_wallet_last_processed_height(&wallet_id, 1000); + assert_eq!( + manager.monitor_revision(), + expected_rev, + "after update_wallet_last_processed_height" + ); // process_mempool_transaction bumps from UTXO changes and possibly // new addresses generated via gap limit maintenance @@ -542,11 +634,12 @@ mod tests { "after process_instant_send_lock" ); - // process_block bumps from UTXO changes and possibly new addresses + // process_block_for_wallets bumps from UTXO changes and possibly new addresses let rev_before_block = manager.monitor_revision(); let tx2 = create_tx_paying_to(&addr, 0xd1); let block = make_block(vec![tx2]); - let _result = manager.process_block(&block, 100).await; + let block_wallets = BTreeSet::from([wallet_id]); + let _result = manager.process_block_for_wallets(&block, 100, &block_wallets).await; assert!( manager.monitor_revision() > rev_before_block, "block with tx paying to our address should bump revision (UTXO added)" diff --git a/key-wallet-manager/src/test_utils/mock_wallet.rs b/key-wallet-manager/src/test_utils/mock_wallet.rs index 180e064ac..bd16d60d6 100644 --- a/key-wallet-manager/src/test_utils/mock_wallet.rs +++ b/key-wallet-manager/src/test_utils/mock_wallet.rs @@ -1,18 +1,27 @@ -use crate::{BlockProcessingResult, MempoolTransactionResult, WalletEvent, WalletInterface}; +use crate::{ + BlockProcessingResult, MempoolTransactionResult, WalletEvent, WalletId, WalletInterface, +}; use dashcore::ephemerealdata::instant_lock::InstantLock; use dashcore::prelude::CoreBlockHeight; use dashcore::{Address, Block, OutPoint, Transaction, Txid}; use key_wallet::transaction_checking::TransactionContext; +use std::collections::BTreeSet; use std::sync::Arc; use tokio::sync::{broadcast, Mutex}; // Type alias for captured IS lock payloads type InstantLockCaptures = Arc)>>>; +/// Default wallet ID used by `MockWallet` and `NonMatchingMockWallet` for tests +/// that don't care about per-wallet attribution. 
+pub const MOCK_WALLET_ID: WalletId = [0u8; 32]; + pub struct MockWallet { + wallet_id: WalletId, processed_blocks: Arc>>, processed_transactions: Arc>>, last_processed_height: CoreBlockHeight, + synced_height: CoreBlockHeight, event_sender: broadcast::Sender, /// When true, process_mempool_transaction returns is_relevant=true. mempool_relevant: bool, @@ -45,9 +54,11 @@ impl MockWallet { pub fn new() -> Self { let (event_sender, _) = broadcast::channel(16); Self { + wallet_id: MOCK_WALLET_ID, processed_blocks: Arc::new(Mutex::new(Vec::new())), processed_transactions: Arc::new(Mutex::new(Vec::new())), last_processed_height: 0, + synced_height: 0, event_sender, mempool_relevant: false, addresses: Vec::new(), @@ -61,6 +72,11 @@ impl MockWallet { } } + /// Override the wallet id used for per-wallet API surfaces. + pub fn set_wallet_id(&mut self, wallet_id: WalletId) { + self.wallet_id = wallet_id; + } + /// Configure whether mempool transactions are reported as relevant. pub fn set_mempool_relevant(&mut self, relevant: bool) { self.mempool_relevant = relevant; @@ -108,14 +124,25 @@ impl MockWallet { #[async_trait::async_trait] impl WalletInterface for MockWallet { - async fn process_block(&mut self, block: &Block, height: u32) -> BlockProcessingResult { + async fn process_block_for_wallets( + &mut self, + block: &Block, + height: u32, + wallets: &BTreeSet, + ) -> BlockProcessingResult { + if !wallets.contains(&self.wallet_id) { + return BlockProcessingResult::default(); + } let mut processed = self.processed_blocks.lock().await; processed.push((block.block_hash(), height)); + if height > self.last_processed_height { + self.last_processed_height = height; + } BlockProcessingResult { new_txids: block.txdata.iter().map(|tx| tx.txid()).collect(), existing_txids: Vec::new(), - new_addresses: Vec::new(), + new_addresses: Default::default(), } } @@ -152,6 +179,14 @@ impl WalletInterface for MockWallet { self.addresses.clone() } + fn monitored_addresses_for(&self, wallet_id: 
&WalletId) -> Vec
{ + if wallet_id == &self.wallet_id { + self.addresses.clone() + } else { + Vec::new() + } + } + fn watched_outpoints(&self) -> Vec { self.outpoints.clone() } @@ -160,8 +195,40 @@ impl WalletInterface for MockWallet { self.last_processed_height } - fn update_last_processed_height(&mut self, height: CoreBlockHeight) { - self.last_processed_height = height; + fn synced_height(&self) -> CoreBlockHeight { + self.synced_height + } + + fn wallets_behind(&self, height: CoreBlockHeight) -> BTreeSet { + if self.synced_height < height { + BTreeSet::from([self.wallet_id]) + } else { + BTreeSet::new() + } + } + + fn wallet_synced_height(&self, wallet_id: &WalletId) -> CoreBlockHeight { + if wallet_id == &self.wallet_id { + self.synced_height + } else { + 0 + } + } + + fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: CoreBlockHeight) { + if wallet_id == &self.wallet_id && height > self.synced_height { + self.synced_height = height; + } + } + + fn update_wallet_last_processed_height( + &mut self, + wallet_id: &WalletId, + height: CoreBlockHeight, + ) { + if wallet_id == &self.wallet_id && height > self.last_processed_height { + self.last_processed_height = height; + } } fn monitor_revision(&self) -> u64 { @@ -189,7 +256,9 @@ impl WalletInterface for MockWallet { /// Mock wallet that returns false for filter checks pub struct NonMatchingMockWallet { + wallet_id: WalletId, last_processed_height: CoreBlockHeight, + synced_height: CoreBlockHeight, event_sender: broadcast::Sender, } @@ -203,7 +272,9 @@ impl NonMatchingMockWallet { pub fn new() -> Self { let (event_sender, _) = broadcast::channel(16); Self { + wallet_id: MOCK_WALLET_ID, last_processed_height: 0, + synced_height: 0, event_sender, } } @@ -211,7 +282,15 @@ impl NonMatchingMockWallet { #[async_trait::async_trait] impl WalletInterface for NonMatchingMockWallet { - async fn process_block(&mut self, _block: &Block, _height: u32) -> BlockProcessingResult { + async fn process_block_for_wallets( + &mut 
self, + _block: &Block, + height: u32, + wallets: &BTreeSet, + ) -> BlockProcessingResult { + if wallets.contains(&self.wallet_id) && height > self.last_processed_height { + self.last_processed_height = height; + } BlockProcessingResult::default() } @@ -227,6 +306,10 @@ impl WalletInterface for NonMatchingMockWallet { Vec::new() } + fn monitored_addresses_for(&self, _wallet_id: &WalletId) -> Vec
{ + Vec::new() + } + fn watched_outpoints(&self) -> Vec { Vec::new() } @@ -235,8 +318,40 @@ impl WalletInterface for NonMatchingMockWallet { self.last_processed_height } - fn update_last_processed_height(&mut self, height: CoreBlockHeight) { - self.last_processed_height = height; + fn synced_height(&self) -> CoreBlockHeight { + self.synced_height + } + + fn wallets_behind(&self, height: CoreBlockHeight) -> BTreeSet { + if self.synced_height < height { + BTreeSet::from([self.wallet_id]) + } else { + BTreeSet::new() + } + } + + fn wallet_synced_height(&self, wallet_id: &WalletId) -> CoreBlockHeight { + if wallet_id == &self.wallet_id { + self.synced_height + } else { + 0 + } + } + + fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: CoreBlockHeight) { + if wallet_id == &self.wallet_id && height > self.synced_height { + self.synced_height = height; + } + } + + fn update_wallet_last_processed_height( + &mut self, + wallet_id: &WalletId, + height: CoreBlockHeight, + ) { + if wallet_id == &self.wallet_id && height > self.last_processed_height { + self.last_processed_height = height; + } } fn subscribe_events(&self) -> broadcast::Receiver { @@ -247,3 +362,146 @@ impl WalletInterface for NonMatchingMockWallet { "NonMatchingWallet (test implementation)".to_string() } } + +/// Per-wallet state held inside `MultiMockWallet`. +#[derive(Default)] +pub struct MockWalletState { + pub addresses: Vec
, + pub synced_height: CoreBlockHeight, + pub last_processed_height: CoreBlockHeight, +} + +/// Multi-wallet mock that holds independent state for several wallet IDs, +/// enabling tests that exercise per-wallet attribution paths. +pub struct MultiMockWallet { + wallets: std::collections::BTreeMap, + event_sender: broadcast::Sender, + /// Track every block processed for assertions. + processed: Arc>>, +} + +impl Default for MultiMockWallet { + fn default() -> Self { + Self::new() + } +} + +impl MultiMockWallet { + pub fn new() -> Self { + let (event_sender, _) = broadcast::channel(16); + Self { + wallets: std::collections::BTreeMap::new(), + event_sender, + processed: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Insert or replace a wallet's state. + pub fn insert_wallet(&mut self, wallet_id: WalletId, state: MockWalletState) { + self.wallets.insert(wallet_id, state); + } + + /// Mutable access to a wallet's state, panicking if absent. + pub fn wallet_mut(&mut self, wallet_id: &WalletId) -> &mut MockWalletState { + self.wallets.get_mut(wallet_id).expect("wallet present") + } + + pub fn processed(&self) -> Arc>> { + self.processed.clone() + } +} + +#[async_trait::async_trait] +impl WalletInterface for MultiMockWallet { + async fn process_block_for_wallets( + &mut self, + block: &Block, + height: CoreBlockHeight, + wallets: &BTreeSet, + ) -> BlockProcessingResult { + let hash = block.block_hash(); + let mut processed = self.processed.lock().await; + for wallet_id in wallets { + if let Some(state) = self.wallets.get_mut(wallet_id) { + processed.push((*wallet_id, hash, height)); + if height > state.last_processed_height { + state.last_processed_height = height; + } + } + } + BlockProcessingResult::default() + } + + async fn process_mempool_transaction( + &mut self, + _tx: &Transaction, + _instant_lock: Option, + ) -> MempoolTransactionResult { + MempoolTransactionResult::default() + } + + fn monitored_addresses(&self) -> Vec
{ + self.wallets.values().flat_map(|s| s.addresses.iter().cloned()).collect() + } + + fn monitored_addresses_for(&self, wallet_id: &WalletId) -> Vec
{ + self.wallets.get(wallet_id).map(|s| s.addresses.clone()).unwrap_or_default() + } + + fn watched_outpoints(&self) -> Vec { + Vec::new() + } + + fn last_processed_height(&self) -> CoreBlockHeight { + self.wallets.values().map(|s| s.last_processed_height).max().unwrap_or(0) + } + + fn synced_height(&self) -> CoreBlockHeight { + self.wallets.values().map(|s| s.synced_height).min().unwrap_or(0) + } + + fn wallets_behind(&self, height: CoreBlockHeight) -> BTreeSet { + self.wallets + .iter() + .filter_map(|(id, s)| { + if s.synced_height < height { + Some(*id) + } else { + None + } + }) + .collect() + } + + fn wallet_synced_height(&self, wallet_id: &WalletId) -> CoreBlockHeight { + self.wallets.get(wallet_id).map(|s| s.synced_height).unwrap_or(0) + } + + fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: CoreBlockHeight) { + if let Some(state) = self.wallets.get_mut(wallet_id) { + if height > state.synced_height { + state.synced_height = height; + } + } + } + + fn update_wallet_last_processed_height( + &mut self, + wallet_id: &WalletId, + height: CoreBlockHeight, + ) { + if let Some(state) = self.wallets.get_mut(wallet_id) { + if height > state.last_processed_height { + state.last_processed_height = height; + } + } + } + + fn subscribe_events(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + async fn describe(&self) -> String { + "MultiMockWallet (test implementation)".to_string() + } +} diff --git a/key-wallet-manager/src/test_utils/mod.rs b/key-wallet-manager/src/test_utils/mod.rs index 108a02fd5..252be4387 100644 --- a/key-wallet-manager/src/test_utils/mod.rs +++ b/key-wallet-manager/src/test_utils/mod.rs @@ -1,4 +1,7 @@ mod mock_wallet; pub use mock_wallet::MockWallet; +pub use mock_wallet::MockWalletState; +pub use mock_wallet::MultiMockWallet; pub use mock_wallet::NonMatchingMockWallet; +pub use mock_wallet::MOCK_WALLET_ID; diff --git a/key-wallet-manager/src/wallet_interface.rs 
b/key-wallet-manager/src/wallet_interface.rs index 90e01e80d..e2e7a8f89 100644 --- a/key-wallet-manager/src/wallet_interface.rs +++ b/key-wallet-manager/src/wallet_interface.rs @@ -2,11 +2,12 @@ //! //! This module defines the trait that SPV clients use to interact with wallets. -use crate::WalletEvent; +use crate::{WalletEvent, WalletId}; use async_trait::async_trait; use dashcore::ephemerealdata::instant_lock::InstantLock; use dashcore::prelude::CoreBlockHeight; use dashcore::{Address, Block, OutPoint, Transaction, Txid}; +use std::collections::{BTreeMap, BTreeSet}; use tokio::sync::broadcast; /// Result of processing a block through the wallet @@ -16,8 +17,8 @@ pub struct BlockProcessingResult { pub new_txids: Vec, /// Transaction IDs that were already in wallet history pub existing_txids: Vec, - /// New addresses generated during gap limit maintenance - pub new_addresses: Vec
, + /// New addresses generated per wallet during gap-limit maintenance. + pub new_addresses: BTreeMap>, } /// Result of processing a mempool transaction through the wallet @@ -45,18 +46,27 @@ impl BlockProcessingResult { pub fn relevant_tx_count(&self) -> usize { self.new_txids.len() + self.existing_txids.len() } + + /// Iterate over every newly generated address regardless of wallet attribution. + pub fn all_new_addresses(&self) -> impl Iterator { + self.new_addresses.values().flatten() + } } /// Trait for wallet implementations to receive SPV events #[async_trait] pub trait WalletInterface: Send + Sync + 'static { - /// Called when a new block is received that may contain relevant transactions. - /// Returns processing result including relevant transactions and any new addresses - /// generated during gap limit maintenance. - async fn process_block( + /// Process a block, but only against the listed wallets. Implementations + /// must update the per-wallet `last_processed_height` for each wallet in + /// `wallets` once the block is applied to its state. + /// + /// Pass the result of `wallets_behind(height)` for the canonical "scan + /// only the wallets that need this block" semantics. + async fn process_block_for_wallets( &mut self, block: &Block, height: CoreBlockHeight, + wallets: &BTreeSet, ) -> BlockProcessingResult; /// Called when a transaction is seen in the mempool. @@ -71,6 +81,9 @@ pub trait WalletInterface: Send + Sync + 'static { /// Get all addresses the wallet is monitoring for incoming transactions fn monitored_addresses(&self) -> Vec
; + /// Get monitored addresses for a specific wallet. + fn monitored_addresses_for(&self, wallet_id: &WalletId) -> Vec<Address>
; + /// Get all outpoints the wallet is watching (unspent outputs). /// Used for bloom filter construction to detect spends of our UTXOs. fn watched_outpoints(&self) -> Vec; @@ -88,23 +101,37 @@ pub trait WalletInterface: Send + Sync + 'static { /// Return the last fully processed height of the wallet. fn last_processed_height(&self) -> CoreBlockHeight; - /// Update the wallet's last processed height. This also triggers balance updates. - fn update_last_processed_height(&mut self, height: CoreBlockHeight); - - /// Return the height at which filter scanning was last committed. - /// Defaults to `last_processed_height()` for implementations that don't separate these concepts. - // TODO: This can probably somehow be combined with last_processed_height(). - fn synced_height(&self) -> CoreBlockHeight { - self.last_processed_height() + /// Return the lowest committed sync checkpoint across all managed wallets. + /// Filter scanning resumes from this height. A new wallet added behind this + /// drags the value down and triggers a rescan. + fn synced_height(&self) -> CoreBlockHeight; + + /// Return the wallet IDs whose `synced_height` is strictly less than `height`, + /// i.e. the wallets that still need filter coverage at that height. + fn wallets_behind(&self, height: CoreBlockHeight) -> BTreeSet; + + /// Return the wallet IDs that still need filter coverage at heights up to + /// and including `height`. Equivalent to `wallets_behind(height + 1)` but + /// expresses the inclusive intent at the call site, so callers don't have + /// to compensate the strict-less-than semantics with `+ 1`. + fn wallets_not_yet_at(&self, height: CoreBlockHeight) -> BTreeSet { + self.wallets_behind(height.saturating_add(1)) } - /// Update the committed synced height. Call when a height is fully processed - /// (including any rescans for newly discovered addresses). 
- fn update_synced_height(&mut self, height: CoreBlockHeight) { - if height > self.last_processed_height() { - self.update_last_processed_height(height); - } - } + /// Return the per-wallet committed sync checkpoint, or `0` if unknown. + fn wallet_synced_height(&self, wallet_id: &WalletId) -> CoreBlockHeight; + + /// Advance one wallet's committed sync checkpoint. Implementations must + /// only advance forward (a value below the current is silently ignored). + fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: CoreBlockHeight); + + /// Advance one wallet's last-processed height after a block has been applied + /// to its state. Implementations must only advance forward. + fn update_wallet_last_processed_height( + &mut self, + wallet_id: &WalletId, + height: CoreBlockHeight, + ); /// Return a revision counter that increments whenever the set of monitored /// addresses or watched outpoints changes. The mempool manager uses this to diff --git a/key-wallet-manager/tests/integration_test.rs b/key-wallet-manager/tests/integration_test.rs index 16851df2a..fe0047564 100644 --- a/key-wallet-manager/tests/integration_test.rs +++ b/key-wallet-manager/tests/integration_test.rs @@ -162,13 +162,7 @@ fn test_balance_calculation() { fn test_block_height_tracking() { let mut manager = WalletManager::::new(Network::Testnet); - // Initial state - assert_eq!(manager.last_processed_height(), 0); - assert_eq!(manager.synced_height(), 0); - - // Updating heights before adding wallets is a no-op - manager.update_last_processed_height(1000); - manager.update_synced_height(500); + // Initial state with no wallets assert_eq!(manager.last_processed_height(), 0); assert_eq!(manager.synced_height(), 0); @@ -194,53 +188,58 @@ fn test_block_height_tracking() { assert_eq!(manager.wallet_count(), 2); - // Verify both wallets have last_processed_height and synced_height of 0 initially + // Both wallets initialized with `synced_height = birth_height - 1 = 0`, + // so neither has 
been processed past genesis. for wallet_info in manager.get_all_wallet_infos().values() { assert_eq!(wallet_info.last_processed_height(), 0); assert_eq!(wallet_info.synced_height(), 0); } - // Update last-processed height - should propagate to all wallets - manager.update_last_processed_height(12345); + // Per-wallet last-processed updates only touch the addressed wallet. + manager.update_wallet_last_processed_height(&wallet_id1, 12345); assert_eq!(manager.last_processed_height(), 12345); - - // Verify all wallets got updated while synced_height stays at 0 let wallet_info1 = manager.get_wallet_info(&wallet_id1).unwrap(); let wallet_info2 = manager.get_wallet_info(&wallet_id2).unwrap(); assert_eq!(wallet_info1.last_processed_height(), 12345); - assert_eq!(wallet_info2.last_processed_height(), 12345); - assert_eq!(wallet_info1.synced_height(), 0); - assert_eq!(wallet_info2.synced_height(), 0); - - // Update synced height - should propagate to all wallets without touching last_processed_height - manager.update_synced_height(20000); - assert_eq!(manager.synced_height(), 20000); + assert_eq!(wallet_info2.last_processed_height(), 0); - for wallet_info in manager.get_all_wallet_infos().values() { - assert_eq!(wallet_info.last_processed_height(), 12345); - assert_eq!(wallet_info.synced_height(), 20000); - } - - // Update wallets individually to different last-processed heights - let wallet_info1 = manager.get_wallet_info_mut(&wallet_id1).unwrap(); - wallet_info1.update_last_processed_height(30000); + // Per-wallet synced-height updates only touch the addressed wallet. + manager.update_wallet_synced_height(&wallet_id1, 12000); + let wallet_info1 = manager.get_wallet_info(&wallet_id1).unwrap(); + let wallet_info2 = manager.get_wallet_info(&wallet_id2).unwrap(); + assert_eq!(wallet_info1.synced_height(), 12000); + assert_eq!(wallet_info2.synced_height(), 0); + // Aggregate `synced_height()` is `min` across wallets, so wallet 2 holds it at 0. 
+ assert_eq!(manager.synced_height(), 0); - let wallet_info2 = manager.get_wallet_info_mut(&wallet_id2).unwrap(); - wallet_info2.update_last_processed_height(25000); + // Advance wallet 2 too. Aggregate min jumps to wallet 2's new value. + manager.update_wallet_synced_height(&wallet_id2, 11000); + assert_eq!(manager.synced_height(), 11000); - // Verify each wallet has its own last_processed_height and manager reports the max + // Wallets advance independently. Aggregate `last_processed_height()` is `max`. + manager.update_wallet_last_processed_height(&wallet_id2, 25000); let wallet_info1 = manager.get_wallet_info(&wallet_id1).unwrap(); let wallet_info2 = manager.get_wallet_info(&wallet_id2).unwrap(); - assert_eq!(wallet_info1.last_processed_height(), 30000); + assert_eq!(wallet_info1.last_processed_height(), 12345); assert_eq!(wallet_info2.last_processed_height(), 25000); - assert_eq!(manager.last_processed_height(), 30000); + assert_eq!(manager.last_processed_height(), 25000); - // Manager synced-height update syncs across all wallets - manager.update_synced_height(40000); - let wallet_info1 = manager.get_wallet_info(&wallet_id1).unwrap(); + // Per-wallet updates are monotonic. Values below the current are ignored. + manager.update_wallet_last_processed_height(&wallet_id2, 10); + manager.update_wallet_synced_height(&wallet_id2, 10); let wallet_info2 = manager.get_wallet_info(&wallet_id2).unwrap(); - assert_eq!(wallet_info1.last_processed_height(), 30000); assert_eq!(wallet_info2.last_processed_height(), 25000); - assert_eq!(wallet_info1.synced_height(), 40000); - assert_eq!(wallet_info2.synced_height(), 40000); + assert_eq!(wallet_info2.synced_height(), 11000); + + // `wallets_behind(height)` lists wallets with `synced_height < height`. + let behind_at_12500 = manager.wallets_behind(12500); + assert!(behind_at_12500.contains(&wallet_id1)); + assert!(behind_at_12500.contains(&wallet_id2)); + // A wallet at exactly `height` is not behind. 
wallet_id1 sits at 12000, + // wallet_id2 sits at 11000. + let behind_at_12000 = manager.wallets_behind(12000); + assert!(!behind_at_12000.contains(&wallet_id1)); + assert!(behind_at_12000.contains(&wallet_id2)); + let behind_at_500 = manager.wallets_behind(500); + assert!(behind_at_500.is_empty()); } diff --git a/key-wallet-manager/tests/spv_integration_tests.rs b/key-wallet-manager/tests/spv_integration_tests.rs index 71b3bbfab..d30cb12c0 100644 --- a/key-wallet-manager/tests/spv_integration_tests.rs +++ b/key-wallet-manager/tests/spv_integration_tests.rs @@ -8,8 +8,17 @@ use key_wallet::wallet::initialization::WalletAccountCreationOptions; use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet::Network; -use key_wallet_manager::WalletInterface; -use key_wallet_manager::WalletManager; +use key_wallet_manager::{BlockProcessingResult, WalletId, WalletInterface, WalletManager}; +use std::collections::BTreeSet; + +async fn process_block_all_wallets( + manager: &mut WalletManager, + block: &Block, + height: u32, +) -> BlockProcessingResult { + let wallet_ids: BTreeSet = manager.list_wallets().into_iter().copied().collect(); + manager.process_block_for_wallets(block, height, &wallet_ids).await +} #[tokio::test] async fn test_block_processing() { @@ -29,7 +38,7 @@ async fn test_block_processing() { let tx3 = Transaction::dummy(&external, 0..0, &[300_000]); let block = Block::dummy(100, vec![tx1.clone(), tx2.clone(), tx3.clone()]); - let result = manager.process_block(&block, 100).await; + let result = process_block_all_wallets(&mut manager, &block, 100).await; // Both transactions should be new (first time seen) assert_eq!(result.new_txids.len(), 2); @@ -38,13 +47,14 @@ async fn test_block_processing() { assert!(!result.new_txids.contains(&tx3.txid())); // No existing transactions during initial processing assert!(result.existing_txids.is_empty()); - 
assert_eq!(result.new_addresses.len(), 2); + let new_addresses: Vec<_> = result.all_new_addresses().cloned().collect(); + assert_eq!(new_addresses.len(), 2); let addresses_after = manager.monitored_addresses(); let actual_increase = addresses_after.len() - addresses_before.len(); - assert_eq!(result.new_addresses.len(), actual_increase); + assert_eq!(new_addresses.len(), actual_increase); - for new_addr in &result.new_addresses { + for new_addr in &new_addresses { assert!(addresses_after.contains(new_addr)); } } @@ -61,7 +71,7 @@ async fn test_block_processing_result_empty() { let tx2 = Transaction::dummy(&external, 0..0, &[200_000]); let block = Block::dummy(100, vec![tx1, tx2]); - let result = manager.process_block(&block, 100).await; + let result = process_block_all_wallets(&mut manager, &block, 100).await; assert!(result.new_txids.is_empty()); assert!(result.existing_txids.is_empty()); @@ -101,7 +111,7 @@ async fn test_height_updated_after_block_processing() { for height in [1000, 2000, 3000] { let tx = Transaction::dummy(&Address::dummy(Network::Testnet, 0), 0..0, &[100000]); let block = Block::dummy(height, vec![tx]); - manager.process_block(&block, height).await; + process_block_all_wallets(&mut manager, &block, height).await; assert_wallet_heights(&manager, height); } } @@ -138,7 +148,7 @@ async fn test_immature_balance_matures_during_block_processing() { // Process the coinbase at height 1000 let coinbase_height = 1000; let coinbase_block = Block::dummy(coinbase_height, vec![coinbase_tx.clone()]); - manager.process_block(&coinbase_block, coinbase_height).await; + process_block_all_wallets(&mut manager, &coinbase_block, coinbase_height).await; // Verify the coinbase is detected and stored as immature let wallet_info = manager.get_wallet_info(&wallet_id).expect("Wallet info should exist"); @@ -157,7 +167,7 @@ async fn test_immature_balance_matures_during_block_processing() { let tx = Transaction::dummy(&Address::dummy(Network::Regtest, 0), 0..0, &[1000]); 
for height in (coinbase_height + 1)..maturity_height { let block = Block::dummy(height, vec![tx.clone()]); - manager.process_block(&block, height).await; + process_block_all_wallets(&mut manager, &block, height).await; } // Verify still immature just before maturity @@ -170,7 +180,7 @@ async fn test_immature_balance_matures_during_block_processing() { // Process the maturity block let maturity_block = Block::dummy(maturity_height, vec![tx.clone()]); - manager.process_block(&maturity_block, maturity_height).await; + process_block_all_wallets(&mut manager, &maturity_block, maturity_height).await; // Verify the coinbase has matured let wallet_info = manager.get_wallet_info(&wallet_id).expect("Wallet info should exist"); @@ -201,7 +211,7 @@ async fn test_block_rescan_marks_transactions_as_existing() { let block = Block::dummy(100, vec![tx1.clone()]); // First processing - transaction should be new - let result1 = manager.process_block(&block, 100).await; + let result1 = process_block_all_wallets(&mut manager, &block, 100).await; assert_eq!(result1.new_txids.len(), 1, "First processing should have 1 new transaction"); assert!( @@ -215,7 +225,7 @@ async fn test_block_rescan_marks_transactions_as_existing() { let tx_history_count = wallet_info.transaction_history().len(); // Second processing (simulating rescan) - transaction should be existing - let result2 = manager.process_block(&block, 100).await; + let result2 = process_block_all_wallets(&mut manager, &block, 100).await; assert!(result2.new_txids.is_empty(), "Rescan should have no new transactions"); assert_eq!(result2.existing_txids.len(), 1, "Rescan should have 1 existing transaction");