@@ -34,6 +34,8 @@ use crate::{
3434} ;
3535use bytes:: Bytes ;
3636use futures:: StreamExt ;
37+ #[ cfg( feature = "web" ) ]
38+ use futures:: { pin_mut, FutureExt } ;
3739use futures_channel:: mpsc;
3840use http:: Uri ;
3941use spacetimedb_client_api_messages:: websocket as ws;
@@ -44,9 +46,11 @@ use std::{
4446 collections:: HashMap ,
4547 sync:: { atomic:: AtomicU32 , Arc , Mutex as StdMutex , OnceLock } ,
4648} ;
47- use tokio:: runtime:: { self , Runtime } ;
4849#[ cfg( not( feature = "web" ) ) ]
49- use tokio:: sync:: Mutex as TokioMutex ;
50+ use tokio:: {
51+ runtime:: { self , Runtime } ,
52+ sync:: Mutex as TokioMutex ,
53+ } ;
5054
5155pub ( crate ) type SharedCell < T > = Arc < StdMutex < T > > ;
5256
@@ -56,6 +60,7 @@ pub(crate) type SharedCell<T> = Arc<StdMutex<T>>;
5660/// This must be relatively cheaply `Clone`-able, and have internal sharing,
5761/// as numerous operations will clone it to get new handles on the connection.
5862pub struct DbContextImpl < M : SpacetimeModule > {
63+ #[ cfg( not( feature = "web" ) ) ]
5964 runtime : runtime:: Handle ,
6065
6166 /// All the state which is safe to hold a lock on while running callbacks.
@@ -102,6 +107,7 @@ pub struct DbContextImpl<M: SpacetimeModule> {
102107impl < M : SpacetimeModule > Clone for DbContextImpl < M > {
103108 fn clone ( & self ) -> Self {
104109 Self {
110+ #[ cfg( not( feature = "web" ) ) ]
105111 runtime : self . runtime . clone ( ) ,
106112 // Being very explicit with `Arc::clone` here,
107113 // since we'll be doing `DbContextImpl::clone` very frequently,
@@ -576,15 +582,28 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
576582 return Message :: Local ( pending_mutation. unwrap ( ) ) ;
577583 }
578584
585+ #[ cfg( not( feature = "web" ) ) ]
579586 tokio:: select! {
580587 pending_mutation = pending_mutations. next( ) => Message :: Local ( pending_mutation. unwrap( ) ) ,
581588 incoming_message = recv. next( ) => Message :: Ws ( incoming_message) ,
582589 }
590+
591+ #[ cfg( feature = "web" ) ]
592+ {
593+ let ( pending_fut, recv_fut) = ( pending_mutations. next ( ) . fuse ( ) , recv. next ( ) . fuse ( ) ) ;
594+ pin_mut ! ( pending_fut, recv_fut) ;
595+
596+ futures:: select! {
597+ pending_mutation = pending_fut => Message :: Local ( pending_mutation. unwrap( ) ) ,
598+ incoming_message = recv_fut => Message :: Ws ( incoming_message) ,
599+ }
600+ }
583601 }
584602
585603 /// Like [`Self::advance_one_message`], but sleeps the thread until a message is available.
586604 ///
587605 /// Called by the autogenerated `DbConnection` method of the same name.
606+ #[ cfg( not( feature = "web" ) ) ]
588607 pub fn advance_one_message_blocking ( & self ) -> crate :: Result < ( ) > {
589608 match self . runtime . block_on ( self . get_message ( ) ) {
590609 Message :: Local ( pending) => self . apply_mutation ( pending) ,
@@ -623,6 +642,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
623642 /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop.
624643 ///
625644 /// Called by the autogenerated `DbConnection` method of the same name.
645+ #[ cfg( not( feature = "web" ) ) ]
626646 pub fn run_threaded ( & self ) -> std:: thread:: JoinHandle < ( ) > {
627647 let this = self . clone ( ) ;
628648 std:: thread:: spawn ( move || loop {
@@ -806,6 +826,7 @@ pub(crate) struct DbContextImplInner<M: SpacetimeModule> {
806826 /// `Some` if not within the context of an outer runtime. The `Runtime` must
807827 /// then live as long as `Self`.
808828 #[ allow( unused) ]
829+ #[ cfg( not( feature = "web" ) ) ]
809830 runtime : Option < Runtime > ,
810831
811832 db_callbacks : DbCallbacks < M > ,
@@ -1016,7 +1037,6 @@ but you must call one of them, or else the connection will never progress.
10161037
10171038 #[ cfg( feature = "web" ) ]
10181039 pub async fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
1019- let ( runtime, handle) = enter_or_create_runtime ( ) ?;
10201040 let db_callbacks = DbCallbacks :: default ( ) ;
10211041 let reducer_callbacks = ReducerCallbacks :: default ( ) ;
10221042
@@ -1037,8 +1057,6 @@ but you must call one of them, or else the connection will never progress.
10371057 let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv) ;
10381058
10391059 let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
1040- runtime,
1041-
10421060 db_callbacks,
10431061 reducer_callbacks,
10441062 subscriptions : SubscriptionManager :: default ( ) ,
@@ -1056,7 +1074,6 @@ but you must call one of them, or else the connection will never progress.
10561074
10571075 let ( pending_mutations_send, pending_mutations_recv) = mpsc:: unbounded ( ) ;
10581076 let ctx_imp = DbContextImpl {
1059- runtime : handle,
10601077 inner,
10611078 send_chan,
10621079 cache,
@@ -1202,15 +1219,11 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal
12021219// When called from within an async context, return a handle to it (and no
12031220// `Runtime`), otherwise create a fresh `Runtime` and return it along with a
12041221// handle to it.
1222+ #[ cfg( not( feature = "web" ) ) ]
12051223fn enter_or_create_runtime ( ) -> crate :: Result < ( Option < Runtime > , runtime:: Handle ) > {
12061224 match runtime:: Handle :: try_current ( ) {
12071225 Err ( e) if e. is_missing_context ( ) => {
1208- #[ cfg( not( feature = "web" ) ) ]
1209- let mut rt = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
1210- #[ cfg( feature = "web" ) ]
1211- let mut rt = tokio:: runtime:: Builder :: new_current_thread ( ) ;
1212-
1213- let rt = rt
1226+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
12141227 . enable_all ( )
12151228 . worker_threads ( 1 )
12161229 . thread_name ( "spacetimedb-background-connection" )
0 commit comments