@@ -67,7 +67,6 @@ pub struct DbContextImpl<M: SpacetimeModule> {
6767 /// which are pre-parsed in the background by [`parse_loop`].
6868 #[ cfg( not( target_arch = "wasm32" ) ) ]
6969 recv : Arc < TokioMutex < mpsc:: UnboundedReceiver < ParsedMessage < M > > > > ,
70-
7170 #[ cfg( target_arch = "wasm32" ) ]
7271 recv : SharedCell < mpsc:: UnboundedReceiver < ParsedMessage < M > > > ,
7372
@@ -81,7 +80,6 @@ pub struct DbContextImpl<M: SpacetimeModule> {
8180 /// from which [Self::apply_pending_mutations] and friends read mutations.
8281 #[ cfg( not( target_arch = "wasm32" ) ) ]
8382 pending_mutations_recv : Arc < TokioMutex < mpsc:: UnboundedReceiver < PendingMutation < M > > > > ,
84-
8583 #[ cfg( target_arch = "wasm32" ) ]
8684 pending_mutations_recv : SharedCell < mpsc:: UnboundedReceiver < PendingMutation < M > > > ,
8785
@@ -473,7 +471,6 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
473471 /// If no WebSocket messages are in the queue, immediately return `false`.
474472 ///
475473 /// Called by the autogenerated `DbConnection` method of the same name.
476- #[ cfg( not( target_arch = "wasm32" ) ) ]
477474 pub fn advance_one_message ( & self ) -> crate :: Result < bool > {
478475 // Apply any pending mutations before processing a WS message,
479476 // so that pending callbacks don't get skipped.
@@ -490,42 +487,27 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
490487 // returns `Err(_)`. Similar behavior as `Iterator::next` and
491488 // `Stream::poll_next`. No comment on whether this is a good mental
492489 // model or not.
493- let res = match self . recv . blocking_lock ( ) . try_next ( ) {
494- Ok ( None ) => {
495- let disconnect_ctx = self . make_event_ctx ( None ) ;
496- self . invoke_disconnected ( & disconnect_ctx) ;
497- Err ( crate :: Error :: Disconnected )
498- }
499- Err ( _) => Ok ( false ) ,
500- Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
501- } ;
502490
503- // Also apply any new pending messages afterwards,
504- // so that outgoing WS messages get sent as soon as possible.
505- self . apply_pending_mutations ( ) ? ;
491+ let res = {
492+ # [ cfg ( not ( target_arch = "wasm32" ) ) ]
493+ let mut recv = self . recv . blocking_lock ( ) ;
506494
507- res
508- }
495+ # [ cfg ( target_arch = "wasm32" ) ]
496+ let mut recv = self . recv . lock ( ) . unwrap ( ) ;
509497
510- #[ cfg( target_arch = "wasm32" ) ]
511- pub fn advance_one_message ( & self ) -> crate :: Result < bool > {
512- self . apply_pending_mutations ( ) ?;
513- // Synchronously try to pull one server message
514- let res = {
515- let mut chan = self . recv . lock ( ) . unwrap ( ) ;
516- match chan. try_next ( ) {
498+ match recv. try_next ( ) {
517499 Ok ( None ) => {
518- // Shouldn’t happen on unbounded, treat as disconnect
519- let ctx = self . make_event_ctx ( None ) ;
520- self . invoke_disconnected ( & ctx) ;
500+ let disconnect_ctx = self . make_event_ctx ( None ) ;
501+ self . invoke_disconnected ( & disconnect_ctx) ;
521502 Err ( crate :: Error :: Disconnected )
522503 }
523504 Err ( _) => Ok ( false ) ,
524505 Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
525506 }
526507 } ;
527508
528- // send any pending outgoing mutations now that we've done a read
509+ // Also apply any new pending messages afterwards,
510+ // so that outgoing WS messages get sent as soon as possible.
529511 self . apply_pending_mutations ( ) ?;
530512
531513 res
@@ -971,7 +953,7 @@ but you must call one of them, or else the connection will never progress.
971953 } ) ?;
972954
973955 let ( raw_msg_recv, raw_msg_send) = ws_connection. spawn_message_loop ( ) ;
974- let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv, & handle ) ;
956+ let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv) ;
975957
976958 let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
977959 runtime,
@@ -997,10 +979,8 @@ but you must call one of them, or else the connection will never progress.
997979 inner,
998980 send_chan,
999981 cache,
1000- #[ cfg( target_arch = "wasm32" ) ]
1001982 recv : Arc :: new ( StdMutex :: new ( parsed_recv_chan) ) ,
1002983 pending_mutations_send,
1003- #[ cfg( target_arch = "wasm32" ) ]
1004984 pending_mutations_recv : Arc :: new ( StdMutex :: new ( pending_mutations_recv) ) ,
1005985 identity : Arc :: new ( StdMutex :: new ( None ) ) ,
1006986 } ;
@@ -1125,14 +1105,12 @@ fn enter_or_create_runtime() -> crate::Result<(Option<Runtime>, runtime::Handle)
11251105 match runtime:: Handle :: try_current ( ) {
11261106 Err ( e) if e. is_missing_context ( ) => {
11271107 #[ cfg( not( target_arch = "wasm32" ) ) ]
1128- let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
1129- . enable_all ( )
1130- . worker_threads ( 1 )
1131- . thread_name ( "spacetimedb-background-connection" )
1132- . build ( )
1133- . map_err ( |source| InternalError :: new ( "Failed to create Tokio runtime" ) . with_cause ( source) ) ?;
1108+ let mut rt = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
11341109 #[ cfg( target_arch = "wasm32" ) ]
1135- let rt = tokio:: runtime:: Builder :: new_current_thread ( )
1110+ let mut rt = tokio:: runtime:: Builder :: new_current_thread ( ) ;
1111+
1112+ let rt = rt
1113+ . enable_all ( )
11361114 . worker_threads ( 1 )
11371115 . thread_name ( "spacetimedb-background-connection" )
11381116 . build ( )
@@ -1173,7 +1151,6 @@ fn spawn_parse_loop<M: SpacetimeModule>(
11731151#[ cfg( target_arch = "wasm32" ) ]
11741152fn spawn_parse_loop < M : SpacetimeModule > (
11751153 raw_message_recv : mpsc:: UnboundedReceiver < ws:: ServerMessage < BsatnFormat > > ,
1176- _handle : & runtime:: Handle ,
11771154) -> mpsc:: UnboundedReceiver < ParsedMessage < M > > {
11781155 let ( parsed_message_send, parsed_message_recv) = mpsc:: unbounded ( ) ;
11791156 wasm_bindgen_futures:: spawn_local ( parse_loop ( raw_message_recv, parsed_message_send) ) ;
0 commit comments