Skip to content

Commit b879fe1

Browse files
authored
feat(dash-spv): masternode list and quorum rewind across reorg (#165)
* feat(dash): add `MasternodeListEngine::truncate_above` for reorg rewind Adds a primitive that drops every piece of cached state whose anchor sits above a target height: `masternode_lists`, hash-keyed `known_snapshots` and `rotated_quorums_per_cycle` (via `block_container` lookup), and `quorum_statuses` height sets. Orphaned hashes are dropped conservatively. The container is trimmed last so hash lookups remain valid during cleanup. Idempotent under repeated invocation. * feat(dash-spv): add `MasternodesManager::rewind_to_height` Truncates the shared masternode engine, prunes the manager's height-tracked sync state, refreshes `last_synced_block_hash` from the surviving engine tip, clears straggler/dedup state, transitions to `Syncing`, and dispatches a fresh QRInfo for the new tip. * feat(dash-spv): trigger masternode rewind on `ChainReorg` `MasternodesManager::handle_sync_event` now intercepts `SyncEvent::ChainReorg` before any other arm and delegates to `rewind_to_height`, mirroring the cascade pattern already in `FilterHeadersManager` and `BlocksManager`. * feat(dash-spv): hard-block chainlock validation across reorg `ChainLockManager` listens for `SyncEvent::ChainReorg` and flips `masternode_ready` back to `false` while dropping any cached `pending_validation`. Subsequent chainlocks are cached until the next `MasternodeStateUpdated` flips readiness back on, which mirrors the pre-startup path. * feat(dash-spv): drop pending instantlocks across reorg `InstantSendManager` listens for `SyncEvent::ChainReorg` and clears `pending_instantlocks`. Pre-reorg IS locks were validated against a quorum view that no longer matches the new chain, so retrying them would either succeed under the wrong cycle or fail spuriously. Future IS locks re-received from the network are picked up via the existing `MasternodeStateUpdated` retry path. Also drops the dead `MasternodesManager::is_synced_for_height` query and its test. * test(dash-spv): cover masternode-list rewind across DIP-3 reorg Adds a regtest integration test that orchestrates a reorg via the existing controller node (`invalidateblock` + replacement chain), observes the SPV's `ChainReorg` event, verifies the engine truncates above the fork before refilling, and confirms a post-rewind `MasternodeStateUpdated` lands above the fork. Also adds `MasternodeTestContext::mine_reorg` and a `wait_for_chain_reorg_event` helper. * test(dash-spv): cover qrinfo refresh across DIP-24 cycle reorg Adds an integration test for the full DIP-24 rotation-cycle reorg path: mine a DKG cycle, capture its commitment hash, invalidate enough to roll back the commitment block, mine a replacement DKG cycle, and verify the engine drops the orphaned cycle from `rotated_quorums_per_cycle` and refills it under the replacement key. The replacement DKG is flaky on the existing regtest masternode harness so the test is gated behind `#[ignore]` with a tracking note tied to #142. The body still compiles to catch refactors that break the helper surface. * chore(dash-spv): apply rustfmt to reorg integration test scaffolding * chore: pr cleanup — move inline imports to `mod tests` top, remove what-comments, import `Range` * test(dash-spv): wait for full filter sync before triggering reorg in `test_masternode_list_rewind_across_dip3_reorg` On macOS ARM, the filter pipeline had not yet downloaded filters for the 3 most recent blocks when `mine_reorg` was called. After the reorg those block hashes were orphaned, so dashd disconnected every reconnect attempt that included a `GetCFilters` for an orphaned stop hash, preventing the `GetHeaders2` fork-detection request from ever getting a response and causing the 60 s `wait_for_chain_reorg_event` timeout. Ubuntu/Windows passed because the slower VMs gave the filter pipeline enough time to finish before the reorg was triggered. Fix: add `wait_for_full_sync` (waits for `SyncProgress::is_synced()`) and call it after `wait_for_mn_state_event_above` and before `mine_reorg`, ensuring no orphan-hash filter requests remain in flight. * fix: merge engine lock acquisitions and add error recovery in `rewind_to_height` Addresses manki-review comments on PR #165 #165 (comment) #165 (comment) #165 (comment) * refactor: encapsulate `ChainLockManager` reorg state via `reset_for_reorg()` Addresses manki-review comment on PR #165 #165 (comment) * test: add optional fork-height filter to `wait_for_chain_reorg_event` Addresses manki-review comment on PR #165 #165 (comment) * fix(`ChainLockManager`): preserve `pending_validation` across peer disconnect Adds `reset_for_disconnect()` that clears only `masternode_ready`, and switches `on_disconnect` to use it instead of `reset_for_reorg()`. A chainlock cached before masternode sync completes remains valid on the same chain and must survive disconnect so `on_masternode_ready` can re-validate it on the next reconnect. Addresses manki-review comment on PR #165 #165 (comment) * test(`MasternodesManager`): cover `rewind_to_height` send failure path Adds `test_rewind_to_height_sets_wait_for_events_on_send_failure` to assert that when `send_qrinfo_for_tip` errors (dropped channel), the manager transitions to `WaitForEvents` and clears `qrinfo_in_flight`, leaving it recoverable by the next `BlockHeaderSyncComplete` event. Addresses manki-review comment on PR #165 #165 (comment) * feat(`dash-spv`): extend buffered fork branches across multi-message reorg announcements dashd announces reorgs as N separate `headers2` messages (one header each, because each `generatetoaddress` block produces a `headers` push). The first batch enters the fork buffer via `ingest_fork`. Subsequent batches whose `prev_blockhash` is the previous fork tip re-entered `ingest_fork`, which then failed chain-continuity against the *active-chain* ancestor and was swallowed with a "deferred to Phase 3" debug log. The branch was stuck at one header, never outweighed the active extension, and `ChainReorg` never fired. Add `ForkBuffer::extend_branch` that looks up the existing branch by `(peer, prev_tip_hash)`, validates each continuation header (continuity off the buffered branch tip, PoW, MTP, DGW v3 against a rolling window of `history + buffered + new`), appends them, and re-keys the branch under the new tip. The caller receives a `BranchUpdate` with the new tip/height/work so it can refresh `fork_tip_index` and re-evaluate promotion. The per-header validation loop is factored out of `ingest` into `validate_chain` so both paths share identical rules. `BlockHeadersManager` routes `fork_tip_index` hits through a new `extend_fork` helper that loads the same active-chain history as `ingest_fork`, calls `extend_branch`, refreshes the index, and runs `take_winning_candidate` so a branch that just outweighs the active chain promotes immediately. Verified locally with `test_masternode_list_rewind_across_dip3_reorg` (previously timed out, now completes in ~2s). Refs [#142](#142), [#165](#165). * fix(`ChainLockManager`): document phase-ordering invariant in `reset_for_disconnect`, add behavioral tests Addresses manki-review comments on PR #165: - #165 (comment) - #165 (comment) - #165 (comment) * test(`ChainLockManager`,`MasternodesManager`): strengthen round-4 manki assertions - Assert return value and `progress.invalid()` in `test_pending_validation_survives_disconnect_and_consumed_on_reconnect` to cover the re-broadcast contract of `on_masternode_ready` - Add `engine.masternode_lists.is_empty()` boundary assertion in `test_rewind_to_height_sets_wait_for_events_on_send_failure` to verify surviving entries, not just absent ones - Add `test_reorg_prevents_stale_pending_validation_from_being_revalidated` as a companion test for the phase-ordering invariant documented on `reset_for_disconnect` * test(`ChainLockManager`,`MasternodesManager`): address round-5 manki assertions Assert `masternode_ready == true` after `on_masternode_ready()` in the reorg test (the flag flips unconditionally, even when `pending_validation` is None). Seed a surviving entry at height 30 in `test_rewind_to_height_sets_wait_for_events_on_send_failure` and assert `contains_key(&30)` after truncation at fork_height=50, closing the absence-only gap manki flagged. The disconnect-test `progress.invalid()` assertion (thread 3) was already applied in round 4. Addresses manki-review comments on PR #165 #165 (comment) #165 (comment) #165 (comment)
1 parent 374d7e9 commit b879fe1

12 files changed

Lines changed: 1287 additions & 68 deletions

File tree

dash-spv/src/sync/block_headers/fork_buffer.rs

Lines changed: 254 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ struct BufferedBranch {
4444
arrived_at: Instant,
4545
}
4646

47+
/// Result of extending a buffered branch with one or more continuation headers.
48+
#[derive(Debug, Clone, Copy)]
49+
pub(super) struct BranchUpdate {
50+
pub(super) new_tip: BlockHash,
51+
pub(super) new_height: u32,
52+
pub(super) new_work: ChainWork,
53+
}
54+
4755
impl ForkBuffer {
4856
pub(super) fn new(params: Params) -> Self {
4957
Self {
@@ -89,12 +97,46 @@ impl ForkBuffer {
8997
"history.last() must be the ancestor header"
9098
);
9199

92-
// Chain continuity: each header must extend the previous.
93-
let mut prev = ancestor_header;
94-
let mut hashed: Vec<HashedBlockHeader> = Vec::with_capacity(headers.len());
95100
let mut rolling_history: Vec<Header> = history.to_vec();
101+
let hashed =
102+
self.validate_chain(headers, ancestor_header, ancestor_height, &mut rolling_history)?;
103+
104+
let branch_work = ChainWork::accumulate(ChainWork::zero(), headers);
105+
106+
let key = (peer, hashed.last().expect("non-empty fork branch").hash().to_owned());
107+
self.branches.insert(
108+
key,
109+
BufferedBranch {
110+
headers: hashed,
111+
ancestor_height,
112+
total_work: branch_work,
113+
arrived_at: Instant::now(),
114+
},
115+
);
116+
117+
Ok(())
118+
}
119+
120+
/// Validate `headers` as a continuous extension after `prev`.
121+
///
122+
/// `rolling_history` must contain the active-chain headers preceding the
123+
/// branch ancestor (oldest first) plus any already-buffered branch
124+
/// headers. It is extended in place with each validated header so MTP
125+
/// and DGW v3 see the full window.
126+
///
127+
/// `prev_height` is the height of `prev`; the first validated header
128+
/// lands at `prev_height + 1`. Pass `ancestor_height` for a fresh ingest
129+
/// and the branch's current tip height when extending an existing branch.
130+
fn validate_chain(
131+
&self,
132+
headers: &[Header],
133+
mut prev: Header,
134+
prev_height: u32,
135+
rolling_history: &mut Vec<Header>,
136+
) -> SyncResult<Vec<HashedBlockHeader>> {
137+
let mut hashed: Vec<HashedBlockHeader> = Vec::with_capacity(headers.len());
96138
for (offset, header) in headers.iter().enumerate() {
97-
let height = ancestor_height + offset as u32 + 1;
139+
let height = prev_height + offset as u32 + 1;
98140
if header.prev_blockhash != prev.block_hash() {
99141
return Err(SyncError::ForkChainBreak(format!(
100142
"expected prev {}, got {}",
@@ -103,7 +145,6 @@ impl ForkBuffer {
103145
)));
104146
}
105147

106-
// PoW target met.
107148
let hashed_header = HashedBlockHeader::from(*header);
108149
if !header.target().is_met_by(*hashed_header.hash()) {
109150
return Err(SyncError::Validation(format!(
@@ -112,19 +153,16 @@ impl ForkBuffer {
112153
)));
113154
}
114155

115-
// Median time past: candidate time must strictly exceed MTP of
116-
// last 11 ancestor headers.
117-
let mtp = median_time_past(&rolling_history);
156+
let mtp = median_time_past(rolling_history);
118157
if (header.time as u64) <= mtp {
119158
return Err(SyncError::Validation(format!(
120159
"Fork header at height {} fails MTP rule ({} <= {})",
121160
height, header.time, mtp
122161
)));
123162
}
124163

125-
// DGW v3 retarget anchored at the ancestor's window.
126164
let expected_bits =
127-
next_work_required_dgw_v3(&rolling_history, height - 1, &self.params);
165+
next_work_required_dgw_v3(rolling_history, height - 1, &self.params);
128166
let min_diff = min_difficulty_bits(&self.params, prev.time, prev.bits, header.time);
129167
let bits_ok = header.bits == expected_bits || min_diff == Some(header.bits);
130168
if !bits_ok {
@@ -138,21 +176,91 @@ impl ForkBuffer {
138176
rolling_history.push(*header);
139177
prev = *header;
140178
}
179+
Ok(hashed)
180+
}
141181

142-
let branch_work = ChainWork::accumulate(ChainWork::zero(), headers);
182+
/// Extend an existing buffered branch with continuation headers.
183+
///
184+
/// The branch is keyed by `(peer, prev_tip_hash)`. Each new header is
185+
/// validated (continuity off the branch's current tip, PoW, MTP, DGW v3)
186+
/// against a rolling window built from `history` plus the branch's
187+
/// already-buffered headers. On success the branch is re-keyed under the
188+
/// new tip hash and the caller receives a `BranchUpdate` to update its
189+
/// own tip-to-ancestor index.
190+
///
191+
/// `history` is the same shape as for `ingest`: active-chain headers
192+
/// preceding the branch ancestor, oldest first, with the ancestor itself
193+
/// at `history.last()`.
194+
pub(super) fn extend_branch(
195+
&mut self,
196+
peer: SocketAddr,
197+
prev_tip_hash: BlockHash,
198+
headers: &[Header],
199+
history: &[Header],
200+
) -> SyncResult<BranchUpdate> {
201+
if headers.is_empty() {
202+
return Err(SyncError::Validation(
203+
"extend_branch called with empty headers".to_string(),
204+
));
205+
}
206+
let key = (peer, prev_tip_hash);
207+
let branch = self.branches.remove(&key).ok_or_else(|| {
208+
SyncError::ForkChainBreak(format!(
209+
"no buffered branch for peer {} tip {}",
210+
peer, prev_tip_hash
211+
))
212+
})?;
213+
214+
if branch.headers.len() + headers.len() > MAX_FORK_HEADERS_PER_PEER {
215+
let total = branch.headers.len() + headers.len();
216+
self.branches.insert(key, branch);
217+
return Err(SyncError::Validation(format!(
218+
"Fork branch extension exceeds cap: {} headers (max {})",
219+
total, MAX_FORK_HEADERS_PER_PEER
220+
)));
221+
}
222+
223+
let branch_tip_height = branch.ancestor_height + branch.headers.len() as u32;
224+
let branch_tip_header = *branch.headers.last().expect("non-empty buffered branch").header();
225+
226+
let mut rolling_history: Vec<Header> = history.to_vec();
227+
rolling_history.extend(branch.headers.iter().map(|h| *h.header()));
228+
229+
let validated = match self.validate_chain(
230+
headers,
231+
branch_tip_header,
232+
branch_tip_height,
233+
&mut rolling_history,
234+
) {
235+
Ok(v) => v,
236+
Err(e) => {
237+
self.branches.insert(key, branch);
238+
return Err(e);
239+
}
240+
};
241+
242+
let mut combined_headers = branch.headers;
243+
combined_headers.extend(validated);
244+
let added_work = ChainWork::accumulate(ChainWork::zero(), headers);
245+
let new_work = branch.total_work + added_work;
246+
let new_tip = combined_headers.last().expect("non-empty branch").hash().to_owned();
247+
let new_height = branch.ancestor_height + combined_headers.len() as u32;
143248

144-
let key = (peer, hashed.last().expect("non-empty fork branch").hash().to_owned());
145249
self.branches.insert(
146-
key,
250+
(peer, new_tip),
147251
BufferedBranch {
148-
headers: hashed,
149-
ancestor_height,
150-
total_work: branch_work,
252+
headers: combined_headers,
253+
ancestor_height: branch.ancestor_height,
254+
total_work: new_work,
151255
arrived_at: Instant::now(),
152256
},
153257
);
154258

155-
Ok(())
259+
Ok(BranchUpdate {
260+
new_tip,
261+
new_height,
262+
new_work,
263+
})
156264
}
157265

158266
/// Drop branches older than `ttl`. Returns how many were evicted.
@@ -588,4 +696,133 @@ mod tests {
588696
.expect("min-difficulty fork header must be accepted");
589697
assert_eq!(buf.len(), 1);
590698
}
699+
700+
/// Ingest a 1-header branch, then `extend_branch` with a second 1-header
701+
/// continuation. The branch is re-keyed under the new tip with both
702+
/// headers buffered and the cumulative work doubled.
703+
#[test]
704+
fn extend_branch_appends_single_header_continuation() {
705+
let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap();
706+
let mut buf = ForkBuffer::new(regtest_params());
707+
708+
let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros());
709+
let ancestor_height = (active.len() as u32) - 1;
710+
let ancestor = *active.last().unwrap();
711+
712+
let fork = build_chain(1_700_000_000 + 12 * 600, 1, ancestor.block_hash());
713+
let first_tip = fork[0].block_hash();
714+
buf.ingest(peer, &fork, ancestor_height, ancestor, &active).expect("ingest");
715+
assert_eq!(buf.len(), 1);
716+
717+
let cont = build_chain(1_700_000_000 + 13 * 600, 1, first_tip);
718+
let update =
719+
buf.extend_branch(peer, first_tip, &cont, &active).expect("extend continuation");
720+
assert_eq!(update.new_tip, cont[0].block_hash());
721+
assert_eq!(update.new_height, ancestor_height + 2);
722+
assert_eq!(buf.len(), 1, "branch is re-keyed in place, count unchanged");
723+
assert!(!buf.branches.contains_key(&(peer, first_tip)), "old branch key must be removed");
724+
assert!(buf.branches.contains_key(&(peer, update.new_tip)), "new branch key inserted");
725+
726+
// Branch now holds both headers.
727+
let candidate = buf
728+
.take_winning_candidate(ChainWork::zero())
729+
.expect("extended branch must win against zero active work");
730+
assert_eq!(candidate.headers.len(), 2);
731+
assert_eq!(*candidate.headers[0].hash(), first_tip);
732+
assert_eq!(*candidate.headers[1].hash(), update.new_tip);
733+
}
734+
735+
/// Stream a 4-header reorg as four 1-header continuations, matching the
736+
/// dashd `generatetoaddress` pattern. Final branch must hold all 4 headers
737+
/// and beat the active chain on work.
738+
#[test]
739+
fn extend_branch_handles_multi_message_reorg_announcement() {
740+
let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap();
741+
let mut buf = ForkBuffer::new(regtest_params());
742+
743+
let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros());
744+
let ancestor_height = (active.len() as u32) - 1;
745+
let ancestor = *active.last().unwrap();
746+
747+
// Build a 4-header fork chain as a single sequence, then feed it
748+
// header-by-header through ingest + 3 extend_branch calls.
749+
let fork = build_chain(1_700_000_000 + 12 * 600, 4, ancestor.block_hash());
750+
buf.ingest(peer, &fork[..1], ancestor_height, ancestor, &active).expect("first batch");
751+
752+
let mut current_tip = fork[0].block_hash();
753+
for next in &fork[1..] {
754+
let update = buf
755+
.extend_branch(peer, current_tip, std::slice::from_ref(next), &active)
756+
.expect("continuation must validate");
757+
assert_eq!(update.new_tip, next.block_hash());
758+
current_tip = update.new_tip;
759+
}
760+
assert_eq!(buf.len(), 1);
761+
762+
let candidate = buf
763+
.take_winning_candidate(ChainWork::zero())
764+
.expect("4-header branch must win against zero work");
765+
assert_eq!(candidate.headers.len(), 4);
766+
assert_eq!(candidate.ancestor_height, ancestor_height);
767+
}
768+
769+
/// A continuation whose `prev_blockhash` does not chain off the buffered
770+
/// branch tip returns `ForkChainBreak` and leaves the branch untouched.
771+
#[test]
772+
fn extend_branch_rejects_continuity_break() {
773+
let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap();
774+
let mut buf = ForkBuffer::new(regtest_params());
775+
776+
let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros());
777+
let ancestor_height = (active.len() as u32) - 1;
778+
let ancestor = *active.last().unwrap();
779+
780+
let fork = build_chain(1_700_000_000 + 12 * 600, 1, ancestor.block_hash());
781+
let first_tip = fork[0].block_hash();
782+
buf.ingest(peer, &fork, ancestor_height, ancestor, &active).expect("ingest");
783+
784+
// Build a header whose prev_blockhash points at the ancestor (the
785+
// active-chain tip) instead of the branch tip — that is exactly the
786+
// shape of a dashd headers2 message arriving for the next reorg block
787+
// before the branch re-key happens. With `extend_branch`, this is a
788+
// ForkChainBreak because the buffered tip is `first_tip`, not ancestor.
789+
let bad = build_chain(1_700_000_000 + 13 * 600, 1, ancestor.block_hash());
790+
let err = buf
791+
.extend_branch(peer, first_tip, &bad, &active)
792+
.expect_err("continuation off the wrong predecessor must fail");
793+
assert!(
794+
matches!(&err, SyncError::ForkChainBreak(_)),
795+
"expected ForkChainBreak, got: {:?}",
796+
err
797+
);
798+
assert_eq!(buf.len(), 1, "branch must be restored on failure");
799+
assert!(buf.branches.contains_key(&(peer, first_tip)), "original key must be intact");
800+
}
801+
802+
/// Continuation from a peer that did not own the branch must be rejected.
803+
#[test]
804+
fn extend_branch_rejects_wrong_peer() {
805+
let peer_a: SocketAddr = "1.2.3.4:9999".parse().unwrap();
806+
let peer_b: SocketAddr = "5.6.7.8:9999".parse().unwrap();
807+
let mut buf = ForkBuffer::new(regtest_params());
808+
809+
let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros());
810+
let ancestor_height = (active.len() as u32) - 1;
811+
let ancestor = *active.last().unwrap();
812+
813+
let fork = build_chain(1_700_000_000 + 12 * 600, 1, ancestor.block_hash());
814+
let first_tip = fork[0].block_hash();
815+
buf.ingest(peer_a, &fork, ancestor_height, ancestor, &active).expect("ingest from a");
816+
817+
let cont = build_chain(1_700_000_000 + 13 * 600, 1, first_tip);
818+
let err = buf
819+
.extend_branch(peer_b, first_tip, &cont, &active)
820+
.expect_err("wrong-peer continuation must fail");
821+
assert!(
822+
matches!(&err, SyncError::ForkChainBreak(_)),
823+
"expected ForkChainBreak for missing branch, got: {:?}",
824+
err
825+
);
826+
assert!(buf.branches.contains_key(&(peer_a, first_tip)), "peer_a branch must be untouched");
827+
}
591828
}

0 commit comments

Comments
 (0)