Skip to content

Commit 707b14e

Browse files
committed
feat: rescan filters when a wallet falls behind committed height
When `wallet.synced_height()` drops below `FiltersManager`'s `progress.committed_height()`, a wallet was added or moved behind the current scan position and needs catch-up coverage. Add a check at the top of `FiltersManager::tick()` that detects this regression, clears in-flight pipeline state, lowers `committed_height` to the new aggregate min, and re-enters `start_download()`. The check runs in `Syncing`, `Synced`, and `WaitForEvents` states so idle additions are caught on the next 100ms tick. Add `test_wallet_added_at_runtime_catches_up` in `dash-spv/tests/dashd_sync/tests_basic.rs`. After initial sync with `W1`, mine a block funding `W2`'s address and add `W2` at runtime with `birth_height` before that block. Assert the rescan picks up `W2`'s funding transaction and `W1`'s state is unchanged. Then add `W3` with `birth_height` beyond the tip and assert no spurious rescan or regression in either existing wallet.
1 parent cf52324 commit 707b14e

2 files changed

Lines changed: 132 additions & 0 deletions

File tree

dash-spv/src/sync/filters/manager.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,4 +1312,115 @@ mod tests {
13121312
assert_eq!(manager.state(), SyncState::Synced);
13131313
assert!(events.is_empty());
13141314
}
1315+
1316+
/// A wallet whose `synced_height` sits below the manager's `committed_height`
1317+
/// must trigger a rescan from the wallet's height. This simulates a wallet
1318+
/// being added at runtime behind current scan progress.
1319+
#[tokio::test]
1320+
async fn test_tick_rescans_when_wallet_falls_behind_committed() {
1321+
let mut manager = create_test_manager().await;
1322+
1323+
// Block headers required by start_download to resolve heights.
1324+
let headers = dashcore::block::Header::dummy_batch(0..201);
1325+
manager.header_storage.write().await.store_headers(&headers).await.unwrap();
1326+
1327+
// Manager believes filters are committed up to 100, with filter headers at 200.
1328+
manager.set_state(SyncState::Synced);
1329+
manager.progress.update_committed_height(100);
1330+
manager.progress.update_stored_height(100);
1331+
manager.progress.update_filter_header_tip_height(200);
1332+
manager.progress.update_target_height(200);
1333+
1334+
// Pre-populate in-flight state so we can verify clear_in_flight_state runs.
1335+
manager.active_batches.insert(101, FiltersBatch::new(101, 200, HashMap::new()));
1336+
manager.filters_matched.insert(dashcore::block::Header::dummy(0).block_hash());
1337+
manager.filter_pipeline.init(101, 200);
1338+
1339+
// MockWallet defaults to synced_height=0, so wallets_behind(100) = {MOCK_WALLET_ID}.
1340+
assert_eq!(manager.wallet.read().await.synced_height(), 0);
1341+
1342+
let (tx, _rx) = unbounded_channel();
1343+
let requests = RequestSender::new(tx);
1344+
1345+
let _events = manager.tick(&requests).await.unwrap();
1346+
1347+
// Old in-flight state was cleared and a fresh batch was created at scan_start=0.
1348+
assert!(!manager.active_batches.contains_key(&101));
1349+
assert!(manager.active_batches.contains_key(&0));
1350+
assert!(manager.filters_matched.is_empty());
1351+
1352+
// committed_height was lowered and start_download set it to scan_start - 1 = 0.
1353+
assert_eq!(manager.progress.committed_height(), 0);
1354+
assert_eq!(manager.state(), SyncState::Syncing);
1355+
}
1356+
1357+
/// When every managed wallet is at or beyond `committed_height`, the rescan
1358+
/// trigger must not fire even though the aggregate `synced_height` could
1359+
/// otherwise look stale.
1360+
#[tokio::test]
1361+
async fn test_tick_does_not_rescan_when_no_wallets_behind() {
1362+
let mut manager = create_test_manager().await;
1363+
1364+
// Wallet at synced_height=200, manager committed at 100 → no wallets behind.
1365+
manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 200);
1366+
1367+
manager.set_state(SyncState::Synced);
1368+
manager.progress.update_committed_height(100);
1369+
manager.progress.update_stored_height(100);
1370+
manager.progress.update_filter_header_tip_height(200);
1371+
manager.progress.update_target_height(200);
1372+
1373+
let (tx, _rx) = unbounded_channel();
1374+
let requests = RequestSender::new(tx);
1375+
1376+
let events = manager.tick(&requests).await.unwrap();
1377+
1378+
assert!(events.is_empty());
1379+
assert_eq!(manager.progress.committed_height(), 100);
1380+
assert_eq!(manager.state(), SyncState::Synced);
1381+
assert!(manager.active_batches.is_empty());
1382+
}
1383+
1384+
/// `committed_height = 0` on a fresh manager must not falsely trip the
1385+
/// rescan trigger. `wallets_behind(0)` returns an empty set since heights
1386+
/// are unsigned, so no wallet can be strictly less than 0.
1387+
#[tokio::test]
1388+
async fn test_tick_does_not_rescan_at_genesis_committed() {
1389+
let mut manager = create_test_manager().await;
1390+
// Default state: committed_height=0, wallet synced_height=0, state=WaitForEvents.
1391+
assert_eq!(manager.progress.committed_height(), 0);
1392+
assert_eq!(manager.state(), SyncState::WaitForEvents);
1393+
1394+
let (tx, _rx) = unbounded_channel();
1395+
let requests = RequestSender::new(tx);
1396+
1397+
let events = manager.tick(&requests).await.unwrap();
1398+
1399+
assert!(events.is_empty());
1400+
assert!(manager.is_idle());
1401+
assert_eq!(manager.state(), SyncState::WaitForEvents);
1402+
}
1403+
1404+
/// The rescan trigger only fires in `Syncing | Synced | WaitForEvents`.
1405+
/// `WaitingForConnections` must be skipped since we're not actively syncing.
1406+
#[tokio::test]
1407+
async fn test_tick_does_not_rescan_in_waiting_for_connections() {
1408+
let mut manager = create_test_manager().await;
1409+
manager.set_state(SyncState::WaitingForConnections);
1410+
manager.progress.update_committed_height(100);
1411+
1412+
// Wallet behind committed — would normally trip the trigger.
1413+
assert!(!manager.wallet.read().await.wallets_behind(100).is_empty());
1414+
1415+
let (tx, _rx) = unbounded_channel();
1416+
let requests = RequestSender::new(tx);
1417+
1418+
let events = manager.tick(&requests).await.unwrap();
1419+
1420+
assert!(events.is_empty());
1421+
// committed_height not lowered, no batches created.
1422+
assert_eq!(manager.progress.committed_height(), 100);
1423+
assert_eq!(manager.state(), SyncState::WaitingForConnections);
1424+
assert!(manager.active_batches.is_empty());
1425+
}
13151426
}

dash-spv/src/sync/filters/sync_manager.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,27 @@ impl<
193193
}
194194

195195
async fn tick(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
196+
// Detect a wallet that was added behind our scan progress and rescan
197+
// from its `synced_height`.
198+
if matches!(self.state(), SyncState::Syncing | SyncState::Synced | SyncState::WaitForEvents)
199+
{
200+
let committed = self.progress.committed_height();
201+
let wallet_read = self.wallet.read().await;
202+
let behind = wallet_read.wallets_behind(committed);
203+
let wallet_min_synced = wallet_read.synced_height();
204+
drop(wallet_read);
205+
if !behind.is_empty() {
206+
tracing::info!(
207+
"Wallet synced_height {} fell below filter committed_height {}, restarting scan",
208+
wallet_min_synced,
209+
committed
210+
);
211+
self.clear_in_flight_state();
212+
self.progress.update_committed_height(wallet_min_synced);
213+
return self.start_download(requests).await;
214+
}
215+
}
216+
196217
// TODO: Get rid of the send pending in here? Or decouple it from the header storage?
197218
// Run tick when Syncing OR when Synced with pending work (new blocks arriving)
198219
let has_pending_work = !self.active_batches.is_empty();

0 commit comments

Comments
 (0)