Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions dash-spv-ffi/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl FFISyncEventCallbacks {
} => {
if let Some(cb) = self.on_blocks_needed {
let ffi_blocks: Vec<FFIBlockNeeded> = blocks
.iter()
.keys()
.map(|key| FFIBlockNeeded {
height: key.height(),
hash: *key.hash().as_byte_array(),
Expand All @@ -361,15 +361,17 @@ impl FFISyncEventCallbacks {
height,
new_addresses,
confirmed_txids,
..
} => {
if let Some(cb) = self.on_block_processed {
let hash_bytes = block_hash.as_byte_array();
let txid_bytes: Vec<[u8; 32]> =
confirmed_txids.iter().map(|txid| *txid.as_byte_array()).collect();
let total_new_addresses: usize = new_addresses.values().map(|v| v.len()).sum();
cb(
*height,
hash_bytes as *const [u8; 32],
new_addresses.len() as u32,
total_new_addresses as u32,
txid_bytes.as_ptr(),
txid_bytes.len() as u32,
self.user_data,
Expand Down Expand Up @@ -836,3 +838,85 @@ impl FFIWalletEventCallbacks {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use dashcore::hashes::Hash;
    use dashcore::{Address, BlockHash, Network, Txid};
    use key_wallet_manager::{FilterMatchKey, WalletId};
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::atomic::{AtomicU32, Ordering};

    /// `BlocksNeeded` dispatch must pass exactly one entry per
    /// `FilterMatchKey` to the FFI callback (i.e. iterate keys, not
    /// inflated by the per-block wallet attribution).
    #[test]
    fn test_blocks_needed_dispatch_passes_unique_keys_count() {
        // Sentinel u32::MAX distinguishes "callback never fired" from a real count.
        static OBSERVED: AtomicU32 = AtomicU32::new(u32::MAX);
        extern "C" fn record(_blocks: *const FFIBlockNeeded, count: u32, _user: *mut c_void) {
            OBSERVED.store(count, Ordering::SeqCst);
        }

        let callbacks = FFISyncEventCallbacks {
            on_blocks_needed: Some(record),
            ..FFISyncEventCallbacks::default()
        };

        // Two distinct blocks, each attributed to two wallets. The dispatch
        // must report 2 (unique keys), not 4.
        let attributed_wallets = || BTreeSet::from([[1u8; 32], [2u8; 32]]);
        let mut matched: BTreeMap<FilterMatchKey, BTreeSet<WalletId>> = BTreeMap::new();
        matched.insert(
            FilterMatchKey::new(10, BlockHash::from_byte_array([1u8; 32])),
            attributed_wallets(),
        );
        matched.insert(
            FilterMatchKey::new(20, BlockHash::from_byte_array([2u8; 32])),
            attributed_wallets(),
        );

        callbacks.dispatch(&SyncEvent::BlocksNeeded {
            blocks: matched,
        });
        assert_eq!(OBSERVED.load(Ordering::SeqCst), 2);
    }

    /// `BlockProcessed` dispatch must report the total address count
    /// summed across all per-wallet entries in the `new_addresses` map.
    #[test]
    fn test_block_processed_dispatch_sums_per_wallet_addresses() {
        // Sentinel u32::MAX distinguishes "callback never fired" from a real count.
        static OBSERVED_ADDRS: AtomicU32 = AtomicU32::new(u32::MAX);
        extern "C" fn record(
            _height: u32,
            _hash: *const [u8; 32],
            new_address_count: u32,
            _txids: *const [u8; 32],
            _txid_count: u32,
            _user: *mut c_void,
        ) {
            OBSERVED_ADDRS.store(new_address_count, Ordering::SeqCst);
        }

        let callbacks = FFISyncEventCallbacks {
            on_block_processed: Some(record),
            ..FFISyncEventCallbacks::default()
        };

        // Wallet 1 contributes 2 new addresses, wallet 2 contributes 1. Total = 3.
        let per_wallet: BTreeMap<WalletId, Vec<Address>> = BTreeMap::from([
            (
                [1u8; 32],
                vec![Address::dummy(Network::Regtest, 1), Address::dummy(Network::Regtest, 2)],
            ),
            ([2u8; 32], vec![Address::dummy(Network::Regtest, 3)]),
        ]);

        callbacks.dispatch(&SyncEvent::BlockProcessed {
            block_hash: BlockHash::from_byte_array([7u8; 32]),
            height: 100,
            wallets: BTreeSet::new(),
            new_addresses: per_wallet,
            confirmed_txids: vec![Txid::from_byte_array([9u8; 32])],
        });
        assert_eq!(OBSERVED_ADDRS.load(Ordering::SeqCst), 3);
    }
}
122 changes: 111 additions & 11 deletions dash-spv/src/sync/blocks/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
let mut events = Vec::new();

// Process blocks in height order using pipeline's ordering logic
while let Some((block, height)) = self.pipeline.take_next_ordered_block() {
while let Some((block, height, interested)) = self.pipeline.take_next_ordered_block() {
let hash = block.block_hash();

// Process block through wallet
// Process the block only for the wallets whose filter matched it.
// Already-synced wallets that did not match are not touched.
let mut wallet = self.wallet.write().await;
let result = wallet.process_block(&block, height).await;
let result = wallet.process_block_for_wallets(&block, height, &interested).await;
drop(wallet);

let total_relevant = result.relevant_tx_count();
let new_addresses_total: usize = result.new_addresses.values().map(|v| v.len()).sum();
if total_relevant > 0 {
tracing::info!(
"Found {} relevant transactions ({} new, {} existing) {} at height {}, new addresses: {}",
Expand All @@ -96,19 +98,20 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
result.existing_txids.len(),
hash,
height,
result.new_addresses.len()
new_addresses_total
);
}

// Collect confirmed txids before moving new_addresses out of result
let confirmed_txids: Vec<_> = result.relevant_txids().cloned().collect();

// Collect new addresses for gap limit rescanning
let new_addresses: Vec<_> = result.new_addresses.into_iter().collect();
if !new_addresses.is_empty() {
let new_addresses = result.new_addresses;
if new_addresses_total > 0 {
tracing::debug!(
"Block {} generated {} new addresses for gap limit maintenance",
"Block {} generated {} new addresses for gap limit maintenance across {} wallets",
height,
new_addresses_total,
new_addresses.len()
);
}
Expand All @@ -124,6 +127,7 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H
events.push(SyncEvent::BlockProcessed {
block_hash: hash,
height,
wallets: interested,
new_addresses,
confirmed_txids,
});
Expand Down Expand Up @@ -168,9 +172,9 @@ mod tests {
};
use crate::sync::{ManagerIdentifier, SyncEvent, SyncManagerProgress};
use crate::test_utils::MockNetworkManager;
use key_wallet_manager::test_utils::MockWallet;
use key_wallet_manager::test_utils::{MockWallet, MOCK_WALLET_ID};
use key_wallet_manager::FilterMatchKey;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};

type TestBlocksManager =
BlocksManager<PersistentBlockHeaderStorage, PersistentBlockStorage, MockWallet>;
Expand Down Expand Up @@ -215,8 +219,8 @@ mod tests {
let requests = network.request_sender();

let block_hash = dashcore::BlockHash::dummy(0);
let mut blocks = BTreeSet::new();
blocks.insert(FilterMatchKey::new(100, block_hash));
let mut blocks = BTreeMap::new();
blocks.insert(FilterMatchKey::new(100, block_hash), BTreeSet::from([MOCK_WALLET_ID]));
let event = SyncEvent::BlocksNeeded {
blocks,
};
Expand All @@ -227,4 +231,100 @@ mod tests {
assert_eq!(manager.state(), SyncState::Syncing);
assert!(events.is_empty());
}

/// `process_buffered_blocks` must call `process_block_for_wallets` with
/// the exact wallet set carried in the pipeline so already-synced
/// wallets are not touched by routing logic.
#[tokio::test]
async fn test_process_buffered_blocks_routes_wallet_set() {
    use dashcore::block::Header;
    use dashcore::{Block, TxMerkleNode};
    use dashcore_hashes::Hash;

    let mut manager = create_test_manager().await;
    manager.progress.set_state(SyncState::Syncing);

    // Minimal well-formed header; the concrete field values are
    // irrelevant to the routing behavior under test.
    let header = Header {
        version: dashcore::blockdata::block::Version::from_consensus(1),
        prev_blockhash: dashcore::BlockHash::all_zeros(),
        merkle_root: TxMerkleNode::all_zeros(),
        time: 0,
        bits: dashcore::CompactTarget::from_consensus(0),
        nonce: 0,
    };
    let block = Block {
        header,
        txdata: vec![],
    };
    // `block` is not used again below, so move it into the pipeline
    // instead of cloning (avoids a redundant copy of the whole block).
    manager.pipeline.add_from_storage(block, 100, BTreeSet::from([MOCK_WALLET_ID]));

    let events = manager.process_buffered_blocks().await.unwrap();
    assert!(matches!(events.first(), Some(SyncEvent::BlockProcessed { .. })));

    // MOCK_WALLET_ID was in the routed set, so MockWallet recorded the
    // block. (MockWallet::process_block_for_wallets returns early when
    // its id is absent.)
    let processed = manager.wallet.read().await.processed_blocks();
    let processed = processed.lock().await;
    assert_eq!(processed.len(), 1);
    assert_eq!(processed[0].1, 100);
}

/// A wallet that is NOT in the pipeline's interested set must not be
/// routed the block. Two wallets are registered, but only `wallet_in`
/// appears in the routed set; the other wallet's processed log must
/// stay empty for that block.
#[tokio::test]
async fn test_process_buffered_blocks_excludes_uninterested_wallet() {
    use dashcore::block::Header;
    use dashcore::{Block, TxMerkleNode};
    use dashcore_hashes::Hash;
    use key_wallet_manager::test_utils::{MockWalletState, MultiMockWallet};
    use key_wallet_manager::WalletId;

    let storage = DiskStorageManager::with_temp_dir().await.unwrap();
    let multi = MultiMockWallet::new();
    let wallet_in: WalletId = [0xAA; 32];
    let wallet_out: WalletId = [0xBB; 32];
    let multi = Arc::new(RwLock::new(multi));
    {
        // Register both wallets so exclusion (not absence) is what is tested.
        let mut w = multi.write().await;
        w.insert_wallet(wallet_in, MockWalletState::default());
        w.insert_wallet(wallet_out, MockWalletState::default());
    }
    let mut manager: BlocksManager<
        PersistentBlockHeaderStorage,
        PersistentBlockStorage,
        MultiMockWallet,
    > = BlocksManager::new(multi.clone(), storage.block_headers(), storage.blocks()).await;
    manager.progress.set_state(SyncState::Syncing);

    // Minimal well-formed header; the concrete field values are
    // irrelevant to the routing behavior under test.
    let header = Header {
        version: dashcore::blockdata::block::Version::from_consensus(1),
        prev_blockhash: dashcore::BlockHash::all_zeros(),
        merkle_root: TxMerkleNode::all_zeros(),
        time: 0,
        bits: dashcore::CompactTarget::from_consensus(0),
        nonce: 0,
    };
    let block = Block {
        header,
        txdata: vec![],
    };
    // Only wallet_in is in the routed set. `block` is not used again
    // below, so move it instead of cloning (avoids a redundant copy).
    manager.pipeline.add_from_storage(block, 100, BTreeSet::from([wallet_in]));

    let _ = manager.process_buffered_blocks().await.unwrap();

    let processed = multi.read().await.processed();
    let processed = processed.lock().await;
    // Exactly one entry, for wallet_in only.
    assert_eq!(processed.len(), 1);
    assert_eq!(processed[0].0, wallet_in);
    assert_eq!(processed[0].2, 100);
    assert!(
        !processed.iter().any(|(id, _, _)| *id == wallet_out),
        "wallet_out was not in the routed set, must not be processed"
    );
}
}
Loading
Loading