
Commit eb2226c

feat: per-wallet filter scan and runtime wallet catch-up (#122)
* feat: per-wallet filter scan and block processing

  Filter matching and block processing now operate per wallet, so a wallet added at runtime catches up without disturbing the wallets that are already in sync.

  `WalletInterface` is restructured around per-wallet operations:

  - `process_block_for_wallets(block, height, wallets)` replaces the global `process_block` and only updates the listed wallets.
  - `wallets_behind(height)` returns the wallet ids that still need filter coverage at `height`.
  - `monitored_addresses_for(wallet_id)` and `wallet_synced_height(wallet_id)` give per-wallet projections for filter matching.
  - `update_wallet_synced_height` and `update_wallet_last_processed_height` advance one wallet at a time and are monotonic.
  - `BlockProcessingResult.new_addresses` and `CheckTransactionsResult.new_addresses` carry gap-limit discoveries with wallet attribution.

  `FiltersManager.scan_batch` matches each behind wallet's addresses against the batch's filters at heights it has not yet covered. The per-block result flows through `BlocksNeeded` to `BlocksManager`, which processes each block only against the wallets whose filters matched it. `FiltersBatch` records the scanned wallet set so commit advances only their `synced_height`.

  When a late-added wallet's filter matches a block already in flight, its id is merged into the existing entry. If the block has already been processed, it is re-queued so `BlocksManager` reloads it from local storage and processes it for the late wallet only. `process_block_for_wallets` refreshes the cached balance even on rescan paths below the wallet's current `last_processed_height`, because UTXOs may change.

* feat: rescan filters when a wallet falls behind committed height

  When `wallet.synced_height()` drops below `FiltersManager`'s `progress.committed_height()`, a wallet was added or moved behind the current scan position and needs catch-up coverage.
  Add a check at the top of `FiltersManager::tick()` that detects this regression, clears in-flight pipeline state, lowers `committed_height` to the new aggregate minimum, and re-enters `start_download()`. The check runs in the `Syncing`, `Synced`, and `WaitForEvents` states so idle additions are caught on the next 100ms tick.

  Add `test_wallet_added_at_runtime_catches_up` in `dash-spv/tests/dashd_sync/tests_basic.rs`. After initial sync with `W1`, mine a block funding `W2`'s address and add `W2` at runtime with `birth_height` before that block. Assert the rescan picks up `W2`'s funding transaction and that `W1`'s state is unchanged. Then add `W3` with `birth_height` beyond the tip and assert no spurious rescan or regression in either existing wallet.

* refactor: add `wallets_not_yet_at` wrapper on `WalletInterface`

  Encapsulate the off-by-one between `wallets_behind` (strict less-than) and the inclusive height semantics needed by `scan_batch`, so call sites no longer need to compensate with `saturating_add(1)`.

* test: add `MultiMockWallet` for per-wallet attribution tests

  Single-wallet `MockWallet` cannot exercise paths that hinge on multiple wallets having distinct `synced_height` values or independent address sets. `MultiMockWallet` holds per-wallet state keyed by `WalletId`.

* fix: correct per-wallet filter scan and catch-up edge cases

  Resolves several correctness issues in the per-wallet filter scan and catch-up paths:

  - `track_block_match` now distinguishes three states (`NewlyTracked`, `InFlight`, `AlreadyProcessed`) instead of a boolean. A block that was already processed in a prior round is no longer silently re-queued for re-processing, and a block still in flight still re-emits `BlocksNeeded` so the `BlocksPipeline` merges late-arriving wallet ids into its pending wallet set.
  - `scan_batch` only adds wallets that contributed addresses to the scan into `scanned_wallets`. Empty-address wallets no longer have their `synced_height` advanced for free.
  - `rescan_batch` no longer drops new wallet ids when the matching block is in flight; it forwards the wallet ids through a fresh `BlocksNeeded`, which the pipeline merges into its pending set.
  - `tick` resets `committed_height` to the lowest `synced_height` of the stale wallets only, instead of the global wallet `synced_height`, so already-synced wallets are not re-scanned from scratch.
  - `scan_batch` runs the filters once over the union of behind-wallet addresses, then attributes each match per wallet by re-testing the matched filter against that wallet's scripts. Cost drops from O(N_wallets * batch_size) to O(batch_size + N_wallets * matches), and the per-batch borrow of `active_batches` is consolidated.
  - Field `filters_matched` is renamed to `matched_block_hashes` to reflect its actual role as a record (not a deduplication gate).
  - `rescan_batch` now takes `addresses_by_wallet` by reference, saving a clone per later-batch rescan in `try_commit_batches`.
  - Uses `wallets_not_yet_at(batch_end)` to express the inclusive height intent, hiding the `+ 1` off-by-one at the call site.

* test: add wallet-set propagation and queue-merge tests for `BlocksPipeline`

  Cover the previously implicit contract that `queue` and `add_from_storage` merge wallet sets per block hash, and that `take_next_ordered_block` returns the merged wallet set.

* fix: address per-wallet filter scan review feedback

  Tighten `scan_batch` so wallets at `synced == batch_end` are skipped (use `wallets_behind(batch_end)` instead of `wallets_not_yet_at`), ensure behind wallets with zero monitored addresses still advance their `synced_height` at commit, and surface filter errors and `AlreadyProcessed` skips through `tracing::warn!` instead of silent fallthrough. Add a `min_height` parameter to `check_compact_filters_for_addresses` so the union pass can skip irrelevant heights without cloning the filter map. Reuse the existing `batch_filters` borrow for attribution.
  Add tests covering:

  - `scan_batch` advancing zero-address wallets
  - `BlockTrackResult::InFlight` re-emission for late-arriving wallets
  - `BlockTrackResult::AlreadyProcessed` skip path
  - Union+attribute false-positive elimination
  - Rescan from a non-zero `synced_height` (not just genesis)
  - `BlocksManager::process_buffered_blocks` routing the wallet set

* fix: skip re-enqueue when `BlocksPipeline` already tracks the hash

  `BlocksPipeline::queue` unconditionally called `coordinator.enqueue` for every entry, including hashes that were already pending or in flight from a prior call. When a late-arriving wallet match for an already-queued block was emitted as `BlocksNeeded`, the duplicate `enqueue` corrupted the coordinator's pending count and could trigger a duplicate request to the peer. Only enqueue when the hash is not yet tracked, while still merging the late wallet ids into the per-block wallet set.

* fix: record `scanned_wallets` before `scan_batch` early returns

  `scan_batch` recorded the per-batch `scanned_wallets` set only after the empty-filters early return, so a batch that hit that path was committed with an empty set and the per-wallet `synced_height` never advanced for that range. The next tick relisted the same wallets via `wallets_behind` and re-scanned the same range. Move the `set_scanned_wallets` call ahead of the empty-filter and empty-address fast paths so every behind wallet's `synced_height` advances when the batch commits.

  Also pass each wallet's own `synced_height` as `min_synced` to `check_compact_filters_for_addresses` in `rescan_batch`. Otherwise a new address could spuriously match heights the wallet has already processed, and `track_block_match` would route the result into `AlreadyProcessed` and silently drop it.
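The re-enqueue guard described above can be sketched as follows. This is an illustrative model only: `Pipeline`, its `pending` and `enqueued` fields, and the bare-array hash and wallet-id aliases are hypothetical stand-ins for the crate's real `BlocksPipeline` and peer coordinator.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical aliases; the real crate uses dashcore::BlockHash and WalletId.
type BlockHash = [u8; 32];
type WalletId = [u8; 32];

struct Pipeline {
    // Per-block set of wallets interested in the block.
    pending: BTreeMap<BlockHash, BTreeSet<WalletId>>,
    // Stands in for the coordinator's request queue (coordinator.enqueue).
    enqueued: Vec<BlockHash>,
}

impl Pipeline {
    fn new() -> Self {
        Pipeline { pending: BTreeMap::new(), enqueued: Vec::new() }
    }

    // Merge wallet ids for an already-tracked hash; only enqueue with the
    // coordinator when the hash is seen for the first time, so a late
    // wallet match never produces a duplicate peer request.
    fn queue(&mut self, hash: BlockHash, wallets: BTreeSet<WalletId>) {
        let newly_tracked = !self.pending.contains_key(&hash);
        self.pending.entry(hash).or_default().extend(wallets);
        if newly_tracked {
            self.enqueued.push(hash);
        }
    }
}

fn main() {
    let mut p = Pipeline::new();
    p.queue([1u8; 32], BTreeSet::from([[0xAA; 32]]));
    // Late-arriving wallet match for the same block: merged, not re-requested.
    p.queue([1u8; 32], BTreeSet::from([[0xBB; 32]]));
    // One request to the peer, two wallets attributed to the block.
    println!("requests={} wallets={}", p.enqueued.len(), p.pending[&[1u8; 32]].len());
}
```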
* test: cover wallet-set exclusion routing and FFI count semantics

  Add a `BlocksManager` test that asserts a wallet absent from the pipeline's interested set never receives `process_block_for_wallets`, complementing the existing positive routing test. Add `dash-spv-ffi` callback tests covering the two behavioural changes of this PR: `BlocksNeeded` dispatch reports the unique `FilterMatchKey` count (not inflated by the per-block wallet attribution), and `BlockProcessed` dispatch reports the total address count summed across the per-wallet `new_addresses` map.
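The union-then-attribute scan described in the commit message can be sketched like this. It is a simplified model, not the crate's implementation: the `Filter` here is a toy set-membership check rather than a BIP158 compact filter, and the type aliases are hypothetical. It shows why the re-test step matters: a match caused by wallet 2's script must not advance matches for wallet 1.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical, simplified types for illustration only.
type WalletId = u8;
type Height = u32;
type Script = &'static str;

// Toy filter: just the set of scripts the block "commits to", so a
// filter match is a plain intersection test.
struct Filter {
    scripts: BTreeSet<Script>,
}

impl Filter {
    fn matches_any(&self, addrs: &BTreeSet<Script>) -> bool {
        addrs.iter().any(|a| self.scripts.contains(a))
    }
}

// One union pass over the batch, then per-wallet attribution only for
// blocks whose filter matched: O(batch_size + N_wallets * matches)
// instead of O(N_wallets * batch_size).
fn scan_batch(
    filters: &BTreeMap<Height, Filter>,
    addresses_by_wallet: &BTreeMap<WalletId, BTreeSet<Script>>,
) -> BTreeMap<Height, BTreeSet<WalletId>> {
    let union: BTreeSet<Script> = addresses_by_wallet.values().flatten().copied().collect();
    let mut matched: BTreeMap<Height, BTreeSet<WalletId>> = BTreeMap::new();
    for (height, filter) in filters {
        if !filter.matches_any(&union) {
            continue; // one test covers every behind wallet
        }
        // Attribute the match: re-test the matched filter against each
        // wallet's own scripts, so a match triggered by wallet A's
        // address is not charged to wallet B.
        for (wallet, addrs) in addresses_by_wallet {
            if filter.matches_any(addrs) {
                matched.entry(*height).or_default().insert(*wallet);
            }
        }
    }
    matched
}

fn main() {
    let filters = BTreeMap::from([
        (10, Filter { scripts: BTreeSet::from(["a"]) }),
        (20, Filter { scripts: BTreeSet::from(["x"]) }),
        (30, Filter { scripts: BTreeSet::from(["b"]) }),
    ]);
    let addresses = BTreeMap::from([
        (1, BTreeSet::from(["a"])),
        (2, BTreeSet::from(["b"])),
    ]);
    // Height 10 is attributed to wallet 1 only, height 30 to wallet 2 only,
    // and height 20 matches neither.
    println!("{:?}", scan_batch(&filters, &addresses));
}
```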
1 parent 53b25c4 commit eb2226c

22 files changed

Lines changed: 2011 additions & 310 deletions


dash-spv-ffi/src/callbacks.rs

Lines changed: 84 additions & 2 deletions
@@ -347,7 +347,7 @@ impl FFISyncEventCallbacks {
             } => {
                 if let Some(cb) = self.on_blocks_needed {
                     let ffi_blocks: Vec<FFIBlockNeeded> = blocks
-                        .iter()
+                        .keys()
                         .map(|key| FFIBlockNeeded {
                             height: key.height(),
                             hash: *key.hash().as_byte_array(),
@@ -366,10 +366,11 @@ impl FFISyncEventCallbacks {
                 let hash_bytes = block_hash.as_byte_array();
                 let txid_bytes: Vec<[u8; 32]> =
                     confirmed_txids.iter().map(|txid| *txid.as_byte_array()).collect();
+                let total_new_addresses: usize = new_addresses.values().map(|v| v.len()).sum();
                 cb(
                     *height,
                     hash_bytes as *const [u8; 32],
-                    new_addresses.len() as u32,
+                    total_new_addresses as u32,
                     txid_bytes.as_ptr(),
                     txid_bytes.len() as u32,
                     self.user_data,
@@ -755,3 +756,84 @@ impl FFIWalletEventCallbacks {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use dashcore::hashes::Hash;
+    use dashcore::{Address, BlockHash, Network, Txid};
+    use key_wallet_manager::{FilterMatchKey, WalletId};
+    use std::collections::{BTreeMap, BTreeSet};
+    use std::sync::atomic::{AtomicU32, Ordering};
+
+    /// `BlocksNeeded` dispatch must pass exactly one entry per
+    /// `FilterMatchKey` to the FFI callback (i.e. iterate keys, not
+    /// inflated by the per-block wallet attribution).
+    #[test]
+    fn test_blocks_needed_dispatch_passes_unique_keys_count() {
+        static COUNT: AtomicU32 = AtomicU32::new(u32::MAX);
+        extern "C" fn cb(_blocks: *const FFIBlockNeeded, count: u32, _user: *mut c_void) {
+            COUNT.store(count, Ordering::SeqCst);
+        }
+
+        let callbacks = FFISyncEventCallbacks {
+            on_blocks_needed: Some(cb),
+            ..FFISyncEventCallbacks::default()
+        };
+
+        let mut blocks: BTreeMap<FilterMatchKey, BTreeSet<WalletId>> = BTreeMap::new();
+        // Two distinct blocks, each attributed to two wallets. The dispatch
+        // must report 2 (unique keys), not 4.
+        blocks.insert(
+            FilterMatchKey::new(10, BlockHash::from_byte_array([1u8; 32])),
+            BTreeSet::from([[1u8; 32], [2u8; 32]]),
+        );
+        blocks.insert(
+            FilterMatchKey::new(20, BlockHash::from_byte_array([2u8; 32])),
+            BTreeSet::from([[1u8; 32], [2u8; 32]]),
+        );
+
+        callbacks.dispatch(&SyncEvent::BlocksNeeded {
+            blocks,
+        });
+        assert_eq!(COUNT.load(Ordering::SeqCst), 2);
+    }
+
+    /// `BlockProcessed` dispatch must report the total address count
+    /// summed across all per-wallet entries in the `new_addresses` map.
+    #[test]
+    fn test_block_processed_dispatch_sums_per_wallet_addresses() {
+        static NEW_ADDR_COUNT: AtomicU32 = AtomicU32::new(u32::MAX);
+        extern "C" fn cb(
+            _height: u32,
+            _hash: *const [u8; 32],
+            new_address_count: u32,
+            _txids: *const [u8; 32],
+            _txid_count: u32,
+            _user: *mut c_void,
+        ) {
+            NEW_ADDR_COUNT.store(new_address_count, Ordering::SeqCst);
+        }
+
+        let callbacks = FFISyncEventCallbacks {
+            on_block_processed: Some(cb),
+            ..FFISyncEventCallbacks::default()
+        };
+
+        let addr_a = Address::dummy(Network::Regtest, 1);
+        let addr_b = Address::dummy(Network::Regtest, 2);
+        let addr_c = Address::dummy(Network::Regtest, 3);
+        let mut new_addresses: BTreeMap<WalletId, Vec<Address>> = BTreeMap::new();
+        // Wallet 1 contributes 2 new addresses, wallet 2 contributes 1. Total = 3.
+        new_addresses.insert([1u8; 32], vec![addr_a, addr_b]);
+        new_addresses.insert([2u8; 32], vec![addr_c]);
+
+        callbacks.dispatch(&SyncEvent::BlockProcessed {
+            block_hash: BlockHash::from_byte_array([7u8; 32]),
+            height: 100,
+            new_addresses,
+            confirmed_txids: vec![Txid::from_byte_array([9u8; 32])],
+        });
+        assert_eq!(NEW_ADDR_COUNT.load(Ordering::SeqCst), 3);
+    }
+}

dash-spv/src/sync/blocks/manager.rs

Lines changed: 110 additions & 11 deletions
@@ -79,15 +79,17 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
         let mut events = Vec::new();

         // Process blocks in height order using pipeline's ordering logic
-        while let Some((block, height)) = self.pipeline.take_next_ordered_block() {
+        while let Some((block, height, interested)) = self.pipeline.take_next_ordered_block() {
             let hash = block.block_hash();

-            // Process block through wallet
+            // Process the block only for the wallets whose filter matched it.
+            // Already-synced wallets that did not match are not touched.
             let mut wallet = self.wallet.write().await;
-            let result = wallet.process_block(&block, height).await;
+            let result = wallet.process_block_for_wallets(&block, height, &interested).await;
             drop(wallet);

             let total_relevant = result.relevant_tx_count();
+            let new_addresses_total: usize = result.new_addresses.values().map(|v| v.len()).sum();
             if total_relevant > 0 {
                 tracing::info!(
                     "Found {} relevant transactions ({} new, {} existing) {} at height {}, new addresses: {}",
@@ -96,19 +98,20 @@
                     result.existing_txids.len(),
                     hash,
                     height,
-                    result.new_addresses.len()
+                    new_addresses_total
                 );
             }

             // Collect confirmed txids before moving new_addresses out of result
             let confirmed_txids: Vec<_> = result.relevant_txids().cloned().collect();

             // Collect new addresses for gap limit rescanning
-            let new_addresses: Vec<_> = result.new_addresses.into_iter().collect();
-            if !new_addresses.is_empty() {
+            let new_addresses = result.new_addresses;
+            if new_addresses_total > 0 {
                 tracing::debug!(
-                    "Block {} generated {} new addresses for gap limit maintenance",
+                    "Block {} generated {} new addresses for gap limit maintenance across {} wallets",
                     height,
+                    new_addresses_total,
                     new_addresses.len()
                 );
             }
@@ -168,9 +171,9 @@ mod tests {
     };
     use crate::sync::{ManagerIdentifier, SyncEvent, SyncManagerProgress};
     use crate::test_utils::MockNetworkManager;
-    use key_wallet_manager::test_utils::MockWallet;
+    use key_wallet_manager::test_utils::{MockWallet, MOCK_WALLET_ID};
     use key_wallet_manager::FilterMatchKey;
-    use std::collections::BTreeSet;
+    use std::collections::{BTreeMap, BTreeSet};

     type TestBlocksManager =
         BlocksManager<PersistentBlockHeaderStorage, PersistentBlockStorage, MockWallet>;
@@ -215,8 +218,8 @@
         let requests = network.request_sender();

         let block_hash = dashcore::BlockHash::dummy(0);
-        let mut blocks = BTreeSet::new();
-        blocks.insert(FilterMatchKey::new(100, block_hash));
+        let mut blocks = BTreeMap::new();
+        blocks.insert(FilterMatchKey::new(100, block_hash), BTreeSet::from([MOCK_WALLET_ID]));
         let event = SyncEvent::BlocksNeeded {
             blocks,
         };
@@ -227,4 +230,100 @@
         assert_eq!(manager.state(), SyncState::Syncing);
         assert!(events.is_empty());
     }
+
+    /// `process_buffered_blocks` must call `process_block_for_wallets` with
+    /// the exact wallet set carried in the pipeline so already-synced
+    /// wallets are not touched by routing logic.
+    #[tokio::test]
+    async fn test_process_buffered_blocks_routes_wallet_set() {
+        use dashcore::block::Header;
+        use dashcore::{Block, TxMerkleNode};
+        use dashcore_hashes::Hash;
+
+        let mut manager = create_test_manager().await;
+        manager.progress.set_state(SyncState::Syncing);
+
+        let header = Header {
+            version: dashcore::blockdata::block::Version::from_consensus(1),
+            prev_blockhash: dashcore::BlockHash::all_zeros(),
+            merkle_root: TxMerkleNode::all_zeros(),
+            time: 0,
+            bits: dashcore::CompactTarget::from_consensus(0),
+            nonce: 0,
+        };
+        let block = Block {
+            header,
+            txdata: vec![],
+        };
+        manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([MOCK_WALLET_ID]));
+
+        let events = manager.process_buffered_blocks().await.unwrap();
+        assert!(matches!(events.first(), Some(SyncEvent::BlockProcessed { .. })));
+
+        // MOCK_WALLET_ID was in the routed set, so MockWallet recorded the
+        // block. (MockWallet::process_block_for_wallets returns early when
+        // its id is absent.)
+        let processed = manager.wallet.read().await.processed_blocks();
+        let processed = processed.lock().await;
+        assert_eq!(processed.len(), 1);
+        assert_eq!(processed[0].1, 100);
+    }
+
+    /// A wallet that is NOT in the pipeline's interested set must not be
+    /// routed the block. Two wallets are registered, but only `wallet_in`
+    /// appears in the routed set; the other wallet's processed log must
+    /// stay empty for that block.
+    #[tokio::test]
+    async fn test_process_buffered_blocks_excludes_uninterested_wallet() {
+        use dashcore::block::Header;
+        use dashcore::{Block, TxMerkleNode};
+        use dashcore_hashes::Hash;
+        use key_wallet_manager::test_utils::{MockWalletState, MultiMockWallet};
+        use key_wallet_manager::WalletId;
+
+        let storage = DiskStorageManager::with_temp_dir().await.unwrap();
+        let multi = MultiMockWallet::new();
+        let wallet_in: WalletId = [0xAA; 32];
+        let wallet_out: WalletId = [0xBB; 32];
+        let multi = Arc::new(RwLock::new(multi));
+        {
+            let mut w = multi.write().await;
+            w.insert_wallet(wallet_in, MockWalletState::default());
+            w.insert_wallet(wallet_out, MockWalletState::default());
+        }
+        let mut manager: BlocksManager<
+            PersistentBlockHeaderStorage,
+            PersistentBlockStorage,
+            MultiMockWallet,
+        > = BlocksManager::new(multi.clone(), storage.block_headers(), storage.blocks()).await;
+        manager.progress.set_state(SyncState::Syncing);
+
+        let header = Header {
+            version: dashcore::blockdata::block::Version::from_consensus(1),
+            prev_blockhash: dashcore::BlockHash::all_zeros(),
+            merkle_root: TxMerkleNode::all_zeros(),
+            time: 0,
+            bits: dashcore::CompactTarget::from_consensus(0),
+            nonce: 0,
+        };
+        let block = Block {
+            header,
+            txdata: vec![],
+        };
+        // Only wallet_in is in the routed set.
+        manager.pipeline.add_from_storage(block.clone(), 100, BTreeSet::from([wallet_in]));
+
+        let _ = manager.process_buffered_blocks().await.unwrap();
+
+        let processed = multi.read().await.processed();
+        let processed = processed.lock().await;
+        // Exactly one entry, for wallet_in only.
+        assert_eq!(processed.len(), 1);
+        assert_eq!(processed[0].0, wallet_in);
+        assert_eq!(processed[0].2, 100);
+        assert!(
+            !processed.iter().any(|(id, _, _)| *id == wallet_out),
+            "wallet_out was not in the routed set, must not be processed"
+        );
+    }
 }
