Skip to content

Commit fb9a9e0

Browse files
committed
feat: per-wallet filter scan and block processing
When a wallet is added behind the current scan progress, only that wallet now goes through filter matching and block processing. Already-synced wallets are completely untouched: their addresses are not tested against any filter, no matched block is processed for them, and their per-wallet `synced_height` and `last_processed_height` stay where they were. `WalletInterface` is restructured around per-wallet operations: - `process_block_for_wallets(block, height, wallets)` replaces the global `process_block` and only updates the listed wallets. - `wallets_behind(height)` returns the wallet ids that still need filter coverage at `height`. - `monitored_addresses_for(wallet_id)` and `wallet_synced_height(wallet_id)` give per-wallet projections for filter matching. - `update_wallet_synced_height(wallet_id, height)` and `update_wallet_last_processed_height(wallet_id, height)` advance one wallet at a time and are monotonic, so a rescan that lowers the aggregate min never regresses an already-synced wallet's per-wallet checkpoint. - `BlockProcessingResult.new_addresses` and `CheckTransactionsResult.new_addresses` carry gap-limit discoveries with wallet attribution so re-matching is also per-wallet. `FiltersManager.scan_batch` now reads each behind wallet's `(synced_height, addresses)`, projects the batch's filters to heights that wallet hasn't yet covered, and matches them only against that wallet's address set. The per-block result is a `BTreeMap<FilterMatchKey, BTreeSet<WalletId>>` that flows through the renamed `BlocksNeeded` payload to `BlocksManager`. `FiltersBatch` records the wallet set scanned at scan time so the commit phase advances only those wallets' `synced_height`. Block-collision in `blocks_remaining` merges wallet sets when the block is still in flight, and re-queues via `BlocksNeeded` when the block has already been processed for an earlier wallet set so the late-added wallet still gets its turn from local block storage. `BlocksManager` queues blocks with their wallet sets, threads them through the pipeline, and calls `process_block_for_wallets` with that exact set rather than every wallet in the manager. `BlockProcessed` carries `new_addresses` so `FiltersManager` can route gap-limit re-matches per wallet via `rescan_batch`. `process_block_for_wallets` always refreshes the cached balance after applying a block's transactions, even when the block height is below a wallet's current `last_processed_height` (a rescan path), because UTXOs may have been added or removed. Closes [#114](xdustinface#114).
1 parent 055dfc9 commit fb9a9e0

20 files changed

Lines changed: 749 additions & 284 deletions

File tree

dash-spv-ffi/src/callbacks.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl FFISyncEventCallbacks {
347347
} => {
348348
if let Some(cb) = self.on_blocks_needed {
349349
let ffi_blocks: Vec<FFIBlockNeeded> = blocks
350-
.iter()
350+
.keys()
351351
.map(|key| FFIBlockNeeded {
352352
height: key.height(),
353353
hash: *key.hash().as_byte_array(),
@@ -366,10 +366,11 @@ impl FFISyncEventCallbacks {
366366
let hash_bytes = block_hash.as_byte_array();
367367
let txid_bytes: Vec<[u8; 32]> =
368368
confirmed_txids.iter().map(|txid| *txid.as_byte_array()).collect();
369+
let total_new_addresses: usize = new_addresses.values().map(|v| v.len()).sum();
369370
cb(
370371
*height,
371372
hash_bytes as *const [u8; 32],
372-
new_addresses.len() as u32,
373+
total_new_addresses as u32,
373374
txid_bytes.as_ptr(),
374375
txid_bytes.len() as u32,
375376
self.user_data,

dash-spv/src/sync/blocks/manager.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,17 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
7979
let mut events = Vec::new();
8080

8181
// Process blocks in height order using pipeline's ordering logic
82-
while let Some((block, height)) = self.pipeline.take_next_ordered_block() {
82+
while let Some((block, height, interested)) = self.pipeline.take_next_ordered_block() {
8383
let hash = block.block_hash();
8484

85-
// Process block through wallet
85+
// Process the block only for the wallets whose filter matched it.
86+
// Already-synced wallets that did not match are not touched.
8687
let mut wallet = self.wallet.write().await;
87-
let result = wallet.process_block(&block, height).await;
88+
let result = wallet.process_block_for_wallets(&block, height, &interested).await;
8889
drop(wallet);
8990

9091
let total_relevant = result.relevant_tx_count();
92+
let new_addresses_total: usize = result.new_addresses.values().map(|v| v.len()).sum();
9193
if total_relevant > 0 {
9294
tracing::info!(
9395
"Found {} relevant transactions ({} new, {} existing) {} at height {}, new addresses: {}",
@@ -96,19 +98,18 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
9698
result.existing_txids.len(),
9799
hash,
98100
height,
99-
result.new_addresses.len()
101+
new_addresses_total
100102
);
101103
}
102104

103-
// Collect confirmed txids before moving new_addresses out of result
104105
let confirmed_txids: Vec<_> = result.relevant_txids().cloned().collect();
105106

106-
// Collect new addresses for gap limit rescanning
107-
let new_addresses: Vec<_> = result.new_addresses.into_iter().collect();
108-
if !new_addresses.is_empty() {
107+
let new_addresses = result.new_addresses;
108+
if new_addresses_total > 0 {
109109
tracing::debug!(
110-
"Block {} generated {} new addresses for gap limit maintenance",
110+
"Block {} generated {} new addresses for gap limit maintenance across {} wallets",
111111
height,
112+
new_addresses_total,
112113
new_addresses.len()
113114
);
114115
}
@@ -170,7 +171,7 @@ mod tests {
170171
use crate::test_utils::MockNetworkManager;
171172
use key_wallet_manager::test_utils::MockWallet;
172173
use key_wallet_manager::FilterMatchKey;
173-
use std::collections::BTreeSet;
174+
use std::collections::{BTreeMap, BTreeSet};
174175

175176
type TestBlocksManager =
176177
BlocksManager<PersistentBlockHeaderStorage, PersistentBlockStorage, MockWallet>;
@@ -215,8 +216,8 @@ mod tests {
215216
let requests = network.request_sender();
216217

217218
let block_hash = dashcore::BlockHash::dummy(0);
218-
let mut blocks = BTreeSet::new();
219-
blocks.insert(FilterMatchKey::new(100, block_hash));
219+
let mut blocks = BTreeMap::new();
220+
blocks.insert(FilterMatchKey::new(100, block_hash), BTreeSet::new());
220221
let event = SyncEvent::BlocksNeeded {
221222
blocks,
222223
};

dash-spv/src/sync/blocks/pipeline.rs

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::network::RequestSender;
1111
use crate::sync::download_coordinator::{DownloadConfig, DownloadCoordinator};
1212
use dashcore::blockdata::block::Block;
1313
use dashcore::BlockHash;
14-
use key_wallet_manager::FilterMatchKey;
14+
use key_wallet_manager::{FilterMatchKey, WalletId};
1515

1616
/// Maximum number of concurrent block downloads.
1717
const MAX_CONCURRENT_BLOCK_DOWNLOADS: usize = 20;
@@ -36,6 +36,9 @@ pub(super) struct BlocksPipeline {
3636
downloaded: BTreeMap<u32, Block>,
3737
/// Map hash -> height for looking up height when block arrives.
3838
hash_to_height: HashMap<BlockHash, u32>,
39+
/// Per-block interested wallets, populated when the block is queued.
40+
/// Only those wallets get the block processed.
41+
hash_to_wallets: HashMap<BlockHash, BTreeSet<WalletId>>,
3942
}
4043

4144
impl std::fmt::Debug for BlocksPipeline {
@@ -66,17 +69,26 @@ impl BlocksPipeline {
6669
pending_heights: BTreeSet::new(),
6770
downloaded: BTreeMap::new(),
6871
hash_to_height: HashMap::new(),
72+
hash_to_wallets: HashMap::new(),
6973
}
7074
}
7175

72-
/// Queue blocks with their heights for download.
76+
/// Queue blocks with their heights and per-block interested wallet sets.
7377
///
74-
/// This is the preferred method as it enables height-ordered processing.
75-
pub(super) fn queue(&mut self, blocks: impl IntoIterator<Item = FilterMatchKey>) {
76-
for key in blocks {
77-
self.coordinator.enqueue([*key.hash()]);
78+
/// Each entry's wallet set is the union of wallets whose addresses matched
79+
/// the filter for that block. If the block is already queued we merge the
80+
/// new wallet ids into the existing set so a late-discovered wallet still
81+
/// gets the block processed when it arrives.
82+
pub(super) fn queue(
83+
&mut self,
84+
blocks: impl IntoIterator<Item = (FilterMatchKey, BTreeSet<WalletId>)>,
85+
) {
86+
for (key, wallets) in blocks {
87+
let hash = *key.hash();
88+
self.coordinator.enqueue([hash]);
7889
self.pending_heights.insert(key.height());
79-
self.hash_to_height.insert(*key.hash(), key.height());
90+
self.hash_to_height.insert(hash, key.height());
91+
self.hash_to_wallets.entry(hash).or_default().extend(wallets);
8092
}
8193
}
8294

@@ -141,12 +153,13 @@ impl BlocksPipeline {
141153
true
142154
}
143155

144-
/// Take the next block that's safe to process in height order.
156+
/// Take the next block that's safe to process in height order, along with
157+
/// the wallet set whose filters matched this block.
145158
///
146159
/// Returns None if:
147160
/// - No downloaded blocks available, or
148161
/// - Waiting for a lower-height block still pending
149-
pub(super) fn take_next_ordered_block(&mut self) -> Option<(Block, u32)> {
162+
pub(super) fn take_next_ordered_block(&mut self) -> Option<(Block, u32, BTreeSet<WalletId>)> {
150163
let lowest_downloaded = *self.downloaded.keys().next()?;
151164

152165
// Check if any pending blocks have lower heights
@@ -156,15 +169,22 @@ impl BlocksPipeline {
156169
}
157170
}
158171

159-
// Safe to return this block
160172
let block = self.downloaded.remove(&lowest_downloaded).unwrap();
161-
Some((block, lowest_downloaded))
173+
let wallets = self.hash_to_wallets.remove(&block.block_hash()).unwrap_or_default();
174+
Some((block, lowest_downloaded, wallets))
162175
}
163176

164177
/// Add a block that was loaded from storage (skip download).
165178
///
166179
/// Used when blocks are already persisted from a previous sync.
167-
pub(super) fn add_from_storage(&mut self, block: Block, height: u32) {
180+
pub(super) fn add_from_storage(
181+
&mut self,
182+
block: Block,
183+
height: u32,
184+
wallets: BTreeSet<WalletId>,
185+
) {
186+
let hash = block.block_hash();
187+
self.hash_to_wallets.entry(hash).or_default().extend(wallets);
168188
self.downloaded.insert(height, block);
169189
}
170190

@@ -212,7 +232,7 @@ mod tests {
212232
fn test_queue_block() {
213233
let mut pipeline = BlocksPipeline::new();
214234
let block = make_test_block(1);
215-
pipeline.queue([FilterMatchKey::new(100, block.block_hash())]);
235+
pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]);
216236

217237
assert_eq!(pipeline.coordinator.pending_count(), 1);
218238
assert!(!pipeline.is_complete());
@@ -226,9 +246,9 @@ mod tests {
226246
let block2 = make_test_block(2);
227247
let block3 = make_test_block(3);
228248
pipeline.queue([
229-
FilterMatchKey::new(100, block1.block_hash()),
230-
FilterMatchKey::new(101, block2.block_hash()),
231-
FilterMatchKey::new(102, block3.block_hash()),
249+
(FilterMatchKey::new(100, block1.block_hash()), BTreeSet::new()),
250+
(FilterMatchKey::new(101, block2.block_hash()), BTreeSet::new()),
251+
(FilterMatchKey::new(102, block3.block_hash()), BTreeSet::new()),
232252
]);
233253

234254
assert_eq!(pipeline.coordinator.pending_count(), 3);
@@ -245,7 +265,7 @@ mod tests {
245265
let hash = block.block_hash();
246266

247267
// Queue with height tracking
248-
pipeline.queue([FilterMatchKey::new(100, block.block_hash())]);
268+
pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]);
249269

250270
// Simulate sending via coordinator
251271
let hashes = pipeline.coordinator.take_pending(1);
@@ -276,7 +296,7 @@ mod tests {
276296
// Queue more blocks than max concurrent
277297
for i in 0..=MAX_CONCURRENT_BLOCK_DOWNLOADS {
278298
let block = make_test_block(i as u8);
279-
pipeline.queue([FilterMatchKey::new(i as u32, block.block_hash())]);
299+
pipeline.queue([(FilterMatchKey::new(i as u32, block.block_hash()), BTreeSet::new())]);
280300
}
281301

282302
// Take and mark as downloading up to limit
@@ -301,6 +321,7 @@ mod tests {
301321
pending_heights: BTreeSet::new(),
302322
downloaded: BTreeMap::new(),
303323
hash_to_height: HashMap::new(),
324+
hash_to_wallets: HashMap::new(),
304325
};
305326

306327
// Use coordinator directly to set up in-flight state
@@ -328,7 +349,7 @@ mod tests {
328349

329350
// Use add_from_storage to test ordering logic without network
330351
// Add block 2 first (out of order)
331-
pipeline.add_from_storage(block2.clone(), 101);
352+
pipeline.add_from_storage(block2.clone(), 101, BTreeSet::new());
332353
// Also track height 100 as pending to simulate waiting
333354
pipeline.pending_heights.insert(100);
334355

@@ -337,15 +358,15 @@ mod tests {
337358

338359
// Add block 1
339360
pipeline.pending_heights.remove(&100);
340-
pipeline.add_from_storage(block1.clone(), 100);
361+
pipeline.add_from_storage(block1.clone(), 100, BTreeSet::new());
341362

342363
// Now block 1 is ready (lowest height)
343-
let (block, height) = pipeline.take_next_ordered_block().unwrap();
364+
let (block, height, _) = pipeline.take_next_ordered_block().unwrap();
344365
assert_eq!(height, 100);
345366
assert_eq!(block.block_hash(), hash1);
346367

347368
// Block 2 is now ready
348-
let (block, height) = pipeline.take_next_ordered_block().unwrap();
369+
let (block, height, _) = pipeline.take_next_ordered_block().unwrap();
349370
assert_eq!(height, 101);
350371
assert_eq!(block.block_hash(), hash2);
351372

@@ -360,7 +381,7 @@ mod tests {
360381

361382
// Add block at height 101, but height 100 is still pending
362383
pipeline.pending_heights.insert(100);
363-
pipeline.add_from_storage(block2.clone(), 101);
384+
pipeline.add_from_storage(block2.clone(), 101, BTreeSet::new());
364385

365386
// Cannot take block 2 - block at height 100 is still pending
366387
assert!(pipeline.take_next_ordered_block().is_none());
@@ -369,7 +390,7 @@ mod tests {
369390
pipeline.pending_heights.remove(&100);
370391

371392
// Now block 2 is ready
372-
let (_, height) = pipeline.take_next_ordered_block().unwrap();
393+
let (_, height, _) = pipeline.take_next_ordered_block().unwrap();
373394
assert_eq!(height, 101);
374395
}
375396

@@ -379,11 +400,11 @@ mod tests {
379400
let block = make_test_block(1);
380401
let hash = block.block_hash();
381402

382-
pipeline.add_from_storage(block.clone(), 100);
403+
pipeline.add_from_storage(block.clone(), 100, BTreeSet::new());
383404

384405
assert_eq!(pipeline.downloaded.len(), 1);
385406

386-
let (taken_block, height) = pipeline.take_next_ordered_block().unwrap();
407+
let (taken_block, height, _) = pipeline.take_next_ordered_block().unwrap();
387408
assert_eq!(height, 100);
388409
assert_eq!(taken_block.block_hash(), hash);
389410
}
@@ -395,7 +416,7 @@ mod tests {
395416

396417
// Adding to downloaded makes it incomplete
397418
let block = make_test_block(1);
398-
pipeline.add_from_storage(block, 100);
419+
pipeline.add_from_storage(block, 100, BTreeSet::new());
399420
assert!(!pipeline.is_complete());
400421

401422
// Take the block
@@ -422,7 +443,7 @@ mod tests {
422443
let block = make_test_block(1);
423444

424445
// Queue and mark as sent via coordinator
425-
pipeline.queue([FilterMatchKey::new(100, block.block_hash())]);
446+
pipeline.queue([(FilterMatchKey::new(100, block.block_hash()), BTreeSet::new())]);
426447
let hashes = pipeline.coordinator.take_pending(1);
427448
pipeline.coordinator.mark_sent(&hashes);
428449

dash-spv/src/sync/blocks/sync_manager.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use crate::types::HashedBlock;
1010
use crate::SyncError;
1111
use async_trait::async_trait;
1212
use dashcore::network::message::NetworkMessage;
13-
use key_wallet_manager::WalletInterface;
13+
use key_wallet_manager::{FilterMatchKey, WalletId, WalletInterface};
14+
use std::collections::BTreeSet;
1415

1516
#[async_trait]
1617
impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncManager
@@ -115,10 +116,10 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncM
115116

116117
tracing::debug!("Blocks needed: {} blocks", blocks.len());
117118

118-
let mut to_download = Vec::new();
119+
let mut to_download: Vec<(FilterMatchKey, BTreeSet<WalletId>)> = Vec::new();
119120

120121
let block_storage = self.block_storage.read().await;
121-
for key in blocks {
122+
for (key, wallets) in blocks {
122123
// Check if block is already stored (from previous sync)
123124
if let Ok(Some(hashed_block)) = block_storage.load_block(key.height()).await {
124125
if hashed_block.hash() != key.hash() {
@@ -135,13 +136,17 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncM
135136
)));
136137
}
137138
// Block loaded from storage, add to pipeline for processing
138-
self.pipeline.add_from_storage(hashed_block.block().clone(), key.height());
139+
self.pipeline.add_from_storage(
140+
hashed_block.block().clone(),
141+
key.height(),
142+
wallets.clone(),
143+
);
139144
self.progress.add_from_storage(1);
140145
continue;
141146
}
142147

143-
// Block not in storage, queue for download with height
144-
to_download.push(key.clone());
148+
// Block not in storage, queue for download with height + wallets
149+
to_download.push((key.clone(), wallets.clone()));
145150
}
146151
drop(block_storage);
147152

0 commit comments

Comments
 (0)