@@ -264,13 +264,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
264264
265265 /// Apply all queued [`PendingMutation`]s.
266266 fn apply_pending_mutations ( & self ) -> crate :: Result < ( ) > {
267- #[ cfg( not( feature = "web" ) ) ]
268- while let Ok ( Some ( pending_mutation) ) = self . pending_mutations_recv . blocking_lock ( ) . try_next ( ) {
269- self . apply_mutation ( pending_mutation) ?;
270- }
271-
272- #[ cfg( feature = "web" ) ]
273- while let Ok ( Some ( pending_mutation) ) = self . pending_mutations_recv . lock ( ) . unwrap ( ) . try_next ( ) {
267+ while let Ok ( Some ( pending_mutation) ) = get_lock_sync ( & self . pending_mutations_recv ) . try_next ( ) {
274268 self . apply_mutation ( pending_mutation) ?;
275269 }
276270
@@ -487,23 +481,14 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
487481 // returns `Err(_)`. Similar behavior as `Iterator::next` and
488482 // `Stream::poll_next`. No comment on whether this is a good mental
489483 // model or not.
490-
491- let res = {
492- #[ cfg( not( feature = "web" ) ) ]
493- let mut recv = self . recv . blocking_lock ( ) ;
494-
495- #[ cfg( feature = "web" ) ]
496- let mut recv = self . recv . lock ( ) . unwrap ( ) ;
497-
498- match recv. try_next ( ) {
499- Ok ( None ) => {
500- let disconnect_ctx = self . make_event_ctx ( None ) ;
501- self . invoke_disconnected ( & disconnect_ctx) ;
502- Err ( crate :: Error :: Disconnected )
503- }
504- Err ( _) => Ok ( false ) ,
505- Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
484+ let res = match get_lock_sync ( & self . recv ) . try_next ( ) {
485+ Ok ( None ) => {
486+ let disconnect_ctx = self . make_event_ctx ( None ) ;
487+ self . invoke_disconnected ( & disconnect_ctx) ;
488+ Err ( crate :: Error :: Disconnected )
506489 }
490+ Err ( _) => Ok ( false ) ,
491+ Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
507492 } ;
508493
509494 // Also apply any new pending messages afterwards,
@@ -519,15 +504,8 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
519504 // We call this out as an incorrect and unsupported thing to do.
520505 #![ allow( clippy:: await_holding_lock) ]
521506
522- #[ cfg( not( feature = "web" ) ) ]
523- let mut pending_mutations = self . pending_mutations_recv . lock ( ) . await ;
524- #[ cfg( feature = "web" ) ]
525- let mut pending_mutations = self . pending_mutations_recv . lock ( ) . unwrap ( ) ;
526-
527- #[ cfg( not( feature = "web" ) ) ]
528- let mut recv = self . recv . lock ( ) . await ;
529- #[ cfg( feature = "web" ) ]
530- let mut recv = self . recv . lock ( ) . unwrap ( ) ;
507+ let mut pending_mutations = get_lock_async ( & self . pending_mutations_recv ) . await ;
508+ let mut recv = get_lock_async ( & self . recv ) . await ;
531509
532510 // Always process pending mutations before WS messages, if they're available,
533511 // so that newly registered callbacks run on messages.
@@ -1130,6 +1108,31 @@ fn enter_or_create_runtime() -> crate::Result<(Option<Runtime>, runtime::Handle)
11301108 }
11311109}
11321110
1111+ /// Synchronous lock helper: native = blocking_lock, web = lock().unwrap()
1112+ #[ cfg( not( feature = "web" ) ) ]
1113+ fn get_lock_sync < T > ( mutex : & TokioMutex < T > ) -> tokio:: sync:: MutexGuard < ' _ , T > {
1114+ mutex. blocking_lock ( )
1115+ }
1116+
1117+ /// Synchronous lock helper: native = blocking_lock, web = lock().unwrap()
1118+ #[ cfg( feature = "web" ) ]
1119+ fn get_lock_sync < T > ( mutex : & StdMutex < T > ) -> std:: sync:: MutexGuard < ' _ , T > {
1120+ mutex. lock ( ) . unwrap ( )
1121+ }
1122+
1123+ // Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn
1124+ #[ cfg( not( feature = "web" ) ) ]
1125+ async fn get_lock_async < T > ( mutex : & TokioMutex < T > ) -> tokio:: sync:: MutexGuard < ' _ , T > {
1126+ mutex. lock ( ) . await
1127+ }
1128+
1129+ // Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn
1130+ #[ cfg( feature = "web" ) ]
1131+ pub async fn get_lock_async < T > ( mutex : & StdMutex < T > ) -> std:: sync:: MutexGuard < ' _ , T > {
1132+ // still async, but does the sync lock immediately
1133+ mutex. lock ( ) . unwrap ( )
1134+ }
1135+
11331136enum ParsedMessage < M : SpacetimeModule > {
11341137 InitialSubscription { db_update : M :: DbUpdate , sub_id : u32 } ,
11351138 TransactionUpdate ( Event < M :: Reducer > , Option < M :: DbUpdate > ) ,
0 commit comments