Skip to content

Commit 062e73c

Browse files
authored
feat(dash-spv): filter re-match across reorg and safe auto-rebroadcast (#170)
* feat(key-wallet-manager): add `WalletEvent::TxRepeatedlyOrphaned` variant New variant emitted by the SPV mempool manager when an outgoing transaction demoted by reorg has exhausted its rebroadcast retry budget. UI consumers surface this so the user can decide to abandon, re-sign, or wait. The `wallet_id` field is `Option<WalletId>` because the rebroadcast loop may emit this for a txid that is no longer attributable to any managed wallet. `WalletEvent::wallet_id()` is widened to `Option<WalletId>` to accommodate. * feat(dash-spv): drive wallet rewind and per-wallet effective floor on `ChainReorg` `FiltersManager` now calls `WalletInterface::rewind_to_height` when it observes a `ChainReorg` event, then resets its filter scan cursor to the effective per-wallet floor (`min(fork_height, wallet.synced_height)`) instead of always to `fork_height`. A wallet whose ingest tip is behind the fork never saw the orphaned chain, so the filter pipeline does not need to re-match the segment between its tip and the fork. If the wallet refuses the rewind (e.g. chainlock floor, defense in depth since the SPV cascade already enforces this upstream), log and fall back to `fork_height` for the floor without touching wallet state. Renames `reset_for_reorg`'s parameter to `effective_floor` to make the contract explicit. * feat(dash-spv): auto-rebroadcast reorg-demoted transactions with safety checks `MempoolManager` now subscribes to `WalletEvent` broadcasts and drives an auto-rebroadcast pipeline for transactions demoted by a chain reorg: - `WalletEvent::Reorg` enqueues each `demoted_txid` after a chainlock-floor defense check (the wallet already refuses to rewind below its chainlock floor upstream, this branch is belt-and-braces). - Block-height `nLockTime` is compared against the current chain tip; unsatisfied locktimes defer the broadcast. Timestamp locktimes are compared best-effort against wall clock, never against an MTP cache the SPV side does not maintain. - An input-conflict check evicts pending entries whose inputs collide with any transaction confirming on the new chain (delivered via `WalletEvent::BlockProcessed`). - Each attempt advances an exponential backoff capped at one hour. After `MAX_REORG_REBROADCAST_ATTEMPTS` (10) failures the entry is dropped and a `WalletEvent::TxRepeatedlyOrphaned` is emitted so the UI can surface it for user intervention. The manager owns a forwarder `broadcast::channel` so it can both subscribe to wallet events (replaceable via `set_wallet_event_subscription`) and emit `TxRepeatedlyOrphaned` without changing the trait surface. The constructor gains a `chainlock_height: Arc<AtomicU32>` parameter threaded through from `lifecycle.rs`. `MempoolManager` also picks up a `current_tip_height` field updated via `SyncManager::update_target_height`. Tests cover the happy path, input-conflict eviction, locktime defer followed by tip advance, and the retry-cap orphan event. * chore(dash-spv-ffi): mark new `TxRepeatedlyOrphaned` variant as not-yet-bridged Add the missing match arm in `FFIWalletEventCallbacks::dispatch`. The variant is left without an FFI surface for now (matching the existing `Reorg` arm) so the C ABI stays unchanged. Wiring a dedicated callback is deferred to a follow-up that surfaces orphaned-tx state to the UI. * chore: pr cleanup: fix FQ paths and remove what-comments * test(dash-spv): cover rebroadcast backoff, chainlock floor, and channel-driven paths Addresses four `manki-review` testing-coverage findings on PR #170: - Extend `test_reorg_demoted_tx_rebroadcast_happy_path` with a second immediate `drive_reorg_rebroadcast` call to assert the exponential backoff suppresses a duplicate broadcast and does not bump `attempts`. - Add `test_chainlock_floor_prevents_enqueue` exercising the chainlock-floor defense in `enqueue_demoted_for_rebroadcast` when `chainlock_height > current_tip_height`. - Add `test_input_conflict_via_channel_evicts_rebroadcast` so the `WalletEvent::BlockProcessed` arm of `drain_wallet_events` is exercised through the channel, not by calling `evict_conflicting_rebroadcasts` directly. - Extend `test_block_processed_removes_confirmed_txids` to seed `pending_rebroadcast` via `enqueue_demoted_for_rebroadcast` and assert the confirmed txid is removed from it, locking in the `BlockProcessed` handler's `pending_rebroadcast.remove(txid)` line. Promotes `enqueue_demoted_for_rebroadcast` to `pub(super)` so sibling-module tests can drive the rebroadcast queue through the public API. * ci: trigger workflows on feat/filter-rematch-and-rebroadcast * test(dash-spv): narrow `enqueue_demoted_for_rebroadcast` visibility and clarify chainlock-floor test Keep `enqueue_demoted_for_rebroadcast` private and expose a `#[cfg(test)] pub(super)` wrapper so sibling-module tests in `sync_manager.rs` can seed `pending_rebroadcast` without widening production visibility. Drop the unused `wallet_event_rx` / `set_wallet_event_subscription` setup from `test_chainlock_floor_prevents_enqueue` (the test exercises the direct call, not the channel path) and document the ordering invariant the `transactions` postcondition relies on (the guard must short-circuit before the wallet fetch/insert path).
1 parent ac05e6f commit 062e73c

8 files changed

Lines changed: 904 additions & 36 deletions

File tree

dash-spv-ffi/src/callbacks.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,14 @@ impl FFIWalletEventCallbacks {
12151215
// conflicted txid lists and the post-rewind balance.
12161216
// Until then this variant has no surface on the C ABI.
12171217
}
1218+
WalletEvent::TxRepeatedlyOrphaned {
1219+
..
1220+
} => {
1221+
// TODO(issue #146): wire a dedicated FFI callback so
1222+
// durable consumers can surface "this transaction has
1223+
// been orphaned too many times" to the UI. Until then
1224+
// this variant has no surface on the C ABI.
1225+
}
12181226
WalletEvent::ChainLockProcessed {
12191227
wallet_id,
12201228
chain_lock,

dash-spv/src/client/lifecycle.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,16 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
163163
// Build mempool manager if tracking is enabled
164164
if config.enable_mempool_tracking {
165165
let initial_revision = wallet.read().await.monitor_revision();
166-
managers.mempool = Some(MempoolManager::new(
166+
let wallet_event_rx = wallet.read().await.subscribe_events();
167+
let mut mempool = MempoolManager::new(
167168
wallet.clone(),
168169
config.mempool_strategy,
169170
config.max_mempool_transactions,
170171
initial_revision,
171-
));
172+
chainlock_height.clone(),
173+
);
174+
mempool.set_wallet_event_subscription(wallet_event_rx);
175+
managers.mempool = Some(mempool);
172176
}
173177

174178
// `reorg_generation` is held by each manager (via clones threaded in

dash-spv/src/sync/filters/manager.rs

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,46 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
149149
self.filter_pipeline = FiltersPipeline::new();
150150
}
151151

152-
/// Reset all sync state to begin again at `fork_height` after a reorg
153-
/// cascade. Clears in-flight batches and rewinds the per-batch
152+
/// Drive the per-wallet rewind and compute the effective floor for
153+
/// the filter re-match. Returns the lowest per-wallet `synced_height`
154+
/// after the rewind clamps each wallet to `min(fork_height, current)`.
155+
///
156+
/// The chainlock-floor invariant is enforced upstream by the reorg
157+
/// cascade, so a `BelowChainLockFloor` error here is defense in
158+
/// depth: log and fall back to `fork_height` for the floor without
159+
/// touching wallet state.
160+
pub(super) async fn rewind_wallet_for_reorg(&self, fork_height: u32) -> u32 {
161+
let mut wallet = self.wallet.write().await;
162+
match wallet.rewind_to_height(fork_height).await {
163+
Ok(_) => wallet.synced_height().min(fork_height),
164+
Err(e) => {
165+
tracing::warn!(
166+
"FiltersManager: wallet rewind to {} rejected: {}; using fork_height as floor",
167+
fork_height,
168+
e
169+
);
170+
fork_height
171+
}
172+
}
173+
}
174+
175+
/// Reset all sync state to begin again at `effective_floor` after a
176+
/// reorg cascade. Clears in-flight batches and rewinds the per-batch
154177
/// processing cursors so the next `FilterHeadersStored` event drives a
155-
/// fresh download starting at the truncated ancestor.
156-
pub(super) fn reset_for_reorg(&mut self, fork_height: u32) {
178+
/// fresh download starting at the effective floor.
179+
///
180+
/// `effective_floor` is the minimum of `fork_height` and the lowest
181+
/// per-wallet `synced_height` across the wallet set, computed after
182+
/// the wallet rewind. A wallet whose ingest tip is behind the fork
183+
/// point never saw the orphaned chain, so the filter pipeline does
184+
/// not need to re-match the segment between that wallet's tip and
185+
/// the fork.
186+
pub(super) fn reset_for_reorg(&mut self, effective_floor: u32) {
157187
self.reset_for_rescan();
158-
self.next_batch_to_store = fork_height;
159-
self.processing_height = fork_height;
160-
self.progress.update_committed_height(fork_height);
161-
self.progress.update_stored_height(fork_height);
188+
self.next_batch_to_store = effective_floor;
189+
self.processing_height = effective_floor;
190+
self.progress.update_committed_height(effective_floor);
191+
self.progress.update_stored_height(effective_floor);
162192
}
163193

164194
async fn load_filters(
@@ -2405,4 +2435,35 @@ mod tests {
24052435
events
24062436
);
24072437
}
2438+
2439+
#[tokio::test]
2440+
async fn test_rewind_wallet_for_reorg_uses_min_of_wallet_and_fork() {
2441+
let manager = create_test_manager().await;
2442+
manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 40);
2443+
2444+
let effective_floor = manager.rewind_wallet_for_reorg(100).await;
2445+
2446+
assert_eq!(
2447+
effective_floor, 40,
2448+
"wallet behind fork_height should drag floor down to its synced_height"
2449+
);
2450+
}
2451+
2452+
#[tokio::test]
2453+
async fn test_rewind_wallet_for_reorg_clamps_to_fork_height() {
2454+
let manager = create_test_manager().await;
2455+
manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 200);
2456+
2457+
let effective_floor = manager.rewind_wallet_for_reorg(100).await;
2458+
2459+
assert_eq!(
2460+
effective_floor, 100,
2461+
"wallet ahead of fork_height should be clamped and floor equals fork_height"
2462+
);
2463+
assert_eq!(
2464+
manager.wallet.read().await.wallet_synced_height(&MOCK_WALLET_ID),
2465+
100,
2466+
"rewind must clamp wallet's synced_height to fork_height"
2467+
);
2468+
}
24082469
}

dash-spv/src/sync/filters/sync_manager.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,15 @@ impl<
190190
..
191191
} => {
192192
tracing::info!(
193-
"FiltersManager: cascading ChainReorg, resetting state at {}",
193+
"FiltersManager: cascading ChainReorg, rewinding wallet at {}",
194194
fork_height
195195
);
196-
self.reset_for_reorg(*fork_height);
196+
let effective_floor = self.rewind_wallet_for_reorg(*fork_height).await;
197+
tracing::info!(
198+
"FiltersManager: re-matching filters from effective floor {}",
199+
effective_floor
200+
);
201+
self.reset_for_reorg(effective_floor);
197202
self.set_state(SyncState::WaitForEvents);
198203
return Ok(vec![]);
199204
}

0 commit comments

Comments
 (0)