@@ -293,13 +293,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
293293
294294 /// Apply all queued [`PendingMutation`]s.
295295 fn apply_pending_mutations ( & self ) -> crate :: Result < ( ) > {
296- #[ cfg( not( feature = "web" ) ) ]
297- while let Ok ( Some ( pending_mutation) ) = self . pending_mutations_recv . blocking_lock ( ) . try_next ( ) {
298- self . apply_mutation ( pending_mutation) ?;
299- }
300-
301- #[ cfg( feature = "web" ) ]
302- while let Ok ( Some ( pending_mutation) ) = self . pending_mutations_recv . lock ( ) . unwrap ( ) . try_next ( ) {
296+ while let Ok ( Some ( pending_mutation) ) = get_lock_sync ( & self . pending_mutations_recv ) . try_next ( ) {
303297 self . apply_mutation ( pending_mutation) ?;
304298 }
305299
@@ -547,23 +541,14 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
547541 // returns `Err(_)`. Similar behavior as `Iterator::next` and
548542 // `Stream::poll_next`. No comment on whether this is a good mental
549543 // model or not.
550-
551- let res = {
552- #[ cfg( not( feature = "web" ) ) ]
553- let mut recv = self . recv . blocking_lock ( ) ;
554-
555- #[ cfg( feature = "web" ) ]
556- let mut recv = self . recv . lock ( ) . unwrap ( ) ;
557-
558- match recv. try_next ( ) {
559- Ok ( None ) => {
560- let disconnect_ctx = self . make_event_ctx ( None ) ;
561- self . invoke_disconnected ( & disconnect_ctx) ;
562- Err ( crate :: Error :: Disconnected )
563- }
564- Err ( _) => Ok ( false ) ,
565- Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
544+ let res = match get_lock_sync ( & self . recv ) . try_next ( ) {
545+ Ok ( None ) => {
546+ let disconnect_ctx = self . make_event_ctx ( None ) ;
547+ self . invoke_disconnected ( & disconnect_ctx) ;
548+ Err ( crate :: Error :: Disconnected )
566549 }
550+ Err ( _) => Ok ( false ) ,
551+ Ok ( Some ( msg) ) => self . process_message ( msg) . map ( |_| true ) ,
567552 } ;
568553
569554 // Also apply any new pending messages afterwards,
@@ -579,15 +564,8 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
579564 // We call this out as an incorrect and unsupported thing to do.
580565 #![ allow( clippy:: await_holding_lock) ]
581566
582- #[ cfg( not( feature = "web" ) ) ]
583- let mut pending_mutations = self . pending_mutations_recv . lock ( ) . await ;
584- #[ cfg( feature = "web" ) ]
585- let mut pending_mutations = self . pending_mutations_recv . lock ( ) . unwrap ( ) ;
586-
587- #[ cfg( not( feature = "web" ) ) ]
588- let mut recv = self . recv . lock ( ) . await ;
589- #[ cfg( feature = "web" ) ]
590- let mut recv = self . recv . lock ( ) . unwrap ( ) ;
567+ let mut pending_mutations = get_lock_async ( & self . pending_mutations_recv ) . await ;
568+ let mut recv = get_lock_async ( & self . recv ) . await ;
591569
592570 // Always process pending mutations before WS messages, if they're available,
593571 // so that newly registered callbacks run on messages.
@@ -1251,6 +1229,31 @@ fn enter_or_create_runtime() -> crate::Result<(Option<Runtime>, runtime::Handle)
12511229 }
12521230}
12531231
1232+ /// Synchronous lock helper: native = blocking_lock, web = lock().unwrap()
1233+ #[ cfg( not( feature = "web" ) ) ]
1234+ fn get_lock_sync < T > ( mutex : & TokioMutex < T > ) -> tokio:: sync:: MutexGuard < ' _ , T > {
1235+ mutex. blocking_lock ( )
1236+ }
1237+
1238+ /// Synchronous lock helper: native = blocking_lock, web = lock().unwrap()
1239+ #[ cfg( feature = "web" ) ]
1240+ fn get_lock_sync < T > ( mutex : & StdMutex < T > ) -> std:: sync:: MutexGuard < ' _ , T > {
1241+ mutex. lock ( ) . unwrap ( )
1242+ }
1243+
1244+ // Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn
1245+ #[ cfg( not( feature = "web" ) ) ]
1246+ async fn get_lock_async < T > ( mutex : & TokioMutex < T > ) -> tokio:: sync:: MutexGuard < ' _ , T > {
1247+ mutex. lock ( ) . await
1248+ }
1249+
1250+ // Async‐lock helper: native = .lock().await, web = lock().unwrap() inside async fn
1251+ #[ cfg( feature = "web" ) ]
1252+ pub async fn get_lock_async < T > ( mutex : & StdMutex < T > ) -> std:: sync:: MutexGuard < ' _ , T > {
1253+ // still async, but does the sync lock immediately
1254+ mutex. lock ( ) . unwrap ( )
1255+ }
1256+
12541257enum ParsedMessage < M : SpacetimeModule > {
12551258 InitialSubscription {
12561259 db_update : M :: DbUpdate ,
0 commit comments