@@ -38,11 +38,10 @@ use spacetimedb::Identity;
3838use spacetimedb_client_api_messages:: websocket:: { self as ws_api, Compression } ;
3939use spacetimedb_datastore:: execution_context:: WorkloadType ;
4040use spacetimedb_lib:: connection_id:: { ConnectionId , ConnectionIdForUrl } ;
41- use std:: time:: Instant ;
4241use tokio:: sync:: { mpsc, watch} ;
4342use tokio:: task:: JoinHandle ;
4443use tokio:: time:: error:: Elapsed ;
45- use tokio:: time:: { sleep_until, timeout} ;
44+ use tokio:: time:: { sleep_until, timeout, Instant } ;
4645use tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: { Data , OpCode } ;
4746use tokio_tungstenite:: tungstenite:: protocol:: frame:: Frame ;
4847use tokio_tungstenite:: tungstenite:: Utf8Bytes ;
@@ -422,7 +421,7 @@ async fn ws_client_actor_inner(
422421 let client = client. clone ( ) ;
423422 move |data, timer| {
424423 let client = client. clone ( ) ;
425- async move { client. handle_message ( data, timer) . await }
424+ async move { client. handle_message ( data, timer. into ( ) ) . await }
426425 }
427426 } ,
428427 unordered_tx. clone ( ) ,
@@ -655,7 +654,7 @@ async fn ws_main_loop<HotswapWatcher>(
655654/// The `activity` should be updated whenever a new message is received.
656655async fn ws_idle_timer ( mut activity : watch:: Receiver < Instant > ) {
657656 let mut deadline = * activity. borrow ( ) ;
658- let sleep = sleep_until ( deadline. into ( ) ) ;
657+ let sleep = sleep_until ( deadline) ;
659658 pin_mut ! ( sleep) ;
660659
661660 loop {
@@ -666,7 +665,7 @@ async fn ws_idle_timer(mut activity: watch::Receiver<Instant>) {
666665 let new_deadline = * activity. borrow_and_update( ) ;
667666 if new_deadline != deadline {
668667 deadline = new_deadline;
669- sleep. as_mut( ) . reset( deadline. into ( ) ) ;
668+ sleep. as_mut( ) . reset( deadline) ;
670669 }
671670 } ,
672671
@@ -1449,6 +1448,20 @@ mod tests {
14491448
14501449 use super :: * ;
14511450
1451+ // [NOTE: start_paused]:
1452+ //
1453+ // Some of the tests below test timeouts or rely on time in some other way.
1454+ // Since that is prone to flakiness (depending on machine load), we use
1455+ // [tokio::time::pause] to run those tests with paused time.
1456+ //
1457+ // Tokio will auto-advance time when [sleep] is used, and the executor has
1458+ // no other work to do, so this should work as expected: the elapsed time
1459+ // is the sum of the sleep time in the awaited future.
1460+ //
1461+ // Crucially, all timer-backed primitives must use [tokio::time::Instant]
1462+ // rather than [std::time::Instant]. In case a test becomes flaky again in
1463+ // the future, check for use of std `Instant` first.
1464+
14521465 fn dummy_client_id ( ) -> ClientActorId {
14531466 ClientActorId {
14541467 identity : Identity :: ZERO ,
@@ -1465,7 +1478,7 @@ mod tests {
14651478 ActorState :: new ( Identity :: ZERO , dummy_client_id ( ) , config)
14661479 }
14671480
1468- #[ tokio:: test]
1481+ #[ tokio:: test( start_paused = true ) ] // see [NOTE: start_paused ]
14691482 async fn idle_timer_extends_sleep ( ) {
14701483 let timeout = Duration :: from_millis ( 10 ) ;
14711484
@@ -1783,7 +1796,7 @@ mod tests {
17831796 . await ;
17841797 }
17851798
1786- #[ tokio:: test]
1799+ #[ tokio:: test( start_paused = true ) ] // see [NOTE: start_paused ]
17871800 async fn main_loop_terminates_on_idle_timeout ( ) {
17881801 let state = Arc :: new ( dummy_actor_state_with_config ( WebSocketOptions {
17891802 idle_timeout : Duration :: from_millis ( 10 ) ,
@@ -1821,7 +1834,7 @@ mod tests {
18211834 assert ! ( elapsed < timeout + Duration :: from_millis( 10 ) ) ;
18221835 }
18231836
1824- #[ tokio:: test]
1837+ #[ tokio:: test( start_paused = true ) ] // see [NOTE: start_paused ]
18251838 async fn main_loop_keepalive_keeps_alive ( ) {
18261839 let state = Arc :: new ( dummy_actor_state_with_config ( WebSocketOptions {
18271840 ping_interval : Duration :: from_millis ( 5 ) ,
@@ -1868,10 +1881,16 @@ mod tests {
18681881 // It didn't time out.
18691882 assert_matches ! ( res, Ok ( Ok ( ( ) ) ) ) ;
18701883 // It didn't exit early. Allow it to miss a ping.
1871- assert ! ( elapsed >= expected_timeout - state. config. ping_interval) ;
1884+ let expected_timeout = expected_timeout - state. config . ping_interval ;
1885+ assert ! (
1886+ elapsed >= expected_timeout,
1887+ "should not exit early: elapsed={} expected_timeout={}" ,
1888+ elapsed. as_millis( ) ,
1889+ expected_timeout. as_millis( )
1890+ ) ;
18721891 }
18731892
1874- #[ tokio:: test]
1893+ #[ tokio:: test( start_paused = true ) ] // see [NOTE: start_paused ]
18751894 async fn main_loop_terminates_when_module_exits ( ) {
18761895 let state = Arc :: new ( dummy_actor_state ( ) ) ;
18771896
@@ -1885,7 +1904,7 @@ mod tests {
18851904 }
18861905 } ;
18871906
1888- let start = Instant :: now ( ) ;
1907+ let start = tokio :: time :: Instant :: now ( ) ;
18891908 tokio:: spawn ( async move {
18901909 let hotswap = || async {
18911910 sleep ( Duration :: from_millis ( 5 ) ) . await ;
@@ -1913,8 +1932,15 @@ mod tests {
19131932 . await
19141933 . unwrap ( ) ;
19151934 let elapsed = start. elapsed ( ) ;
1916- assert ! ( elapsed >= Duration :: from_millis( 5 ) ) ;
1917- assert ! ( elapsed < Duration :: from_millis( 10 ) ) ;
1935+
1936+ assert ! (
1937+ elapsed >= Duration :: from_millis( 5 ) ,
1938+ "main loop should run until module is shut down"
1939+ ) ;
1940+ assert ! (
1941+ elapsed < Duration :: from_millis( 10 ) ,
1942+ "main loop should shut down shortly after module is shut down"
1943+ ) ;
19181944 }
19191945
19201946 #[ tokio:: test]
0 commit comments