feat: per-wallet filter scan and runtime wallet catch-up #118
xdustinface wants to merge 2 commits into
Conversation
Filter matching and block processing now operate per wallet, so a wallet added at runtime catches up without disturbing the wallets that are already in sync.

`WalletInterface` is restructured around per-wallet operations (sketched below):

- `process_block_for_wallets(block, height, wallets)` replaces the global `process_block` and only updates the listed wallets.
- `wallets_behind(height)` returns the wallet ids that still need filter coverage at `height`.
- `monitored_addresses_for(wallet_id)` and `wallet_synced_height(wallet_id)` give per-wallet projections for filter matching.
- `update_wallet_synced_height` and `update_wallet_last_processed_height` advance one wallet at a time and are monotonic.
- `BlockProcessingResult.new_addresses` and `CheckTransactionsResult.new_addresses` carry gap-limit discoveries with wallet attribution.

`FiltersManager.scan_batch` matches each behind wallet's addresses against the batch's filters at heights it hasn't yet covered. The per-block result flows through `BlocksNeeded` to `BlocksManager`, which processes each block only against the wallets whose filters matched it. `FiltersBatch` records the scanned wallet set so commit advances only their `synced_height`.

When a late-added wallet's filter matches a block already in flight, its id is merged into the existing entry. If the block has already been processed, it is re-queued so `BlocksManager` reloads it from local storage and processes it for the late wallet only. `process_block_for_wallets` refreshes the cached balance even on rescan paths below the wallet's current `last_processed_height`, because UTXOs may change.
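A rough sketch of that per-wallet surface, using placeholder types in place of the crate's real `WalletId`, `Address`, `Block`, and `BlockProcessingResult`; the exact signatures are assumptions, only the method names come from the list above.

```rust
use std::collections::{BTreeSet, HashSet};

// Placeholder types standing in for the crate's real ones.
type WalletId = [u8; 32];
type Address = String;
struct Block;

#[derive(Default)]
struct BlockProcessingResult {
    /// Gap-limit discoveries, attributed to the wallet that produced them.
    new_addresses: Vec<(WalletId, Address)>,
}

trait WalletInterface {
    /// Process `block` at `height` only for the listed wallets
    /// (replaces the previous global `process_block`).
    fn process_block_for_wallets(
        &mut self,
        block: &Block,
        height: u32,
        wallets: &BTreeSet<WalletId>,
    ) -> BlockProcessingResult;

    /// Wallet ids whose filter coverage has not yet reached `height`.
    fn wallets_behind(&self, height: u32) -> BTreeSet<WalletId>;

    /// Per-wallet projections used by filter matching.
    fn monitored_addresses_for(&self, wallet_id: &WalletId) -> HashSet<Address>;
    fn wallet_synced_height(&self, wallet_id: &WalletId) -> u32;

    /// Monotonic per-wallet progress updates.
    fn update_wallet_synced_height(&mut self, wallet_id: &WalletId, height: u32);
    fn update_wallet_last_processed_height(&mut self, wallet_id: &WalletId, height: u32);
}
```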
When `wallet.synced_height()` drops below `FiltersManager`'s `progress.committed_height()`, a wallet was added or moved behind the current scan position and needs catch-up coverage. Add a check at the top of `FiltersManager::tick()` that detects this regression, clears in-flight pipeline state, lowers `committed_height` to the new aggregate min, and re-enters `start_download()`. The check runs in the `Syncing`, `Synced`, and `WaitForEvents` states so idle additions are caught on the next 100ms tick.

Add `test_wallet_added_at_runtime_catches_up` in `dash-spv/tests/dashd_sync/tests_basic.rs`. After initial sync with `W1`, mine a block funding `W2`'s address and add `W2` at runtime with `birth_height` before that block. Assert the rescan picks up `W2`'s funding transaction and that `W1`'s state is unchanged. Then add `W3` with `birth_height` beyond the tip and assert no spurious rescan or regression in either existing wallet.
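A minimal, self-contained sketch of that check, assuming illustrative names (`check_wallet_catch_up`, `clear_inflight`, `min_wallet_synced_height`) rather than the crate's real API; only the shape of the check comes from the description above.

```rust
/// Self-contained sketch; all names here are illustrative, not the crate's API.
struct Progress {
    committed_height: u32,
}

struct FiltersCatchUp {
    progress: Progress,
    /// Aggregate minimum `synced_height` across all wallets.
    min_wallet_synced_height: u32,
}

impl FiltersCatchUp {
    /// Runs at the top of the periodic (~100ms) tick, in the Syncing,
    /// Synced and WaitForEvents states.
    fn check_wallet_catch_up(&mut self) -> bool {
        // A wallet added or moved behind the committed scan position shows
        // up as the aggregate minimum dropping below `committed_height`.
        if self.min_wallet_synced_height < self.progress.committed_height {
            self.clear_inflight();
            self.progress.committed_height = self.min_wallet_synced_height;
            self.start_download();
            return true; // catch-up started, skip the normal tick body
        }
        false
    }

    fn clear_inflight(&mut self) {
        // Drop active batches and in-flight pipeline state.
    }

    fn start_download(&mut self) {
        // Re-enter filter download from the lowered committed height.
    }
}
```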
Manki — Review complete · Planner (28s) · Review — 24 findings · Judge — 24 kept · 0 dropped (41s)
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## refactor/heights-per-wallet #118 +/- ##
===============================================================
+ Coverage 70.52% 70.63% +0.11%
===============================================================
Files 319 319
Lines 66758 66995 +237
===============================================================
+ Hits 47080 47323 +243
+ Misses 19678 19672 -6
Per-wallet filter scan introduces real correctness risks around late-added wallets — empty-address wallets get their height advanced without scanning, and the global rescan on catch-up can re-process blocks for already-synced wallets. Performance also regresses to O(N_wallets × batch_size) due to per-wallet filter cloning.
📊 24 findings (9 warning, 10 suggestion, 5 nitpick) · 1128 lines · 958s
Review stats
{
"model": "claude-sonnet-4-6",
"reviewTimeMs": 957605,
"diffLines": 1128,
"diffAdditions": 858,
"diffDeletions": 270,
"filesReviewed": 20,
"agents": [
"Correctness & Logic",
"Architecture & Design",
"Testing & Coverage",
"Performance & Efficiency"
],
"findingsRaw": 24,
"findingsKept": 24,
"findingsDropped": 0,
"severity": {
"blocker": 0,
"warning": 9,
"suggestion": 10,
"nitpick": 5
},
"verdict": "REQUEST_CHANGES",
"prNumber": 118,
"commitSha": "5603c6a00ca5daa01d73a02cbfc4006a28c3fb16",
"agentMetrics": [
{
"name": "Correctness & Logic",
"findingsRaw": 6,
"findingsKept": 6,
"responseLength": 7706
},
{
"name": "Architecture & Design",
"findingsRaw": 6,
"findingsKept": 6,
"responseLength": 7376
},
{
"name": "Testing & Coverage",
"findingsRaw": 6,
"findingsKept": 6,
"responseLength": 5278
},
{
"name": "Performance & Efficiency",
"findingsRaw": 6,
"findingsKept": 6,
"responseLength": 5649
}
],
"judgeMetrics": {
"confidenceDistribution": {
"high": 14,
"medium": 10,
"low": 0
},
"severityChanges": 24,
"mergedDuplicates": 0,
"defensiveHardeningCount": 1,
"verdictReason": "novel_suggestion"
},
"fileMetrics": {
"fileTypes": {
".rs": 20
},
"findingsPerFile": {
"dash-spv/src/sync/filters/manager.rs": 18,
"dash-spv/src/sync/filters/sync_manager.rs": 1,
"dash-spv/src/sync/blocks/manager.rs": 2,
"dash-spv/src/sync/blocks/pipeline.rs": 2,
"dash-spv/src/sync/blocks/sync_manager.rs": 1
}
},
"reviewerModel": "claude-sonnet-4-6",
"judgeModel": "claude-opus-4-7"
}
| /// again for whichever wallets the caller passes via `BlocksNeeded`. | ||
| fn track_block_match(&mut self, key: &FilterMatchKey, batch_start: u32) -> bool { | ||
| if self.blocks_remaining.contains_key(key.hash()) { | ||
| return false; |
🟠 Re-queued blocks depend on unverifiable idempotency of `process_block_for_wallets`
The track_block_match helper re-queues a block that was already fully processed (present in filters_matched but absent from blocks_remaining) when a new wallet's scan matches it. BlocksManager then reloads the block from local storage and calls process_block_for_wallets for the already-processed wallet. A comment in scan_batch asserts this path is idempotent 'through the monotonic last_processed_height', but that contract cannot be verified from this diff. If process_block_for_wallets does not guard against re-processing a block whose height is ≤ the wallet's last_processed_height, the affected wallet will accumulate duplicate transaction records for every re-queued block, corrupting its balance and history.
Suggested fix
| return false; | |
| Add a height guard at the top of the `process_block_for_wallets` implementation (or inside the per-wallet branch) that skips wallets where `wallet.last_processed_height() >= block_height`. Also add a unit test that calls `process_block_for_wallets` twice for the same block and asserts the transaction list length is unchanged after the second call. |
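A minimal, self-contained sketch of such a guard; the types and the per-wallet loop are stand-ins, not the crate's actual implementation.

```rust
use std::collections::{BTreeMap, BTreeSet};

type WalletId = u32;
struct Block;

struct WalletState {
    last_processed_height: u32,
    tx_count: usize,
}

/// Skip wallets whose `last_processed_height` already covers the block, so a
/// re-queued block is a no-op for wallets that processed it earlier.
fn process_block_for_wallets(
    wallets: &mut BTreeMap<WalletId, WalletState>,
    targets: &BTreeSet<WalletId>,
    _block: &Block,
    block_height: u32,
) {
    for id in targets {
        let Some(w) = wallets.get_mut(id) else { continue };
        if w.last_processed_height >= block_height {
            continue; // already processed: nothing to record twice
        }
        w.tx_count += 1; // stand-in for recording the block's transactions
        w.last_processed_height = block_height;
    }
}
```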
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 142,
"severity": "warning",
"confidence": "medium",
"flaggedBy": [
"Correctness & Logic"
],
"title": "Re-queued blocks depend on unverifiable idempotency of `process_block_for_wallets`",
"fix": "Add a height guard at the top of the `process_block_for_wallets` implementation (or inside the per-wallet branch) that skips wallets where `wallet.last_processed_height() >= block_height`. Also add a ",
"reachability": "unknown"
}
| batch.set_scanned_wallets(behind); | ||
| } | ||
|
|
||
| if wallet_states.is_empty() { |
🟠 Empty-address wallets get `synced_height` advanced without being scanned
In scan_batch, wallets_behind(batch_end + 1) returns all wallets whose synced_height ≤ batch_end. Those without monitored addresses are excluded from wallet_states (no actual filter matching), but the full behind set—including empty-address wallets—is stored in scanned_wallets and used to advance synced_height to batch_end at commit time. If a wallet is initialised before its accounts/address-pool are fully derived (e.g., the account creation is async or deferred), its height will be advanced past blocks it should have scanned, causing it to permanently miss historical transactions for those addresses.
Suggested fix
| if wallet_states.is_empty() { | |
| Only include a wallet in `scanned_wallets` if it was either (a) actually scanned (i.e., present in `wallet_states`), or (b) explicitly has no addresses and its birth-height logic guarantees no relevant transactions exist below the batch end: | |
| ```rust | |
| let scanned_ids: BTreeSet<WalletId> = wallet_states.iter().map(|(id, _, _)| *id).collect(); | |
| if let Some(batch) = self.active_batches.get_mut(&batch_start) { | |
| batch.set_scanned_wallets(scanned_ids); | |
| } | |
| ``` | |
| Note this does mean empty-address wallets will not have their `synced_height` advanced here; a separate mechanism (e.g., at address-generation time) would need to trigger a rescan if addresses are added later. |
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 730,
"severity": "warning",
"confidence": "medium",
"flaggedBy": [
"Correctness & Logic"
],
"title": "Empty-address wallets get `synced_height` advanced without being scanned",
"fix": "Only include a wallet in `scanned_wallets` if it was either (a) actually scanned (i.e., present in `wallet_states`), or (b) explicitly has no addresses and its birth-height logic guarantees no relevan",
"reachability": "unknown"
}
| @@ -194,6 +193,27 @@ impl< | |||
| } | |||
🔴 Runtime wallet catch-up re-runs filter matching for already-synced wallets
When tick detects a wallet whose synced_height is below committed_height, it resets committed_height to the stale wallet's scan start (0 in the worst case) and wipes all active batches. The subsequent scan_batch call includes every wallet whose synced_height ≤ batch_end, so already-fully-synced wallets are pulled back into filter matching for heights they already processed. For a synced wallet at height 100 000 added to a system where a new wallet has birth height 0, every one of the 100 000 blocks is re-matched. Combined with finding #1 (re-queue on match), this multiplies the idempotency risk: each re-matched block is re-queued and process_block_for_wallets is re-invoked for the already-synced wallet.
Suggested fix
| } | |
| Rather than resetting `committed_height` for all wallets, track a per-wallet 'needs catch-up from height X' flag and initiate a targeted rescan only for the behind wallet, without disturbing other wallets' committed progress. If a global reset is required for architectural reasons, add an explicit idempotency guard in `process_block_for_wallets` (see finding #1) and document the expected O(wallets × history) cost so it is not accidentally triggered in a tight loop. |
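A self-contained sketch of the targeted alternative; `CatchUp`, `mark`, and `due` are illustrative names, not existing APIs in the crate.

```rust
use std::collections::BTreeMap;

type WalletId = u32;

/// Per-wallet catch-up bookkeeping instead of a global `committed_height` reset.
#[derive(Default)]
struct CatchUp {
    /// wallet id -> height the targeted rescan must start from
    pending: BTreeMap<WalletId, u32>,
}

impl CatchUp {
    /// Record that `wallet` needs coverage from `from_height`, keeping the
    /// lowest requested start height if it is flagged more than once.
    fn mark(&mut self, wallet: WalletId, from_height: u32) {
        self.pending
            .entry(wallet)
            .and_modify(|h| *h = (*h).min(from_height))
            .or_insert(from_height);
    }

    /// Wallets that still need a targeted rescan below the pipeline's
    /// committed height; other wallets' progress is left untouched.
    fn due(&self, committed_height: u32) -> Vec<(WalletId, u32)> {
        self.pending
            .iter()
            .filter(|(_, from)| **from < committed_height)
            .map(|(id, from)| (*id, *from))
            .collect()
    }
}
```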
AI context
{
"file": "dash-spv/src/sync/filters/sync_manager.rs",
"line": 193,
"severity": "warning",
"confidence": "high",
"flaggedBy": [
"Correctness & Logic"
],
"title": "Runtime wallet catch-up re-runs filter matching for already-synced wallets",
"fix": "Rather than resetting `committed_height` for all wallets, track a per-wallet 'needs catch-up from height X' flag and initiate a targeted rescan only for the behind wallet, without disturbing other wal",
"reachability": "reachable"
}| .filter(|(key, _)| key.height() > *wallet_synced) | ||
| .map(|(key, filter)| (key.clone(), filter.clone())) | ||
| .collect(); | ||
| if relevant.is_empty() { |
🔴 ✨ Suggestion: Inconsistent null-safety: silent skip on get_mut followed by unwrap on get
In scan_batch, the set_scanned_wallets call is guarded by if let Some(batch) = self.active_batches.get_mut(&batch_start) (silently a no-op if the batch is absent), but four lines later self.active_batches.get(&batch_start).unwrap().filters() panics on the same condition. These two paths are inconsistent: either the batch is guaranteed to exist (in which case both should panic/expect with a diagnostic) or it is not (in which case both should return early). A panic here would be especially confusing because the set_scanned_wallets no-op would have already run without error.
Suggested fix
| if relevant.is_empty() { | |
| Replace both accesses with a single guard: | |
| ```rust | |
| let Some(batch) = self.active_batches.get_mut(&batch_start) else { | |
| return Ok(events); | |
| }; | |
| batch.set_scanned_wallets(behind); | |
| // ... | |
| let batch_filters = self.active_batches | |
| .get(&batch_start) | |
| .expect("batch inserted above must still exist") | |
| .filters(); | |
| ``` |
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 745,
"severity": "suggestion",
"confidence": "high",
"flaggedBy": [
"Correctness & Logic"
],
"title": "Inconsistent null-safety: silent skip on `get_mut` followed by `unwrap` on `get`",
"fix": "Replace both accesses with a single guard:\n```rust\nlet Some(batch) = self.active_batches.get_mut(&batch_start) else {\n return Ok(events);\n};\nbatch.set_scanned_wallets(behind);\n// ...\nlet batch_filt",
"reachability": "reachable"
}
| } | ||
| self.blocks_remaining.insert(*key.hash(), (key.height(), batch_start)); | ||
| self.filters_matched.insert(*key.hash()); | ||
| true |
🟠 ✨ Suggestion: filters_matched purpose changed but name unchanged — misleads future guards
track_block_match no longer checks filters_matched before inserting; the old guard (if self.filters_matched.contains(key.hash()) { continue; }) was removed from both scan_batch and rescan_batch. The field is still written on every match but is never read as a deduplication gate. A future contributor who re-adds the filters_matched check to prevent re-processing will silently break the late-wallet catch-up path, because the set permanently retains hashes of already-processed blocks. The name implies active deduplication semantics but the actual role is now a history log used only for cleanup on rescan restart.
Suggested fix
| true | |
| // Rename field: | |
| blocks_ever_matched: HashSet<BlockHash>, | |
| /// Hashes of all blocks that have ever matched a filter in this session. | |
| /// NOT a re-processing gate — `blocks_remaining` governs in-flight deduplication. | |
| /// A block absent from `blocks_remaining` but present here is re-queued to | |
| /// serve late-added wallets; see `track_block_match`. |
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 146,
"severity": "suggestion",
"confidence": "medium",
"flaggedBy": [
"Architecture & Design"
],
"title": "filters_matched purpose changed but name unchanged — misleads future guards",
"fix": "// Rename field:\nblocks_ever_matched: HashSet<BlockHash>,\n\n/// Hashes of all blocks that have ever matched a filter in this session.\n/// NOT a re-processing gate — `blocks_remaining` governs in-flight",
"reachability": "reachable"
}
| assert_eq!(manager.wallet.read().await.wallet_synced_height(&MOCK_WALLET_ID), 4999); | ||
| } | ||
|
|
||
| #[tokio::test] |
🔴 ✨ Suggestion: test_batch_commit_advances_only_scanned_wallets tests single wallet; multi-wallet isolation untested
The test verifies that a wallet in scanned_wallets advances while an empty set does not, but it never has two wallets simultaneously where one is in scanned_wallets and the other is not. The current test would pass even if the implementation advanced all wallets rather than only scanned ones, as long as the default synced_height is 0 and matches expectations.
Suggested fix
| #[tokio::test] | |
| Add a second wallet ID to the manager, set it to `synced_height=200`, and include only `MOCK_WALLET_ID` in `scanned_wallets`. After commit, assert that the second wallet's synced_height remains 200 while `MOCK_WALLET_ID` advances to the batch end. |
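A self-contained sketch of the isolation assertion against a toy commit step; the real test would go through the manager and `MOCK_WALLET_ID` fixtures, so every name here is a stand-in.

```rust
use std::collections::{BTreeMap, BTreeSet};

type WalletId = u32;

/// Toy commit step: advance `synced_height` only for wallets that were
/// actually scanned in the batch.
fn commit_batch(
    heights: &mut BTreeMap<WalletId, u32>,
    scanned_wallets: &BTreeSet<WalletId>,
    batch_end: u32,
) {
    for (id, height) in heights.iter_mut() {
        if scanned_wallets.contains(id) {
            *height = (*height).max(batch_end);
        }
    }
}

#[test]
fn commit_advances_only_scanned_wallets() {
    let w1: WalletId = 1; // behind, scanned in this batch
    let w2: WalletId = 2; // already ahead, not scanned
    let mut heights = BTreeMap::from([(w1, 0), (w2, 200)]);
    let scanned = BTreeSet::from([w1]);

    commit_batch(&mut heights, &scanned, 99);

    assert_eq!(heights[&w1], 99); // scanned wallet advances to the batch end
    assert_eq!(heights[&w2], 200); // unscanned wallet's height is untouched
}
```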
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 1047,
"severity": "suggestion",
"confidence": "high",
"flaggedBy": [
"Testing & Coverage"
],
"title": "test_batch_commit_advances_only_scanned_wallets tests single wallet; multi-wallet isolation untested",
"fix": "Add a second wallet ID to the manager, set it to `synced_height=200`, and include only `MOCK_WALLET_ID` in `scanned_wallets`. After commit, assert that the second wallet's synced_height remains 200 wh",
"reachability": "reachable"
}
| batch.set_scanned_wallets(behind); | ||
| } | ||
|
|
||
| if wallet_states.is_empty() { |
🔴 BlockFilter cloned per wallet per scan batch — O(N_wallets × batch_size) allocations
Inside scan_batch, for every behind-wallet a new HashMap<FilterMatchKey, BlockFilter> is materialised by cloning. BlockFilter owns a Vec<u8> (typically hundreds of bytes of filter data), so for a 5 000-filter batch with 5 wallets this is 25 000 clone allocations per scan cycle, compared to 5 000 in the old unified scan. This is the hottest path in the entire filter sync pipeline and will degrade significantly as wallet count or batch size grows. Fix: change check_compact_filters_for_addresses to accept impl IntoIterator<Item = (&FilterMatchKey, &BlockFilter)> so a filtered borrow-iterator can be passed without copying, or wrap filter values in Arc<BlockFilter> to make cloning O(1).
Suggested fix
| if wallet_states.is_empty() { | |
| // Instead of cloning into a new HashMap: | |
| let matches = check_compact_filters_for_addresses( | |
| batch_filters.iter().filter(|(key, _)| key.height() > *wallet_synced), | |
| addresses.clone(), | |
| ); | |
| // Requires updating the signature of check_compact_filters_for_addresses | |
| // to accept IntoIterator<Item=(&FilterMatchKey, &BlockFilter)>. |
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 730,
"severity": "warning",
"confidence": "high",
"flaggedBy": [
"Performance & Efficiency"
],
"title": "BlockFilter cloned per wallet per scan batch — O(N_wallets × batch_size) allocations",
"fix": "// Instead of cloning into a new HashMap:\nlet matches = check_compact_filters_for_addresses(\n batch_filters.iter().filter(|(key, _)| key.height() > *wallet_synced),\n addresses.clone(),\n);\n// Req",
"reachability": "reachable"
}
| let batch_filters = self.active_batches.get(&batch_start).unwrap().filters(); | ||
| let mut block_to_wallets: BTreeMap<FilterMatchKey, BTreeSet<WalletId>> = BTreeMap::new(); | ||
| for (wallet_id, wallet_synced, addresses) in &wallet_states { | ||
| let relevant: HashMap<FilterMatchKey, BlockFilter> = batch_filters |
🔴 Per-wallet independent filter scans make scan cost O(N_wallets × batch_size)
The inner loop in scan_batch calls check_compact_filters_for_addresses once per behind-wallet, each time re-iterating its own relevant filter slice. For N wallets the total work is N × batch_size filter evaluations, whereas a union-then-attribute approach (scan once, then for each match check which wallet's address it belongs to) would be O(batch_size + N × addresses_per_wallet). In production with many wallets and large batches this is a significant regression. A hybrid: union all addresses into a single scan, then intersect the matched block set back against per-wallet address sets. The attribution step is much cheaper than re-scanning the same filter data N times.
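A self-contained sketch of that hybrid with toy types; `filter_matches` stands in for the real BIP158 probe and all names are illustrative.

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

type WalletId = u32;
type Address = String;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct FilterKey {
    height: u32,
}

/// Toy stand-in for a compact filter: a set of "scripts" it would match.
struct BlockFilter {
    scripts: HashSet<Address>,
}

fn filter_matches(filter: &BlockFilter, addresses: &HashSet<Address>) -> bool {
    !filter.scripts.is_disjoint(addresses)
}

/// Scan each filter once against the union of all behind wallets' addresses,
/// then attribute a hit to the wallets whose own address set also matches and
/// that have not yet covered the height. Cost: O(batch) + O(hits x wallets).
fn scan_union_then_attribute(
    batch: &HashMap<FilterKey, BlockFilter>,
    wallets: &HashMap<WalletId, (u32, HashSet<Address>)>, // (synced_height, addresses)
) -> BTreeMap<FilterKey, BTreeSet<WalletId>> {
    let union: HashSet<Address> = wallets
        .values()
        .flat_map(|(_, addrs)| addrs.iter().cloned())
        .collect();

    let mut matches = BTreeMap::new();
    for (key, filter) in batch {
        if !filter_matches(filter, &union) {
            continue; // single pass over the filter data for the common case
        }
        // Attribution re-probes only the matched filter, per wallet.
        let hit_wallets: BTreeSet<WalletId> = wallets
            .iter()
            .filter(|(_, (synced, addrs))| key.height > *synced && filter_matches(filter, addrs))
            .map(|(id, _)| *id)
            .collect();
        if !hit_wallets.is_empty() {
            matches.insert(*key, hit_wallets);
        }
    }
    matches
}
```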
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 740,
"severity": "warning",
"confidence": "high",
"flaggedBy": [
"Performance & Efficiency"
],
"title": "Per-wallet independent filter scans make scan cost O(N_wallets × batch_size)",
"reachability": "reachable"
}
| @@ -473,7 +492,9 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet | |||
| .collect(); | |||
🟠 ✨ Suggestion: addresses_by_wallet cloned for every later-batch rescan call
rescan_batch takes HashMap<WalletId, HashSet<Address>> by value, so the caller must .clone() it for the current batch and again for every later batch in the for later_start in later_batches loop. For wallets with large address pools this copies all address strings on every call. Change rescan_batch to accept &HashMap<WalletId, HashSet<Address>> and iterate by reference internally; the one place that needs into_iter() consuming behaviour can use .iter() + a reference walk instead. Note: this also needs check_compact_filters_for_addresses to accept &[Address] or similar — coordinate with the fix for finding 1.
Suggested fix
| .collect(); | |
| pub(super) async fn rescan_batch( | |
| &mut self, | |
| batch_start: u32, | |
| new_addresses: &HashMap<WalletId, HashSet<Address>>, | |
| ) -> SyncResult<Vec<SyncEvent>> { ... } |
AI context
{
"file": "dash-spv/src/sync/filters/manager.rs",
"line": 491,
"severity": "suggestion",
"confidence": "medium",
"flaggedBy": [
"Performance & Efficiency"
],
"title": "addresses_by_wallet cloned for every later-batch rescan call",
"fix": "pub(super) async fn rescan_batch(\n &mut self,\n batch_start: u32,\n new_addresses: &HashMap<WalletId, HashSet<Address>>,\n) -> SyncResult<Vec<SyncEvent>> { ... }",
"reachability": "reachable"
}
| self.pipeline.add_from_storage(hashed_block.block().clone(), key.height()); | ||
| self.pipeline.add_from_storage( | ||
| hashed_block.block().clone(), | ||
| key.height(), |
🔴 ✨ Suggestion: Unnecessary clone of wallets BTreeSet in block-loading loop
In the for (key, wallets) in blocks loop, wallets is cloned for both the storage path (add_from_storage) and the download path (to_download.push). Because each branch is mutually exclusive (the storage path continues and the download path is the last statement), ownership of wallets can be moved in both cases, eliminating both clones. This is in the block-loading hot path called for every BlocksNeeded event.
Suggested fix
| key.height(), | |
| // Storage path — move wallets: | |
| self.pipeline.add_from_storage(hashed_block.block().clone(), key.height(), wallets); | |
| self.progress.add_from_storage(1); | |
| continue; | |
| // Download path — move key and wallets: | |
| to_download.push((key, wallets)); |
AI context
{
"file": "dash-spv/src/sync/blocks/sync_manager.rs",
"line": 141,
"severity": "suggestion",
"confidence": "high",
"flaggedBy": [
"Performance & Efficiency"
],
"title": "Unnecessary clone of wallets BTreeSet in block-loading loop",
"fix": "// Storage path — move wallets:\nself.pipeline.add_from_storage(hashed_block.block().clone(), key.height(), wallets);\nself.progress.add_from_storage(1);\ncontinue;\n\n// Download path — move key and walle",
"reachability": "reachable"
}