@@ -31,6 +31,8 @@ use crate::{
3131} ;
3232use bytes:: Bytes ;
3333use futures:: StreamExt ;
34+ #[ cfg( feature = "web" ) ]
35+ use futures:: { pin_mut, FutureExt } ;
3436use futures_channel:: mpsc;
3537use http:: Uri ;
3638use spacetimedb_client_api_messages:: websocket as ws;
@@ -40,9 +42,11 @@ use std::{
4042 collections:: HashMap ,
4143 sync:: { atomic:: AtomicU32 , Arc , Mutex as StdMutex , OnceLock } ,
4244} ;
43- use tokio:: runtime:: { self , Runtime } ;
4445#[ cfg( not( feature = "web" ) ) ]
45- use tokio:: sync:: Mutex as TokioMutex ;
46+ use tokio:: {
47+ runtime:: { self , Runtime } ,
48+ sync:: Mutex as TokioMutex ,
49+ } ;
4650
4751pub ( crate ) type SharedCell < T > = Arc < StdMutex < T > > ;
4852
@@ -52,6 +56,7 @@ pub(crate) type SharedCell<T> = Arc<StdMutex<T>>;
5256/// This must be relatively cheaply `Clone`-able, and have internal sharing,
5357/// as numerous operations will clone it to get new handles on the connection.
5458pub struct DbContextImpl < M : SpacetimeModule > {
59+ #[ cfg( not( feature = "web" ) ) ]
5560 runtime : runtime:: Handle ,
5661
5762 /// All the state which is safe to hold a lock on while running callbacks.
@@ -93,6 +98,7 @@ pub struct DbContextImpl<M: SpacetimeModule> {
9398impl < M : SpacetimeModule > Clone for DbContextImpl < M > {
9499 fn clone ( & self ) -> Self {
95100 Self {
101+ #[ cfg( not( feature = "web" ) ) ]
96102 runtime : self . runtime . clone ( ) ,
97103 // Being very explicit with `Arc::clone` here,
98104 // since we'll be doing `DbContextImpl::clone` very frequently,
@@ -516,15 +522,28 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
516522 return Message :: Local ( pending_mutation. unwrap ( ) ) ;
517523 }
518524
525+ #[ cfg( not( feature = "web" ) ) ]
519526 tokio:: select! {
520527 pending_mutation = pending_mutations. next( ) => Message :: Local ( pending_mutation. unwrap( ) ) ,
521528 incoming_message = recv. next( ) => Message :: Ws ( incoming_message) ,
522529 }
530+
531+ #[ cfg( feature = "web" ) ]
532+ {
533+ let ( pending_fut, recv_fut) = ( pending_mutations. next ( ) . fuse ( ) , recv. next ( ) . fuse ( ) ) ;
534+ pin_mut ! ( pending_fut, recv_fut) ;
535+
536+ futures:: select! {
537+ pending_mutation = pending_fut => Message :: Local ( pending_mutation. unwrap( ) ) ,
538+ incoming_message = recv_fut => Message :: Ws ( incoming_message) ,
539+ }
540+ }
523541 }
524542
525543 /// Like [`Self::advance_one_message`], but sleeps the thread until a message is available.
526544 ///
527545 /// Called by the autogenerated `DbConnection` method of the same name.
546+ #[ cfg( not( feature = "web" ) ) ]
528547 pub fn advance_one_message_blocking ( & self ) -> crate :: Result < ( ) > {
529548 match self . runtime . block_on ( self . get_message ( ) ) {
530549 Message :: Local ( pending) => self . apply_mutation ( pending) ,
@@ -563,6 +582,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
563582 /// Spawn a thread which does [`Self::advance_one_message_blocking`] in a loop.
564583 ///
565584 /// Called by the autogenerated `DbConnection` method of the same name.
585+ #[ cfg( not( feature = "web" ) ) ]
566586 pub fn run_threaded ( & self ) -> std:: thread:: JoinHandle < ( ) > {
567587 let this = self . clone ( ) ;
568588 std:: thread:: spawn ( move || loop {
@@ -714,6 +734,7 @@ pub(crate) struct DbContextImplInner<M: SpacetimeModule> {
714734 /// `Some` if not within the context of an outer runtime. The `Runtime` must
715735 /// then live as long as `Self`.
716736 #[ allow( unused) ]
737+ #[ cfg( not( feature = "web" ) ) ]
717738 runtime : Option < Runtime > ,
718739
719740 db_callbacks : DbCallbacks < M > ,
@@ -917,7 +938,6 @@ but you must call one of them, or else the connection will never progress.
917938
918939 #[ cfg( feature = "web" ) ]
919940 pub async fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
920- let ( runtime, handle) = enter_or_create_runtime ( ) ?;
921941 let db_callbacks = DbCallbacks :: default ( ) ;
922942 let reducer_callbacks = ReducerCallbacks :: default ( ) ;
923943
@@ -937,8 +957,6 @@ but you must call one of them, or else the connection will never progress.
937957 let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv) ;
938958
939959 let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
940- runtime,
941-
942960 db_callbacks,
943961 reducer_callbacks,
944962 subscriptions : SubscriptionManager :: default ( ) ,
@@ -956,7 +974,6 @@ but you must call one of them, or else the connection will never progress.
956974
957975 let ( pending_mutations_send, pending_mutations_recv) = mpsc:: unbounded ( ) ;
958976 let ctx_imp = DbContextImpl {
959- runtime : handle,
960977 inner,
961978 send_chan,
962979 cache,
@@ -1081,15 +1098,11 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal
10811098// When called from within an async context, return a handle to it (and no
10821099// `Runtime`), otherwise create a fresh `Runtime` and return it along with a
10831100// handle to it.
1101+ #[ cfg( not( feature = "web" ) ) ]
10841102fn enter_or_create_runtime ( ) -> crate :: Result < ( Option < Runtime > , runtime:: Handle ) > {
10851103 match runtime:: Handle :: try_current ( ) {
10861104 Err ( e) if e. is_missing_context ( ) => {
1087- #[ cfg( not( feature = "web" ) ) ]
1088- let mut rt = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
1089- #[ cfg( feature = "web" ) ]
1090- let mut rt = tokio:: runtime:: Builder :: new_current_thread ( ) ;
1091-
1092- let rt = rt
1105+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
10931106 . enable_all ( )
10941107 . worker_threads ( 1 )
10951108 . thread_name ( "spacetimedb-background-connection" )
0 commit comments