Skip to content

Commit 541d0cf

Browse files
committed
feat: per-wallet filter scan and runtime wallet catch-up
Filter matching and block processing now operate per wallet, so a wallet added at runtime catches up without forcing the already-synced wallets to reprocess anything. - `WalletInterface` restructured around per-wallet ops: `process_block_for_wallets`, `wallets_behind`, `monitored_addresses_for`, `wallet_synced_height`, and monotonic per-wallet height updates. Aggregate heights are derived (min of `synced_height`, max of `last_processed_height`) rather than stored. - `FiltersManager::scan_batch` matches each behind wallet's addresses only against filter heights it hasn't yet covered; already-synced wallets are skipped entirely. Matched blocks flow through `BlocksNeeded` carrying the per-block wallet set so `BlocksManager` processes each block only against the wallets whose filters matched. `FiltersBatch` records the scanned-wallet set so commit advances only their `synced_height`. - `FiltersManager::tick` detects when a wallet's `synced_height` sits below the current `committed_height` (a runtime add behind scan progress), clears in-flight pipeline state, lowers `committed_height` to the new aggregate floor, and re-enters `start_download` on the next 100ms tick. Runs in `Syncing`, `Synced`, and `WaitForEvents`. Based on: - #689
1 parent 659a6d5 commit 541d0cf

24 files changed

Lines changed: 2439 additions & 328 deletions

dash-spv-ffi/src/callbacks.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl FFISyncEventCallbacks {
347347
} => {
348348
if let Some(cb) = self.on_blocks_needed {
349349
let ffi_blocks: Vec<FFIBlockNeeded> = blocks
350-
.iter()
350+
.keys()
351351
.map(|key| FFIBlockNeeded {
352352
height: key.height(),
353353
hash: *key.hash().as_byte_array(),
@@ -361,15 +361,17 @@ impl FFISyncEventCallbacks {
361361
height,
362362
new_addresses,
363363
confirmed_txids,
364+
..
364365
} => {
365366
if let Some(cb) = self.on_block_processed {
366367
let hash_bytes = block_hash.as_byte_array();
367368
let txid_bytes: Vec<[u8; 32]> =
368369
confirmed_txids.iter().map(|txid| *txid.as_byte_array()).collect();
370+
let total_new_addresses: usize = new_addresses.values().map(|v| v.len()).sum();
369371
cb(
370372
*height,
371373
hash_bytes as *const [u8; 32],
372-
new_addresses.len() as u32,
374+
total_new_addresses as u32,
373375
txid_bytes.as_ptr(),
374376
txid_bytes.len() as u32,
375377
self.user_data,
@@ -755,3 +757,85 @@ impl FFIWalletEventCallbacks {
755757
}
756758
}
757759
}
760+
761+
#[cfg(test)]
762+
mod tests {
763+
use super::*;
764+
use dashcore::hashes::Hash;
765+
use dashcore::{Address, BlockHash, Network, Txid};
766+
use key_wallet_manager::{FilterMatchKey, WalletId};
767+
use std::collections::{BTreeMap, BTreeSet};
768+
use std::sync::atomic::{AtomicU32, Ordering};
769+
770+
/// `BlocksNeeded` dispatch must pass exactly one entry per
771+
/// `FilterMatchKey` to the FFI callback (i.e. iterate keys, not
772+
/// inflated by the per-block wallet attribution).
773+
#[test]
774+
fn test_blocks_needed_dispatch_passes_unique_keys_count() {
775+
static COUNT: AtomicU32 = AtomicU32::new(u32::MAX);
776+
extern "C" fn cb(_blocks: *const FFIBlockNeeded, count: u32, _user: *mut c_void) {
777+
COUNT.store(count, Ordering::SeqCst);
778+
}
779+
780+
let callbacks = FFISyncEventCallbacks {
781+
on_blocks_needed: Some(cb),
782+
..FFISyncEventCallbacks::default()
783+
};
784+
785+
let mut blocks: BTreeMap<FilterMatchKey, BTreeSet<WalletId>> = BTreeMap::new();
786+
// Two distinct blocks, each attributed to two wallets. The dispatch
787+
// must report 2 (unique keys), not 4.
788+
blocks.insert(
789+
FilterMatchKey::new(10, BlockHash::from_byte_array([1u8; 32])),
790+
BTreeSet::from([[1u8; 32], [2u8; 32]]),
791+
);
792+
blocks.insert(
793+
FilterMatchKey::new(20, BlockHash::from_byte_array([2u8; 32])),
794+
BTreeSet::from([[1u8; 32], [2u8; 32]]),
795+
);
796+
797+
callbacks.dispatch(&SyncEvent::BlocksNeeded {
798+
blocks,
799+
});
800+
assert_eq!(COUNT.load(Ordering::SeqCst), 2);
801+
}
802+
803+
/// `BlockProcessed` dispatch must report the total address count
804+
/// summed across all per-wallet entries in the `new_addresses` map.
805+
#[test]
806+
fn test_block_processed_dispatch_sums_per_wallet_addresses() {
807+
static NEW_ADDR_COUNT: AtomicU32 = AtomicU32::new(u32::MAX);
808+
extern "C" fn cb(
809+
_height: u32,
810+
_hash: *const [u8; 32],
811+
new_address_count: u32,
812+
_txids: *const [u8; 32],
813+
_txid_count: u32,
814+
_user: *mut c_void,
815+
) {
816+
NEW_ADDR_COUNT.store(new_address_count, Ordering::SeqCst);
817+
}
818+
819+
let callbacks = FFISyncEventCallbacks {
820+
on_block_processed: Some(cb),
821+
..FFISyncEventCallbacks::default()
822+
};
823+
824+
let addr_a = Address::dummy(Network::Regtest, 1);
825+
let addr_b = Address::dummy(Network::Regtest, 2);
826+
let addr_c = Address::dummy(Network::Regtest, 3);
827+
let mut new_addresses: BTreeMap<WalletId, Vec<Address>> = BTreeMap::new();
828+
// Wallet 1 contributes 2 new addresses, wallet 2 contributes 1. Total = 3.
829+
new_addresses.insert([1u8; 32], vec![addr_a, addr_b]);
830+
new_addresses.insert([2u8; 32], vec![addr_c]);
831+
832+
callbacks.dispatch(&SyncEvent::BlockProcessed {
833+
block_hash: BlockHash::from_byte_array([7u8; 32]),
834+
height: 100,
835+
wallets: BTreeSet::new(),
836+
new_addresses,
837+
confirmed_txids: vec![Txid::from_byte_array([9u8; 32])],
838+
});
839+
assert_eq!(NEW_ADDR_COUNT.load(Ordering::SeqCst), 3);
840+
}
841+
}

dash-spv/src/sync/blocks/manager.rs

Lines changed: 111 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,17 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
7979
let mut events = Vec::new();
8080

8181
// Process blocks in height order using pipeline's ordering logic
82-
while let Some((block, height)) = self.pipeline.take_next_ordered_block() {
82+
while let Some((block, height, interested)) = self.pipeline.take_next_ordered_block() {
8383
let hash = block.block_hash();
8484

85-
// Process block through wallet
85+
// Process the block only for the wallets whose filter matched it.
86+
// Already-synced wallets that did not match are not touched.
8687
let mut wallet = self.wallet.write().await;
87-
let result = wallet.process_block(&block, height).await;
88+
let result = wallet.process_block_for_wallets(&block, height, &interested).await;
8889
drop(wallet);
8990

9091
let total_relevant = result.relevant_tx_count();
92+
let new_addresses_total: usize = result.new_addresses.values().map(|v| v.len()).sum();
9193
if total_relevant > 0 {
9294
tracing::info!(
9395
"Found {} relevant transactions ({} new, {} existing) {} at height {}, new addresses: {}",
@@ -96,19 +98,20 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
9698
result.existing_txids.len(),
9799
hash,
98100
height,
99-
result.new_addresses.len()
101+
new_addresses_total
100102
);
101103
}
102104

103105
// Collect confirmed txids before moving new_addresses out of result
104106
let confirmed_txids: Vec<_> = result.relevant_txids().cloned().collect();
105107

106108
// Collect new addresses for gap limit rescanning
107-
let new_addresses: Vec<_> = result.new_addresses.into_iter().collect();
108-
if !new_addresses.is_empty() {
109+
let new_addresses = result.new_addresses;
110+
if new_addresses_total > 0 {
109111
tracing::debug!(
110-
"Block {} generated {} new addresses for gap limit maintenance",
112+
"Block {} generated {} new addresses for gap limit maintenance across {} wallets",
111113
height,
114+
new_addresses_total,
112115
new_addresses.len()
113116
);
114117
}
@@ -124,6 +127,7 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
124127
events.push(SyncEvent::BlockProcessed {
125128
block_hash: hash,
126129
height,
130+
wallets: interested,
127131
new_addresses,
128132
confirmed_txids,
129133
});
@@ -168,9 +172,9 @@ mod tests {
168172
};
169173
use crate::sync::{ManagerIdentifier, SyncEvent, SyncManagerProgress};
170174
use crate::test_utils::MockNetworkManager;
171-
use key_wallet_manager::test_utils::MockWallet;
175+
use key_wallet_manager::test_utils::{MockWallet, MOCK_WALLET_ID};
172176
use key_wallet_manager::FilterMatchKey;
173-
use std::collections::BTreeSet;
177+
use std::collections::{BTreeMap, BTreeSet};
174178

175179
type TestBlocksManager =
176180
BlocksManager<PersistentBlockHeaderStorage, PersistentBlockStorage, MockWallet>;
@@ -215,8 +219,8 @@ mod tests {
215219
let requests = network.request_sender();
216220

217221
let block_hash = dashcore::BlockHash::dummy(0);
218-
let mut blocks = BTreeSet::new();
219-
blocks.insert(FilterMatchKey::new(100, block_hash));
222+
let mut blocks = BTreeMap::new();
223+
blocks.insert(FilterMatchKey::new(100, block_hash), BTreeSet::from([MOCK_WALLET_ID]));
220224
let event = SyncEvent::BlocksNeeded {
221225
blocks,
222226
};
@@ -227,4 +231,100 @@ mod tests {
227231
assert_eq!(manager.state(), SyncState::Syncing);
228232
assert!(events.is_empty());
229233
}
234+
235+
/// `process_buffered_blocks` must call `process_block_for_wallets` with
236+
/// the exact wallet set carried in the pipeline so already-synced
237+
/// wallets are not touched by routing logic.
238+
#[tokio::test]
239+
async fn test_process_buffered_blocks_routes_wallet_set() {
240+
use dashcore::block::Header;
241+
use dashcore::{Block, TxMerkleNode};
242+
use dashcore_hashes::Hash;
243+
244+
let mut manager = create_test_manager().await;
245+
manager.progress.set_state(SyncState::Syncing);
246+
247+
let header = Header {
248+
version: dashcore::blockdata::block::Version::from_consensus(1),
249+
prev_blockhash: dashcore::BlockHash::all_zeros(),
250+
merkle_root: TxMerkleNode::all_zeros(),
251+
time: 0,
252+
bits: dashcore::CompactTarget::from_consensus(0),
253+
nonce: 0,
254+
};
255+
let block = Block {
256+
header,
257+
txdata: vec![],
258+
};
259+
manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([MOCK_WALLET_ID]));
260+
261+
let events = manager.process_buffered_blocks().await.unwrap();
262+
assert!(matches!(events.first(), Some(SyncEvent::BlockProcessed { .. })));
263+
264+
// MOCK_WALLET_ID was in the routed set, so MockWallet recorded the
265+
// block. (MockWallet::process_block_for_wallets returns early when
266+
// its id is absent.)
267+
let processed = manager.wallet.read().await.processed_blocks();
268+
let processed = processed.lock().await;
269+
assert_eq!(processed.len(), 1);
270+
assert_eq!(processed[0].1, 100);
271+
}
272+
273+
/// A wallet that is NOT in the pipeline's interested set must not be
274+
/// routed the block. Two wallets are registered, but only `wallet_in`
275+
/// appears in the routed set; the other wallet's processed log must
276+
/// stay empty for that block.
277+
#[tokio::test]
278+
async fn test_process_buffered_blocks_excludes_uninterested_wallet() {
279+
use dashcore::block::Header;
280+
use dashcore::{Block, TxMerkleNode};
281+
use dashcore_hashes::Hash;
282+
use key_wallet_manager::test_utils::{MockWalletState, MultiMockWallet};
283+
use key_wallet_manager::WalletId;
284+
285+
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
286+
let multi = MultiMockWallet::new();
287+
let wallet_in: WalletId = [0xAA; 32];
288+
let wallet_out: WalletId = [0xBB; 32];
289+
let multi = Arc::new(RwLock::new(multi));
290+
{
291+
let mut w = multi.write().await;
292+
w.insert_wallet(wallet_in, MockWalletState::default());
293+
w.insert_wallet(wallet_out, MockWalletState::default());
294+
}
295+
let mut manager: BlocksManager<
296+
PersistentBlockHeaderStorage,
297+
PersistentBlockStorage,
298+
MultiMockWallet,
299+
> = BlocksManager::new(multi.clone(), storage.block_headers(), storage.blocks()).await;
300+
manager.progress.set_state(SyncState::Syncing);
301+
302+
let header = Header {
303+
version: dashcore::blockdata::block::Version::from_consensus(1),
304+
prev_blockhash: dashcore::BlockHash::all_zeros(),
305+
merkle_root: TxMerkleNode::all_zeros(),
306+
time: 0,
307+
bits: dashcore::CompactTarget::from_consensus(0),
308+
nonce: 0,
309+
};
310+
let block = Block {
311+
header,
312+
txdata: vec![],
313+
};
314+
// Only wallet_in is in the routed set.
315+
manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([wallet_in]));
316+
317+
let _ = manager.process_buffered_blocks().await.unwrap();
318+
319+
let processed = multi.read().await.processed();
320+
let processed = processed.lock().await;
321+
// Exactly one entry, for wallet_in only.
322+
assert_eq!(processed.len(), 1);
323+
assert_eq!(processed[0].0, wallet_in);
324+
assert_eq!(processed[0].2, 100);
325+
assert!(
326+
!processed.iter().any(|(id, _, _)| *id == wallet_out),
327+
"wallet_out was not in the routed set, must not be processed"
328+
);
329+
}
230330
}

0 commit comments

Comments
 (0)