@@ -37,7 +37,12 @@ use http::Uri;
3737use spacetimedb_client_api_messages:: websocket:: { self as ws, common:: QuerySetId } ;
3838use spacetimedb_lib:: { bsatn, ser:: Serialize , ConnectionId , Identity , Timestamp } ;
3939use spacetimedb_sats:: Deserialize ;
40- use std:: sync:: { atomic:: AtomicU32 , Arc , Mutex as StdMutex , OnceLock } ;
40+ use std:: {
41+ fs:: { File , OpenOptions } ,
42+ io:: Write ,
43+ path:: PathBuf ,
44+ sync:: { atomic:: AtomicU32 , Arc , Mutex as StdMutex , OnceLock } ,
45+ } ;
4146use tokio:: {
4247 runtime:: { self , Runtime } ,
4348 sync:: Mutex as TokioMutex ,
@@ -86,6 +91,8 @@ pub struct DbContextImpl<M: SpacetimeModule> {
8691 ///
8792 /// This may be none if we have not yet received the [`ws::v2::InitialConnection`] message.
8893 connection_id : SharedCell < Option < ConnectionId > > ,
94+
95+ pub ( crate ) extra_logging : Option < SharedCell < File > > ,
8996}
9097
9198impl < M : SpacetimeModule > Clone for DbContextImpl < M > {
@@ -103,14 +110,20 @@ impl<M: SpacetimeModule> Clone for DbContextImpl<M> {
103110 pending_mutations_recv : Arc :: clone ( & self . pending_mutations_recv ) ,
104111 identity : Arc :: clone ( & self . identity ) ,
105112 connection_id : Arc :: clone ( & self . connection_id ) ,
113+ extra_logging : Option :: < Arc < _ > > :: clone ( & self . extra_logging ) ,
106114 }
107115 }
108116}
109117
110118impl < M : SpacetimeModule > DbContextImpl < M > {
119+ pub ( crate ) fn debug_log ( & self , body : impl FnOnce ( & mut File ) -> std:: result:: Result < ( ) , std:: io:: Error > ) {
120+ debug_log ( & self . extra_logging , body) ;
121+ }
122+
111123 /// Process a parsed WebSocket message,
112124 /// applying its mutations to the client cache and invoking callbacks.
113125 fn process_message ( & self , msg : ParsedMessage < M > ) -> crate :: Result < ( ) > {
126+ self . debug_log ( |out| writeln ! ( out, "`process_message`: {msg:?}" ) ) ;
114127 match msg {
115128 // Error: treat this as an erroneous disconnect.
116129 ParsedMessage :: Error ( e) => {
@@ -315,6 +328,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
315328
316329 /// Apply an individual [`PendingMutation`].
317330 fn apply_mutation ( & self , mutation : PendingMutation < M > ) -> crate :: Result < ( ) > {
331+ self . debug_log ( |out| writeln ! ( out, "`apply_mutation`: {mutation:?}" ) ) ;
318332 match mutation {
319333 // Subscribe: register the subscription in the [`SubscriptionManager`]
320334 // and send the `Subscribe` WS message.
@@ -763,6 +777,8 @@ pub struct DbConnectionBuilder<M: SpacetimeModule> {
763777 on_connect_error : Option < OnConnectErrorCallback < M > > ,
764778 on_disconnect : Option < OnDisconnectCallback < M > > ,
765779
780+ additional_logging_path : Option < PathBuf > ,
781+
766782 params : WsParams ,
767783}
768784
@@ -798,6 +814,15 @@ pub fn set_connection_id(id: ConnectionId) -> crate::Result<()> {
798814 Ok ( ( ) )
799815}
800816
817+ pub ( crate ) fn debug_log (
818+ extra_logging : & Option < SharedCell < File > > ,
819+ body : impl FnOnce ( & mut File ) -> std:: result:: Result < ( ) , std:: io:: Error > ,
820+ ) {
821+ if let Some ( file) = extra_logging {
822+ body ( & mut * file. lock ( ) . expect ( "`extra_logging` file Mutex is poisoned" ) ) . expect ( "Writing debug log failed" )
823+ }
824+ }
825+
801826impl < M : SpacetimeModule > DbConnectionBuilder < M > {
802827 /// Implementation of the generated `DbConnection::builder` method.
803828 /// Call that method instead.
@@ -810,6 +835,7 @@ impl<M: SpacetimeModule> DbConnectionBuilder<M> {
810835 on_connect : None ,
811836 on_connect_error : None ,
812837 on_disconnect : None ,
838+ additional_logging_path : None ,
813839 params : <_ >:: default ( ) ,
814840 }
815841 }
@@ -847,6 +873,16 @@ but you must call one of them, or else the connection will never progress.
847873 /// Open a WebSocket connection, build an empty client cache, &c,
848874 /// to construct a [`DbContextImpl`].
849875 fn build_impl ( self ) -> crate :: Result < DbContextImpl < M > > {
876+ let extra_logging = self
877+ . additional_logging_path
878+ . map ( |path| {
879+ OpenOptions :: new ( ) . append ( true ) . create ( true ) . open ( & path) . map_err ( |e| {
880+ InternalError :: new ( format ! ( "Failed to open file '{path:?}' for additional logging" ) ) . with_cause ( e)
881+ } )
882+ } )
883+ . transpose ( ) ?
884+ . map ( |file| Arc :: new ( StdMutex :: new ( file) ) ) ;
885+
850886 let ( runtime, handle) = enter_or_create_runtime ( ) ?;
851887 let db_callbacks = DbCallbacks :: default ( ) ;
852888 let reducer_callbacks = ReducerCallbacks :: default ( ) ;
@@ -866,8 +902,10 @@ but you must call one of them, or else the connection will never progress.
866902 source : InternalError :: new ( "Failed to initiate WebSocket connection" ) . with_cause ( source) ,
867903 } ) ?;
868904
869- let ( _websocket_loop_handle, raw_msg_recv, raw_msg_send) = ws_connection. spawn_message_loop ( & handle) ;
870- let ( _parse_loop_handle, parsed_recv_chan) = spawn_parse_loop :: < M > ( raw_msg_recv, & handle) ;
905+ let ( _websocket_loop_handle, raw_msg_recv, raw_msg_send) =
906+ ws_connection. spawn_message_loop ( & handle, extra_logging. clone ( ) ) ;
907+ let ( _parse_loop_handle, parsed_recv_chan) =
908+ spawn_parse_loop :: < M > ( raw_msg_recv, & handle, extra_logging. clone ( ) ) ;
871909
872910 let inner = Arc :: new ( StdMutex :: new ( DbContextImplInner {
873911 runtime,
@@ -898,6 +936,7 @@ but you must call one of them, or else the connection will never progress.
898936 pending_mutations_recv : Arc :: new ( TokioMutex :: new ( pending_mutations_recv) ) ,
899937 identity : Arc :: new ( StdMutex :: new ( None ) ) ,
900938 connection_id : Arc :: new ( StdMutex :: new ( connection_id_override) ) ,
939+ extra_logging,
901940 } ;
902941
903942 Ok ( ctx_imp)
@@ -963,6 +1002,23 @@ but you must call one of them, or else the connection will never progress.
9631002 self
9641003 }
9651004
1005+ /// Set `path` as a path for additional debug logging related to SDK internals.
1006+ ///
1007+ /// When enabled, the SDK will create or open `path` for write-append and write logs to it.
1008+ /// This is useful for diagnosing bugs in the SDK,
1009+ /// but will generate a large volume of text logs and may have performance overhead,
1010+ /// so it should not be used in production.
1011+ ///
1012+ /// When running multiple connections in parallel,
1013+ /// either within the same process or from separate processes,
1014+ /// prefer giving each its own unique path here;
1015+ /// multiple `DbConnection`s writing to the same debug file concurrently
1016+ /// may interleave or corrupt the output.
1017+ pub fn with_debug_to_file ( mut self , path : impl Into < PathBuf > ) -> Self {
1018+ self . additional_logging_path = Some ( path. into ( ) ) ;
1019+ self
1020+ }
1021+
9661022 /// Register a callback to run when the connection is successfully initiated.
9671023 ///
9681024 /// The callback will receive three arguments:
@@ -1043,6 +1099,7 @@ fn enter_or_create_runtime() -> crate::Result<(Option<Runtime>, runtime::Handle)
10431099 }
10441100}
10451101
1102+ #[ derive( Debug ) ]
10461103enum ParsedMessage < M : SpacetimeModule > {
10471104 TransactionUpdate ( M :: DbUpdate ) ,
10481105 IdentityToken ( Identity , Box < str > , ConnectionId ) ,
@@ -1073,9 +1130,10 @@ enum ParsedMessage<M: SpacetimeModule> {
10731130fn spawn_parse_loop < M : SpacetimeModule > (
10741131 raw_message_recv : mpsc:: UnboundedReceiver < ws:: v2:: ServerMessage > ,
10751132 handle : & runtime:: Handle ,
1133+ extra_logging : Option < SharedCell < File > > ,
10761134) -> ( tokio:: task:: JoinHandle < ( ) > , mpsc:: UnboundedReceiver < ParsedMessage < M > > ) {
10771135 let ( parsed_message_send, parsed_message_recv) = mpsc:: unbounded ( ) ;
1078- let handle = handle. spawn ( parse_loop ( raw_message_recv, parsed_message_send) ) ;
1136+ let handle = handle. spawn ( parse_loop ( raw_message_recv, parsed_message_send, extra_logging ) ) ;
10791137 ( handle, parsed_message_recv)
10801138}
10811139
@@ -1084,9 +1142,13 @@ fn spawn_parse_loop<M: SpacetimeModule>(
10841142async fn parse_loop < M : SpacetimeModule > (
10851143 mut recv : mpsc:: UnboundedReceiver < ws:: v2:: ServerMessage > ,
10861144 send : mpsc:: UnboundedSender < ParsedMessage < M > > ,
1145+ extra_logging : Option < SharedCell < File > > ,
10871146) {
10881147 while let Some ( msg) = recv. next ( ) . await {
1089- send. unbounded_send ( match msg {
1148+ debug_log ( & extra_logging, |file| {
1149+ writeln ! ( file, "`parse_loop`: Got raw message: {msg:?}" )
1150+ } ) ;
1151+ let parsed = match msg {
10901152 ws:: v2:: ServerMessage :: TransactionUpdate ( transaction_update) => {
10911153 match M :: DbUpdate :: parse_update ( transaction_update) {
10921154 Err ( e) => ParsedMessage :: Error (
@@ -1210,8 +1272,12 @@ async fn parse_loop<M: SpacetimeModule>(
12101272 ws:: v2:: ProcedureStatus :: Returned ( val) => Ok ( val) ,
12111273 } ,
12121274 } ,
1213- } )
1214- . expect ( "Failed to send ParsedMessage to main thread" ) ;
1275+ } ;
1276+ debug_log ( & extra_logging, |file| {
1277+ writeln ! ( file, "`parse_loop`: Parsed as: {parsed:?}" )
1278+ } ) ;
1279+ send. unbounded_send ( parsed)
1280+ . expect ( "Failed to send ParsedMessage to main thread" ) ;
12151281 }
12161282}
12171283
@@ -1263,6 +1329,62 @@ pub(crate) enum PendingMutation<M: SpacetimeModule> {
12631329 } ,
12641330}
12651331
1332+ // Hand-written `Debug` impl, 'cause `SubscriptionHandleImpl` and callbacks aren't printable.
1333+ impl < M : SpacetimeModule > std:: fmt:: Debug for PendingMutation < M > {
1334+ fn fmt ( & self , f : & mut std:: fmt:: Formatter ) -> std:: fmt:: Result {
1335+ match self {
1336+ PendingMutation :: Unsubscribe { query_set_id } => f
1337+ . debug_struct ( "PendingMutation::Unsubscribe" )
1338+ . field ( "query_set_id" , query_set_id)
1339+ . finish ( ) ,
1340+ PendingMutation :: Subscribe { query_set_id, .. } => f
1341+ . debug_struct ( "PendingMutation::Subscribe" )
1342+ . field ( "query_set_id" , query_set_id)
1343+ . finish_non_exhaustive ( ) ,
1344+ PendingMutation :: AddInsertCallback { table, callback_id, .. } => f
1345+ . debug_struct ( "PendingMutation::AddInsertCallback" )
1346+ . field ( "table" , table)
1347+ . field ( "callback_id" , callback_id)
1348+ . finish_non_exhaustive ( ) ,
1349+ PendingMutation :: RemoveInsertCallback { table, callback_id } => f
1350+ . debug_struct ( "PendingMutation::RemoveInsertCallback" )
1351+ . field ( "table" , table)
1352+ . field ( "callback_id" , callback_id)
1353+ . finish ( ) ,
1354+ PendingMutation :: AddDeleteCallback { table, callback_id, .. } => f
1355+ . debug_struct ( "PendingMutation::AddDeleteCallback" )
1356+ . field ( "table" , table)
1357+ . field ( "callback_id" , callback_id)
1358+ . finish_non_exhaustive ( ) ,
1359+ PendingMutation :: RemoveDeleteCallback { table, callback_id } => f
1360+ . debug_struct ( "PendingMutation::RemoveDeleteCallback" )
1361+ . field ( "table" , table)
1362+ . field ( "callback_id" , callback_id)
1363+ . finish ( ) ,
1364+ PendingMutation :: AddUpdateCallback { table, callback_id, .. } => f
1365+ . debug_struct ( "PendingMutation::AddUpdateCallback" )
1366+ . field ( "table" , table)
1367+ . field ( "callback_id" , callback_id)
1368+ . finish_non_exhaustive ( ) ,
1369+ PendingMutation :: RemoveUpdateCallback { table, callback_id } => f
1370+ . debug_struct ( "PendingMutation::RemoveUpdateCallback" )
1371+ . field ( "table" , table)
1372+ . field ( "callback_id" , callback_id)
1373+ . finish ( ) ,
1374+ PendingMutation :: Disconnect => write ! ( f, "PendingMutation::Disconnect" ) ,
1375+ PendingMutation :: InvokeReducerWithCallback { reducer, .. } => f
1376+ . debug_struct ( "PendingMutation::InvokeReducerWithCallback" )
1377+ . field ( "reducer" , reducer)
1378+ . finish_non_exhaustive ( ) ,
1379+ PendingMutation :: InvokeProcedureWithCallback { procedure, args, .. } => f
1380+ . debug_struct ( "PendingMutation::InvokeProcedureWithCallback" )
1381+ . field ( "procedure" , procedure)
1382+ . field ( "args" , args)
1383+ . finish_non_exhaustive ( ) ,
1384+ }
1385+ }
1386+ }
1387+
12661388enum Message < M : SpacetimeModule > {
12671389 Ws ( Option < ParsedMessage < M > > ) ,
12681390 Local ( PendingMutation < M > ) ,
0 commit comments