Skip to content

Commit 7e1cc26

Browse files
fix(bitcoind_rpc): FilterIter detects reorgs
Co-authored-by: valued mammal <valuedmammal@protonmail.com>
1 parent 10fa62a commit 7e1cc26

File tree

2 files changed

+113
-71
lines changed

2 files changed

+113
-71
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 113 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
77
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
88
9-
use bdk_core::collections::BTreeMap;
9+
use bdk_core::collections::{BTreeMap, BTreeSet};
1010
use core::fmt;
1111

1212
use bdk_core::bitcoin;
@@ -33,20 +33,29 @@ pub struct FilterIter<'c, C> {
3333
cp: Option<CheckPoint>,
3434
// blocks map
3535
blocks: BTreeMap<Height, BlockHash>,
36+
// set of heights with filters that matched any watched SPK
37+
matched: BTreeSet<Height>,
38+
// initial height
39+
start: Height,
3640
// best height counter
3741
height: Height,
3842
// stop height
3943
stop: Height,
4044
}
4145

4246
impl<'c, C: RpcApi> FilterIter<'c, C> {
47+
/// Hard cap on how far to walk back when a reorg is detected.
48+
const MAX_REORG_DEPTH: u32 = 100;
49+
4350
/// Construct [`FilterIter`] from a given `client` and start `height`.
4451
pub fn new_with_height(client: &'c C, height: u32) -> Self {
4552
Self {
4653
client,
4754
spks: vec![],
4855
cp: None,
4956
blocks: BTreeMap::new(),
57+
matched: BTreeSet::new(),
58+
start: height,
5059
height,
5160
stop: 0,
5261
}
@@ -69,57 +78,28 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
6978
self.spks.push(spk);
7079
}
7180

72-
/// Get the next filter and increment the current best height.
73-
///
74-
/// Returns `Ok(None)` when the stop height is exceeded.
75-
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
76-
if self.height > self.stop {
77-
return Ok(None);
78-
}
79-
let height = self.height;
80-
let hash = match self.blocks.get(&height) {
81-
Some(h) => *h,
82-
None => self.client.get_block_hash(height as u64)?,
83-
};
84-
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
85-
let filter = BlockFilter::new(&filter_bytes);
86-
self.height += 1;
87-
Ok(Some((BlockId { height, hash }, filter)))
81+
/// Get the block hash by `height` if it is found in the blocks map.
82+
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> {
83+
self.blocks.get(height).copied()
8884
}
8985

9086
/// Get the remote tip.
9187
///
92-
/// Returns `None` if the remote height is not strictly greater than the height of this
93-
/// [`FilterIter`].
88+
/// Returns `None` if the remote height is less than the height of this [`FilterIter`].
9489
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
9590
let tip_hash = self.client.get_best_block_hash()?;
96-
let mut header = self.client.get_block_header_info(&tip_hash)?;
91+
let header = self.client.get_block_header_info(&tip_hash)?;
9792
let tip_height = header.height as u32;
98-
if self.height >= tip_height {
93+
// Allow returning tip if we're exactly at it. Return `None`` if we've already scanned past.
94+
if self.height > tip_height {
9995
// nothing to do
10096
return Ok(None);
10197
}
102-
self.blocks.insert(tip_height, tip_hash);
10398

104-
// if we have a checkpoint we use a lookback of ten blocks
105-
// to ensure consistency of the local chain
99+
// start scanning from point of agreement + 1
106100
if let Some(cp) = self.cp.as_ref() {
107-
// adjust start height to point of agreement + 1
108101
let base = self.find_base_with(cp.clone())?;
109-
self.height = base.height + 1;
110-
111-
for _ in 0..9 {
112-
let hash = match header.previous_block_hash {
113-
Some(hash) => hash,
114-
None => break,
115-
};
116-
header = self.client.get_block_header_info(&hash)?;
117-
let height = header.height as u32;
118-
if height < self.height {
119-
break;
120-
}
121-
self.blocks.insert(height, hash);
122-
}
102+
self.height = base.height.saturating_add(1);
123103
}
124104

125105
self.stop = tip_height;
@@ -131,9 +111,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
131111
}
132112
}
133113

134-
/// Alias for a compact filter and associated block id.
135-
type NextFilter = (BlockId, BlockFilter);
136-
137114
/// Event inner type
138115
#[derive(Debug, Clone)]
139116
pub struct EventInner {
@@ -171,27 +148,80 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
171148
type Item = Result<Event, Error>;
172149

173150
fn next(&mut self) -> Option<Self::Item> {
174-
(|| -> Result<_, Error> {
175-
// if the next filter matches any of our watched spks, get the block
176-
// and return it, inserting relevant block ids along the way
177-
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
178-
let height = block.height;
179-
let hash = block.hash;
180-
181-
if self.spks.is_empty() {
182-
Err(Error::NoScripts)
183-
} else if filter
184-
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
185-
.map_err(Error::Bip158)?
186-
{
187-
let block = self.client.get_block(&hash)?;
188-
self.blocks.insert(height, hash);
189-
let inner = EventInner { height, block };
190-
Ok(Some(Event::Block(inner)))
191-
} else {
192-
Ok(Some(Event::NoMatch(height)))
151+
(|| -> Result<Option<_>, Error> {
152+
if self.height > self.stop {
153+
return Ok(None);
154+
}
155+
// Fetch next header.
156+
let mut height = self.height;
157+
let mut hash = self.client.get_block_hash(height as _)?;
158+
let mut header = self.client.get_block_header(&hash)?;
159+
160+
// Detect and resolve reorgs: either block at height changed, or its parent changed.
161+
let stored_hash = self.blocks.get(&height).copied();
162+
let prev_hash = height
163+
.checked_sub(1)
164+
.and_then(|height| self.blocks.get(&height).copied());
165+
166+
// If we've seen this height before but the hash has changed, or parent changed, trigger
167+
// reorg.
168+
let reorg_detected = if let Some(old_hash) = stored_hash {
169+
old_hash != hash
170+
} else if let Some(expected_prev) = prev_hash {
171+
header.prev_blockhash != expected_prev
172+
} else {
173+
false
174+
};
175+
176+
// Reorg detected, rewind to last known-good ancestor.
177+
if reorg_detected {
178+
let mut reorg_depth = 0;
179+
loop {
180+
if reorg_depth >= Self::MAX_REORG_DEPTH || height == 0 {
181+
return Err(Error::ReorgDepthExceeded);
182+
}
183+
184+
height = height.saturating_sub(1);
185+
hash = self.client.get_block_hash(height as _)?;
186+
header = self.client.get_block_header(&hash)?;
187+
188+
let prev_height = height.saturating_sub(1);
189+
if let Some(prev_hash) = self.blocks.get(&prev_height) {
190+
if header.prev_blockhash == *prev_hash {
191+
break;
192+
}
193+
}
194+
195+
reorg_depth += 1;
193196
}
194-
})
197+
198+
self.blocks.split_off(&height);
199+
self.matched.split_off(&height);
200+
}
201+
202+
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
203+
let filter = BlockFilter::new(&filter_bytes);
204+
205+
// record the scanned block
206+
self.blocks.insert(height, hash);
207+
// increment best height
208+
self.height = height.saturating_add(1);
209+
210+
// If the filter matches any of our watched SPKs, fetch the full
211+
// block, and record the matching block entry.
212+
if self.spks.is_empty() {
213+
Err(Error::NoScripts)
214+
} else if filter
215+
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes()))
216+
.map_err(Error::Bip158)?
217+
{
218+
let block = self.client.get_block(&hash)?;
219+
self.matched.insert(height);
220+
let inner = EventInner { height, block };
221+
Ok(Some(Event::Block(inner)))
222+
} else {
223+
Ok(Some(Event::NoMatch(height)))
224+
}
195225
})()
196226
.transpose()
197227
}
@@ -202,8 +232,8 @@ impl<C: RpcApi> FilterIter<'_, C> {
202232
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
203233
loop {
204234
let height = cp.height();
205-
let fetched_hash = match self.blocks.get(&height) {
206-
Some(hash) => *hash,
235+
let fetched_hash = match self.get_block_hash(&height) {
236+
Some(hash) => hash,
207237
None if height == 0 => cp.hash(),
208238
_ => self.client.get_block_hash(height as _)?,
209239
};
@@ -221,17 +251,27 @@ impl<C: RpcApi> FilterIter<'_, C> {
221251
/// Returns a chain update from the newly scanned blocks.
222252
///
223253
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
224-
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
254+
/// if not all events have been emitted (by calling `next`).
225255
pub fn chain_update(&mut self) -> Option<CheckPoint> {
226-
if self.cp.is_none() || self.blocks.is_empty() {
256+
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
227257
return None;
228258
}
229259

230-
// note: to connect with the local chain we must guarantee that `self.blocks.first()`
231-
// is also the point of agreement with `self.cp`.
260+
// We return blocks up to and including the initial height, all of the matching blocks,
261+
// and blocks in the terminal range.
262+
let tail_range = self.stop.saturating_sub(9)..=self.stop;
232263
Some(
233-
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
234-
.expect("blocks must be in order"),
264+
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| {
265+
if height <= self.start
266+
|| self.matched.contains(&height)
267+
|| tail_range.contains(&height)
268+
{
269+
Some(BlockId { height, hash })
270+
} else {
271+
None
272+
}
273+
}))
274+
.expect("blocks must be in order"),
235275
)
236276
}
237277
}
@@ -245,6 +285,8 @@ pub enum Error {
245285
NoScripts,
246286
/// `bitcoincore_rpc` error
247287
Rpc(bitcoincore_rpc::Error),
288+
/// `MAX_REORG_DEPTH` exceeded
289+
ReorgDepthExceeded,
248290
}
249291

250292
impl From<bitcoincore_rpc::Error> for Error {
@@ -259,6 +301,7 @@ impl fmt::Display for Error {
259301
Self::Bip158(e) => e.fmt(f),
260302
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
261303
Self::Rpc(e) => e.fmt(f),
304+
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"),
262305
}
263306
}
264307
}

crates/bitcoind_rpc/tests/test_filter_iter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,6 @@ fn filter_iter_handles_reorg() -> anyhow::Result<()> {
400400

401401
// Test that while a reorg is detected we delay incrementing the best height
402402
#[test]
403-
#[ignore]
404403
fn repeat_reorgs() -> anyhow::Result<()> {
405404
const MINE_TO: u32 = 11;
406405

0 commit comments

Comments
 (0)