Skip to content

Commit 8a03367

Browse files
randomloginclaude
andauthored
refactor(cbf): drop latest_tip mirror, source chain tip from kyoto directly (#12)
* Add test to capture wallet checkpoint push problems * fix(cbf): use CheckPoint::insert for reorg-aware wallet sync `push` only appends above the tip, so when `recent_history` contained blocks at or below the wallet's current checkpoint height after a reorg, the stale hashes on the wallet checkpoint were never replaced. Switch to `CheckPoint::insert`, which detects conflicting hashes and purges stale blocks, matching bdk-kyoto's `UpdateBuilder::apply_chain_event`. Also clear `latest_tip` on `BlockHeaderChanges::Reorganized` so cached tip state does not point at an abandoned chain. Update the `checkpoint_building_handles_reorg` unit test (added in c1844b3) to exercise the fixed behaviour: a reorg where the new tip is at the same height as the wallet's checkpoint must still result in the reorged hashes winning. Disclosure: drafted with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * remove latest_tip from the cbf chain source --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 9a3152e commit 8a03367

1 file changed

Lines changed: 35 additions & 45 deletions

File tree

src/chain/cbf.rs

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ pub(super) struct CbfChainSource {
8080
fee_source: FeeSource,
8181
/// Tracks whether the bip157 node is running and holds the command handle.
8282
cbf_runtime_status: Arc<Mutex<CbfRuntimeStatus>>,
83-
/// Latest chain tip hash, updated by the background event processing task.
84-
latest_tip: Arc<Mutex<Option<BlockHash>>>,
8583
/// Scripts to match against compact block filters during a scan.
8684
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
8785
/// Block (height, hash) pairs where filters matched watched scripts.
@@ -119,7 +117,6 @@ enum CbfRuntimeStatus {
119117

120118
/// Shared state passed to the background event processing task.
121119
struct CbfEventState {
122-
latest_tip: Arc<Mutex<Option<BlockHash>>>,
123120
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
124121
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
125122
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
@@ -148,7 +145,6 @@ impl CbfChainSource {
148145
};
149146

150147
let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped));
151-
let latest_tip = Arc::new(Mutex::new(None));
152148
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
153149
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
154150
let sync_completion_tx = Arc::new(Mutex::new(None));
@@ -163,7 +159,6 @@ impl CbfChainSource {
163159
sync_config,
164160
fee_source,
165161
cbf_runtime_status,
166-
latest_tip,
167162
watched_scripts,
168163
matched_block_hashes,
169164
sync_completion_tx,
@@ -288,7 +283,6 @@ impl CbfChainSource {
288283
// block is 'static (no borrows of `self`).
289284
let restart_status = Arc::clone(&self.cbf_runtime_status);
290285
let restart_logger = Arc::clone(&self.logger);
291-
let restart_latest_tip = Arc::clone(&self.latest_tip);
292286
let restart_watched_scripts = Arc::clone(&self.watched_scripts);
293287
let restart_matched_block_hashes = Arc::clone(&self.matched_block_hashes);
294288
let restart_sync_completion_tx = Arc::clone(&self.sync_completion_tx);
@@ -317,7 +311,6 @@ impl CbfChainSource {
317311
Arc::clone(&restart_logger),
318312
));
319313
let event_state = CbfEventState {
320-
latest_tip: Arc::clone(&restart_latest_tip),
321314
watched_scripts: Arc::clone(&restart_watched_scripts),
322315
matched_block_hashes: Arc::clone(&restart_matched_block_hashes),
323316
sync_completion_tx: Arc::clone(&restart_sync_completion_tx),
@@ -428,7 +421,6 @@ impl CbfChainSource {
428421
match event {
429422
Event::FiltersSynced(sync_update) => {
430423
let tip = sync_update.tip();
431-
*state.latest_tip.lock().unwrap() = Some(tip.hash);
432424
log_info!(
433425
logger,
434426
"CBF filters synced to tip: height={}, hash={}",
@@ -861,60 +853,69 @@ impl CbfChainSource {
861853

862854
/// Derive per-target fee rates from recent blocks' coinbase outputs.
863855
///
864-
/// Returns `Ok(None)` when no chain tip is available yet (first startup before sync).
856+
/// Returns `Ok(None)` when the chain is too short to sample `FEE_RATE_LOOKBACK_BLOCKS`
857+
/// blocks (e.g. kyoto has not yet synced past the genesis region).
865858
async fn fee_rate_cache_from_cbf(
866859
&self,
867860
) -> Result<Option<HashMap<crate::fee_estimator::ConfirmationTarget, FeeRate>>, Error> {
868861
let requester = self.requester()?;
869862

870-
let tip_hash = match *self.latest_tip.lock().unwrap() {
871-
Some(hash) => hash,
872-
None => {
873-
log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping.");
863+
let timeout = Duration::from_secs(
864+
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
865+
);
866+
let fetch_start = Instant::now();
867+
868+
// Ask kyoto for its current chain tip rather than maintaining a mirrored
869+
// cache: the returned hash is always fresh (post-reorg, post-restart),
870+
// so no defensive invalidation is needed below.
871+
let tip = match tokio::time::timeout(timeout, requester.chain_tip()).await {
872+
Ok(Ok(tip)) => tip,
873+
Ok(Err(e)) => {
874+
log_debug!(
875+
self.logger,
876+
"Failed to fetch CBF chain tip for fee estimation: {:?}",
877+
e,
878+
);
874879
return Ok(None);
875880
},
881+
Err(e) => {
882+
log_error!(self.logger, "Timed out fetching CBF chain tip: {}", e);
883+
return Err(Error::FeerateEstimationUpdateTimeout);
884+
},
876885
};
877886

887+
if (tip.height as usize) < FEE_RATE_LOOKBACK_BLOCKS {
888+
log_debug!(
889+
self.logger,
890+
"CBF chain tip at height {} is below the {}-block lookback window, \
891+
skipping fee estimation.",
892+
tip.height,
893+
FEE_RATE_LOOKBACK_BLOCKS,
894+
);
895+
return Ok(None);
896+
}
897+
878898
let now = Instant::now();
879899

880900
// Fetch fee rates from the last N blocks for per-target estimation.
881901
// We compute fee rates ourselves rather than using Requester::average_fee_rate,
882902
// so we can sample multiple blocks and select percentiles per confirmation target.
883903
let mut block_fee_rates: Vec<u64> = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS);
884-
let mut current_hash = tip_hash;
904+
let mut current_hash = tip.hash;
885905

886-
let timeout = Duration::from_secs(
887-
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
888-
);
889-
let fetch_start = Instant::now();
890-
891-
for idx in 0..FEE_RATE_LOOKBACK_BLOCKS {
906+
for _ in 0..FEE_RATE_LOOKBACK_BLOCKS {
892907
// Check if we've exceeded the overall timeout for fee estimation.
893908
let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed());
894909
if remaining_timeout.is_zero() {
895910
log_error!(self.logger, "Updating fee rate estimates timed out.");
896911
return Err(Error::FeerateEstimationUpdateTimeout);
897912
}
898913

899-
// Fetch the block via P2P. On the first iteration, a fetch failure
900-
// likely means the cached tip is stale (initial sync or reorg), so
901-
// we clear the tip and skip gracefully instead of returning an error.
902914
let indexed_block =
903915
match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash))
904916
.await
905917
{
906918
Ok(Ok(indexed_block)) => indexed_block,
907-
Ok(Err(e)) if idx == 0 => {
908-
log_debug!(
909-
self.logger,
910-
"Cached CBF tip {} was unavailable during fee estimation, \
911-
likely due to initial sync or a reorg: {:?}",
912-
current_hash,
913-
e
914-
);
915-
*self.latest_tip.lock().unwrap() = None;
916-
return Ok(None);
917-
},
918919
Ok(Err(e)) => {
919920
log_error!(
920921
self.logger,
@@ -923,17 +924,6 @@ impl CbfChainSource {
923924
);
924925
return Err(Error::FeerateEstimationUpdateFailed);
925926
},
926-
Err(e) if idx == 0 => {
927-
log_debug!(
928-
self.logger,
929-
"Timed out fetching cached CBF tip {} during fee estimation, \
930-
likely due to initial sync or a reorg: {}",
931-
current_hash,
932-
e
933-
);
934-
*self.latest_tip.lock().unwrap() = None;
935-
return Ok(None);
936-
},
937927
Err(e) => {
938928
log_error!(self.logger, "Updating fee rate estimates timed out: {}", e);
939929
return Err(Error::FeerateEstimationUpdateTimeout);

0 commit comments

Comments
 (0)