@@ -978,9 +978,6 @@ but you must call one of them, or else the connection will never progress.
978978 #[ cfg( not( feature = "web" ) ) ]
979979 fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
980980 let ( runtime, handle) = enter_or_create_runtime ( ) ?;
981- let db_callbacks = DbCallbacks :: default ( ) ;
982- let reducer_callbacks = ReducerCallbacks :: default ( ) ;
983- let procedure_callbacks = ProcedureCallbacks :: default ( ) ;
984981
985982 let connection_id_override = get_connection_id_override ( ) ;
986983 let ws_connection = tokio:: task:: block_in_place ( || {
@@ -998,50 +995,30 @@ but you must call one of them, or else the connection will never progress.
998995
999996 let ( _websocket_loop_handle, raw_msg_recv, raw_msg_send) = ws_connection. spawn_message_loop ( & handle) ;
1000997 let ( _parse_loop_handle, parsed_recv_chan) = spawn_parse_loop :: < M > ( raw_msg_recv, & handle) ;
1001-
1002- let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
1003- runtime,
1004-
1005- db_callbacks,
1006- reducer_callbacks,
1007- subscriptions : SubscriptionManager :: default ( ) ,
1008-
1009- on_connect : self . on_connect ,
1010- on_connect_error : self . on_connect_error ,
1011- on_disconnect : self . on_disconnect ,
1012- call_reducer_flags : <_ >:: default ( ) ,
1013- procedure_callbacks,
1014- } ) ) ;
1015-
1016- let mut cache = ClientCache :: default ( ) ;
1017- M :: register_tables ( & mut cache) ;
1018- let cache = Arc :: new ( StdMutex :: new ( cache) ) ;
1019- let send_chan = Arc :: new ( StdMutex :: new ( Some ( raw_msg_send) ) ) ;
998+ let parsed_recv_chan = Arc :: new ( TokioMutex :: new ( parsed_recv_chan) ) ;
1020999
10211000 let ( pending_mutations_send, pending_mutations_recv) = mpsc:: unbounded ( ) ;
1022- let ctx_imp = DbContextImpl {
1023- runtime : handle,
1024- inner,
1025- send_chan,
1026- cache,
1027- recv : Arc :: new ( TokioMutex :: new ( parsed_recv_chan) ) ,
1001+ let pending_mutations_recv = Arc :: new ( TokioMutex :: new ( pending_mutations_recv) ) ;
1002+
1003+ let inner_ctx = build_db_ctx_inner ( runtime, self . on_connect , self . on_connect_error , self . on_disconnect ) ;
1004+ Ok ( build_db_ctx (
1005+ handle,
1006+ inner_ctx,
1007+ raw_msg_send,
1008+ parsed_recv_chan,
10281009 pending_mutations_send,
1029- pending_mutations_recv : Arc :: new ( TokioMutex :: new ( pending_mutations_recv) ) ,
1030- identity : Arc :: new ( StdMutex :: new ( None ) ) ,
1031- connection_id : Arc :: new ( StdMutex :: new ( connection_id_override) ) ,
1032- } ;
1033-
1034- Ok ( ctx_imp)
1010+ pending_mutations_recv,
1011+ connection_id_override,
1012+ ) )
10351013 }
10361014
1015+ /// Open a WebSocket connection, build an empty client cache, &c,
1016+ /// to construct a [`DbContextImpl`].
10371017 #[ cfg( feature = "web" ) ]
1038- pub async fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
1039- let db_callbacks = DbCallbacks :: default ( ) ;
1040- let reducer_callbacks = ReducerCallbacks :: default ( ) ;
1041-
1018+ async fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
10421019 let connection_id_override = get_connection_id_override ( ) ;
10431020 let ws_connection = WsConnection :: connect (
1044- self . uri . unwrap ( ) ,
1021+ self . uri . clone ( ) . unwrap ( ) ,
10451022 self . module_name . as_ref ( ) . unwrap ( ) ,
10461023 self . token . as_deref ( ) ,
10471024 connection_id_override,
@@ -1054,36 +1031,20 @@ but you must call one of them, or else the connection will never progress.
10541031
10551032 let ( raw_msg_recv, raw_msg_send) = ws_connection. spawn_message_loop ( ) ;
10561033 let parsed_recv_chan = spawn_parse_loop :: < M > ( raw_msg_recv) ;
1057-
1058- let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
1059- db_callbacks,
1060- reducer_callbacks,
1061- subscriptions : SubscriptionManager :: default ( ) ,
1062-
1063- on_connect : self . on_connect ,
1064- on_connect_error : self . on_connect_error ,
1065- on_disconnect : self . on_disconnect ,
1066- call_reducer_flags : <_ >:: default ( ) ,
1067- } ) ) ;
1068-
1069- let mut cache = ClientCache :: default ( ) ;
1070- M :: register_tables ( & mut cache) ;
1071- let cache = Arc :: new ( StdMutex :: new ( cache) ) ;
1072- let send_chan = Arc :: new ( StdMutex :: new ( Some ( raw_msg_send) ) ) ;
1034+ let parsed_recv_chan = Arc :: new ( StdMutex :: new ( parsed_recv_chan) ) ;
10731035
10741036 let ( pending_mutations_send, pending_mutations_recv) = mpsc:: unbounded ( ) ;
1075- let ctx_imp = DbContextImpl {
1076- inner,
1077- send_chan,
1078- cache,
1079- recv : Arc :: new ( StdMutex :: new ( parsed_recv_chan) ) ,
1080- pending_mutations_send,
1081- pending_mutations_recv : Arc :: new ( StdMutex :: new ( pending_mutations_recv) ) ,
1082- identity : Arc :: new ( StdMutex :: new ( None ) ) ,
1083- connection_id : Arc :: new ( StdMutex :: new ( connection_id_override) ) ,
1084- } ;
1037+ let pending_mutations_recv = Arc :: new ( StdMutex :: new ( pending_mutations_recv) ) ;
10851038
1086- Ok ( ctx_imp)
1039+ let inner_ctx = build_db_ctx_inner ( self . on_connect , self . on_connect_error , self . on_disconnect ) ;
1040+ Ok ( build_db_ctx (
1041+ inner_ctx,
1042+ raw_msg_send,
1043+ parsed_recv_chan,
1044+ pending_mutations_send,
1045+ pending_mutations_recv,
1046+ connection_id_override,
1047+ ) )
10871048 }
10881049
10891050 /// Set the URI of the SpacetimeDB host which is running the remote module.
@@ -1215,6 +1176,60 @@ Instead of registering multiple `on_disconnect` callbacks, register a single cal
12151176 }
12161177}
12171178
1179+ /// Create a [`DbContextImplInner`] wrapped in `Arc<Mutex<...>>`.
1180+ fn build_db_ctx_inner < M : SpacetimeModule > (
1181+ #[ cfg( not( feature = "web" ) ) ] runtime : Option < Runtime > ,
1182+
1183+ on_connect_cb : Option < OnConnectCallback < M > > ,
1184+ on_connect_error_cb : Option < OnConnectErrorCallback < M > > ,
1185+ on_disconnect_cb : Option < OnDisconnectCallback < M > > ,
1186+ ) -> Arc < StdMutex < DbContextImplInner < M > > > {
1187+ Arc :: new ( StdMutex :: new ( DbContextImplInner {
1188+ #[ cfg( not( feature = "web" ) ) ]
1189+ runtime,
1190+
1191+ db_callbacks : DbCallbacks :: default ( ) ,
1192+ reducer_callbacks : ReducerCallbacks :: default ( ) ,
1193+ subscriptions : SubscriptionManager :: default ( ) ,
1194+
1195+ on_connect : on_connect_cb,
1196+ on_connect_error : on_connect_error_cb,
1197+ on_disconnect : on_disconnect_cb,
1198+ call_reducer_flags : <_ >:: default ( ) ,
1199+
1200+ procedure_callbacks : ProcedureCallbacks :: default ( ) ,
1201+ } ) )
1202+ }
1203+
1204+ /// Assemble and return a [`DbContextImpl`] from the provided [`DbContextImplInner`], and channels.
1205+ fn build_db_ctx < M : SpacetimeModule > (
1206+ #[ cfg( not( feature = "web" ) ) ] runtime_handle : runtime:: Handle ,
1207+
1208+ inner_ctx : Arc < StdMutex < DbContextImplInner < M > > > ,
1209+ raw_msg_send : mpsc:: UnboundedSender < ws:: ClientMessage < Bytes > > ,
1210+ parsed_msg_recv : SharedAsyncCell < mpsc:: UnboundedReceiver < ParsedMessage < M > > > ,
1211+ pending_mutations_send : mpsc:: UnboundedSender < PendingMutation < M > > ,
1212+ pending_mutations_recv : SharedAsyncCell < mpsc:: UnboundedReceiver < PendingMutation < M > > > ,
1213+ connection_id : Option < ConnectionId > ,
1214+ ) -> DbContextImpl < M > {
1215+ let mut cache = ClientCache :: default ( ) ;
1216+ M :: register_tables ( & mut cache) ;
1217+ let cache = Arc :: new ( StdMutex :: new ( cache) ) ;
1218+
1219+ DbContextImpl {
1220+ #[ cfg( not( feature = "web" ) ) ]
1221+ runtime : runtime_handle,
1222+ inner : inner_ctx,
1223+ send_chan : Arc :: new ( StdMutex :: new ( Some ( raw_msg_send) ) ) ,
1224+ cache,
1225+ recv : parsed_msg_recv,
1226+ pending_mutations_send,
1227+ pending_mutations_recv,
1228+ identity : Arc :: new ( StdMutex :: new ( None ) ) ,
1229+ connection_id : Arc :: new ( StdMutex :: new ( connection_id) ) ,
1230+ }
1231+ }
1232+
12181233// When called from within an async context, return a handle to it (and no
12191234// `Runtime`), otherwise create a fresh `Runtime` and return it along with a
12201235// handle to it.
0 commit comments