@@ -8,7 +8,8 @@ use std::collections::{HashMap, VecDeque};
88use std:: fmt;
99use std:: net:: SocketAddr ;
1010use std:: sync:: Arc ;
11- use std:: time:: { Duration , Instant } ;
11+ use std:: time:: Duration ;
12+ use tokio:: time:: Instant ;
1213
1314use dashcore:: network:: message_blockdata:: Inventory ;
1415use dashcore:: { Amount , Transaction , Txid } ;
@@ -135,7 +136,9 @@ impl<W: WalletInterface> MempoolManager<W> {
135136 /// Whether activation needs to be retried. Returns true if activation was
136137 /// attempted but no inventory response arrived within the timeout.
137138 pub ( super ) fn needs_activation_retry ( & self ) -> bool {
138- self . activated_at . is_some_and ( |t| t. elapsed ( ) >= ACTIVATION_TIMEOUT )
139+ self . activated_at . is_some_and ( |t| {
140+ Instant :: now ( ) . checked_duration_since ( t) . unwrap_or_default ( ) >= ACTIVATION_TIMEOUT
141+ } )
139142 }
140143
141144 /// Build and send a bloom filter to the mempool peer.
@@ -434,7 +437,10 @@ impl<W: WalletInterface> MempoolManager<W> {
434437
435438 // Prune pending IS locks whose transaction never arrived
436439 let before = self . pending_is_locks . len ( ) ;
437- self . pending_is_locks . retain ( |_, inserted_at| inserted_at. elapsed ( ) < MEMPOOL_TX_EXPIRY ) ;
440+ let now = Instant :: now ( ) ;
441+ self . pending_is_locks . retain ( |_, inserted_at| {
442+ now. checked_duration_since ( * inserted_at) . unwrap_or_default ( ) < MEMPOOL_TX_EXPIRY
443+ } ) ;
438444 let expired = before - self . pending_is_locks . len ( ) ;
439445 if expired > 0 {
440446 tracing:: debug!( "Pruned {} expired pending IS locks" , expired) ;
@@ -496,7 +502,9 @@ impl<W: WalletInterface> MempoolManager<W> {
496502 pub ( super ) fn prune_pending_requests ( & mut self ) {
497503 let mut timed_out = Vec :: new ( ) ;
498504 self . pending_requests . retain ( |txid, requested_at| {
499- if requested_at. elapsed ( ) >= PENDING_REQUEST_TIMEOUT {
505+ if Instant :: now ( ) . checked_duration_since ( * requested_at) . unwrap_or_default ( )
506+ >= PENDING_REQUEST_TIMEOUT
507+ {
500508 timed_out. push ( * txid) ;
501509 false
502510 } else {
@@ -717,8 +725,8 @@ mod tests {
717725 assert ! ( !manager. pending_requests. contains_key( & extra_txid) ) ;
718726 }
719727
720- #[ test]
721- fn test_prune_pending_requests_timeout ( ) {
728+ #[ tokio :: test( start_paused = true ) ]
729+ async fn test_prune_pending_requests_timeout ( ) {
722730 let wallet = Arc :: new ( RwLock :: new ( MockWallet :: new ( ) ) ) ;
723731 let mempool_state = Arc :: new ( RwLock :: new ( MempoolState :: default ( ) ) ) ;
724732 let ( tx, _rx) = mpsc:: unbounded_channel :: < NetworkRequest > ( ) ;
@@ -727,13 +735,14 @@ mod tests {
727735 let mut manager =
728736 MempoolManager :: new ( wallet, mempool_state, MempoolStrategy :: FetchAll , 1000 ) ;
729737
730- let fresh_txid = Txid :: from_byte_array ( [ 1 ; 32 ] ) ;
731738 let stale_txid = Txid :: from_byte_array ( [ 2 ; 32 ] ) ;
739+ manager. pending_requests . insert ( stale_txid, Instant :: now ( ) ) ;
740+
741+ // Advance time past the timeout so stale_txid expires
742+ tokio:: time:: advance ( PENDING_REQUEST_TIMEOUT + Duration :: from_secs ( 1 ) ) . await ;
732743
744+ let fresh_txid = Txid :: from_byte_array ( [ 1 ; 32 ] ) ;
733745 manager. pending_requests . insert ( fresh_txid, Instant :: now ( ) ) ;
734- manager
735- . pending_requests
736- . insert ( stale_txid, Instant :: now ( ) - PENDING_REQUEST_TIMEOUT - Duration :: from_secs ( 1 ) ) ;
737746
738747 manager. prune_pending_requests ( ) ;
739748
@@ -842,7 +851,7 @@ mod tests {
842851 assert_eq ! ( manager. pending_requests. len( ) , 1 ) ;
843852 }
844853
845- #[ tokio:: test]
854+ #[ tokio:: test( start_paused = true ) ]
846855 async fn test_prune_expired ( ) {
847856 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
848857
@@ -1372,7 +1381,7 @@ mod tests {
13721381 assert ! ( !state. transactions. contains_key( & txid) ) ;
13731382 }
13741383
1375- #[ tokio:: test]
1384+ #[ tokio:: test( start_paused = true ) ]
13761385 async fn test_prune_expired_preserves_unrelated_is_locks ( ) {
13771386 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
13781387
@@ -1386,7 +1395,7 @@ mod tests {
13861395 ) ;
13871396 }
13881397
1389- #[ tokio:: test]
1398+ #[ tokio:: test( start_paused = true ) ]
13901399 async fn test_prune_expired_removes_is_lock_for_expired_tx ( ) {
13911400 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
13921401
@@ -1399,16 +1408,18 @@ mod tests {
13991408 } ;
14001409 let txid = tx. txid ( ) ;
14011410
1402- // Add the tx with a timestamp far in the past so it expires
1411+ // Add the tx at T=0
14031412 {
14041413 let mut state = manager. mempool_state . write ( ) . await ;
1405- let mut utx =
1414+ let utx =
14061415 UnconfirmedTransaction :: new ( tx, Amount :: from_sat ( 0 ) , false , false , Vec :: new ( ) , 0 ) ;
1407- utx. first_seen = Instant :: now ( ) - MEMPOOL_TX_EXPIRY - Duration :: from_secs ( 1 ) ;
14081416 state. add_transaction ( utx) ;
14091417 }
14101418
1411- // Also store a pending IS lock for this txid and an unrelated one
1419+ // Advance time past the expiry threshold so the tx is now considered expired
1420+ tokio:: time:: advance ( MEMPOOL_TX_EXPIRY + Duration :: from_secs ( 1 ) ) . await ;
1421+
1422+ // Insert IS locks after the time advance so they are fresh at the new "now"
14121423 let unrelated_txid = Txid :: from_byte_array ( [ 0xdd ; 32 ] ) ;
14131424 manager. pending_is_locks . insert ( txid, Instant :: now ( ) ) ;
14141425 manager. pending_is_locks . insert ( unrelated_txid, Instant :: now ( ) ) ;
@@ -1427,17 +1438,18 @@ mod tests {
14271438 ) ;
14281439 }
14291440
1430- #[ tokio:: test]
1441+ #[ tokio:: test( start_paused = true ) ]
14311442 async fn test_prune_expired_removes_stale_pending_is_locks ( ) {
14321443 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
14331444
1434- // Insert a pending IS lock that is older than the expiry threshold
1445+ // Insert a pending IS lock at T=0; it will become stale after we advance time
14351446 let stale_txid = Txid :: from_byte_array ( [ 0xaa ; 32 ] ) ;
1436- manager
1437- . pending_is_locks
1438- . insert ( stale_txid, Instant :: now ( ) - MEMPOOL_TX_EXPIRY - Duration :: from_secs ( 1 ) ) ;
1447+ manager. pending_is_locks . insert ( stale_txid, Instant :: now ( ) ) ;
1448+
1449+ // Advance time past the expiry threshold
1450+ tokio:: time:: advance ( MEMPOOL_TX_EXPIRY + Duration :: from_secs ( 1 ) ) . await ;
14391451
1440- // Insert a fresh pending IS lock
1452+ // Insert a fresh pending IS lock at the new "now"
14411453 let fresh_txid = Txid :: from_byte_array ( [ 0xbb ; 32 ] ) ;
14421454 manager. pending_is_locks . insert ( fresh_txid, Instant :: now ( ) ) ;
14431455
@@ -1476,7 +1488,7 @@ mod tests {
14761488 assert_eq ! ( manager. queued. values( ) . map( |q| q. len( ) ) . sum:: <usize >( ) , 1 ) ;
14771489 }
14781490
1479- #[ tokio:: test]
1491+ #[ tokio:: test( start_paused = true ) ]
14801492 async fn test_activation_retry_on_timeout ( ) {
14811493 let ( mut manager, requests, _rx) = create_test_manager ( ) ;
14821494 manager. activate ( test_socket_address ( 1 ) , & requests) . await . unwrap ( ) ;
@@ -1485,8 +1497,8 @@ mod tests {
14851497 assert ! ( !manager. needs_activation_retry( ) ) ;
14861498 assert ! ( manager. activated_at. is_some( ) ) ;
14871499
1488- // Simulate timeout by backdating the activation timestamp
1489- manager . activated_at = Some ( Instant :: now ( ) - ACTIVATION_TIMEOUT ) ;
1500+ // Simulate timeout by advancing time past the activation timeout
1501+ tokio :: time :: advance ( ACTIVATION_TIMEOUT ) . await ;
14901502 assert ! ( manager. needs_activation_retry( ) ) ;
14911503 }
14921504
@@ -1666,31 +1678,31 @@ mod tests {
16661678 assert ! ( manager. queued. is_empty( ) ) ;
16671679 }
16681680
1669- #[ test]
1670- fn test_prune_pending_requeues_to_connected_peer ( ) {
1681+ #[ tokio :: test( start_paused = true ) ]
1682+ async fn test_prune_pending_requeues_to_connected_peer ( ) {
16711683 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
16721684 let peer = test_socket_address ( 1 ) ;
16731685 manager. handle_peer_connected ( peer) ;
16741686
16751687 let txid = Txid :: from_byte_array ( [ 1 ; 32 ] ) ;
1676- manager
1677- . pending_requests
1678- . insert ( txid , Instant :: now ( ) - PENDING_REQUEST_TIMEOUT - Duration :: from_secs ( 1 ) ) ;
1688+ manager. pending_requests . insert ( txid , Instant :: now ( ) ) ;
1689+
1690+ tokio :: time :: advance ( PENDING_REQUEST_TIMEOUT + Duration :: from_secs ( 1 ) ) . await ;
16791691
16801692 manager. prune_pending_requests ( ) ;
16811693
16821694 assert ! ( !manager. pending_requests. contains_key( & txid) ) ;
16831695 assert ! ( manager. queued[ & peer] . contains( & txid) ) ;
16841696 }
16851697
1686- #[ test]
1687- fn test_prune_pending_drops_when_no_peers ( ) {
1698+ #[ tokio :: test( start_paused = true ) ]
1699+ async fn test_prune_pending_drops_when_no_peers ( ) {
16881700 let ( mut manager, _requests, _rx) = create_test_manager ( ) ;
16891701
16901702 let txid = Txid :: from_byte_array ( [ 1 ; 32 ] ) ;
1691- manager
1692- . pending_requests
1693- . insert ( txid , Instant :: now ( ) - PENDING_REQUEST_TIMEOUT - Duration :: from_secs ( 1 ) ) ;
1703+ manager. pending_requests . insert ( txid , Instant :: now ( ) ) ;
1704+
1705+ tokio :: time :: advance ( PENDING_REQUEST_TIMEOUT + Duration :: from_secs ( 1 ) ) . await ;
16941706
16951707 manager. prune_pending_requests ( ) ;
16961708
0 commit comments