11use std:: collections:: VecDeque ;
22use std:: mem;
33use std:: pin:: { pin, Pin } ;
4+ use std:: sync:: Arc ;
45use std:: time:: Duration ;
56
67use axum:: extract:: { Path , Query , State } ;
@@ -14,12 +15,14 @@ use futures::{Future, FutureExt, SinkExt, StreamExt};
1415use http:: { HeaderValue , StatusCode } ;
1516use scopeguard:: ScopeGuard ;
1617use serde:: Deserialize ;
17- use spacetimedb:: client:: messages:: { serialize, IdentityTokenMessage , SerializableMessage } ;
18+ use spacetimedb:: client:: messages:: { serialize, IdentityTokenMessage , SerializableMessage , SerializeBufferPool } ;
1819use spacetimedb:: client:: { ClientActorId , ClientConfig , ClientConnection , DataMessage , MessageHandleError , Protocol } ;
20+ use spacetimedb:: execution_context:: WorkloadType ;
1921use spacetimedb:: host:: module_host:: ClientConnectedError ;
2022use spacetimedb:: host:: NoSuchModule ;
2123use spacetimedb:: util:: also_poll;
2224use spacetimedb:: worker_metrics:: WORKER_METRICS ;
25+ use spacetimedb:: Identity ;
2326use spacetimedb_client_api_messages:: websocket:: { self as ws_api, Compression } ;
2427use spacetimedb_lib:: connection_id:: { ConnectionId , ConnectionIdForUrl } ;
2528use std:: time:: Instant ;
@@ -125,6 +128,8 @@ where
125128 name : ctx. client_actor_index ( ) . next_client_name ( ) ,
126129 } ;
127130
131+ let serialize_buffer_pool = ctx. websocket_send_serialize_buffer_pool ( ) . clone ( ) ;
132+
128133 let ws_config = WebSocketConfig :: default ( )
129134 . max_message_size ( Some ( 0x2000000 ) )
130135 . max_frame_size ( None )
@@ -146,7 +151,7 @@ where
146151 None => log:: debug!( "New client connected from unknown ip" ) ,
147152 }
148153
149- let actor = |client, sendrx| ws_client_actor ( client, ws, sendrx) ;
154+ let actor = |client, sendrx| ws_client_actor ( client, ws, sendrx, serialize_buffer_pool ) ;
150155 let client = match ClientConnection :: spawn ( client_id, client_config, leader. replica_id , module_rx, actor) . await
151156 {
152157 Ok ( s) => s,
@@ -180,13 +185,18 @@ where
180185
181186const LIVELINESS_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
182187
183- async fn ws_client_actor ( client : ClientConnection , ws : WebSocketStream , sendrx : mpsc:: Receiver < SerializableMessage > ) {
188+ async fn ws_client_actor (
189+ client : ClientConnection ,
190+ ws : WebSocketStream ,
191+ sendrx : mpsc:: Receiver < SerializableMessage > ,
192+ serialize_buffer_pool : Arc < SerializeBufferPool > ,
193+ ) {
184194 // ensure that even if this task gets cancelled, we always cleanup the connection
185195 let mut client = scopeguard:: guard ( client, |client| {
186196 tokio:: spawn ( client. disconnect ( ) ) ;
187197 } ) ;
188198
189- ws_client_actor_inner ( & mut client, ws, sendrx) . await ;
199+ ws_client_actor_inner ( & mut client, ws, sendrx, & serialize_buffer_pool ) . await ;
190200
191201 ScopeGuard :: into_inner ( client) . disconnect ( ) . await ;
192202}
@@ -203,6 +213,7 @@ async fn ws_client_actor_inner(
203213 client : & mut ClientConnection ,
204214 mut ws : WebSocketStream ,
205215 mut sendrx : mpsc:: Receiver < SerializableMessage > ,
216+ serialize_buffer_pool : & SerializeBufferPool ,
206217) {
207218 let mut liveness_check_interval = tokio:: time:: interval ( LIVELINESS_TIMEOUT ) ;
208219 let mut got_pong = true ;
@@ -298,31 +309,31 @@ async fn ws_client_actor_inner(
298309 // even though the websocket RFC allows it. should we fork tungstenite?
299310 log:: info!( "dropping messages due to ws already being closed: {:?}" , & rx_buf[ ..n] ) ;
300311 } else {
301- let send_all = async {
302- for msg in rx_buf. drain( ..n) {
303- let workload = msg. workload( ) ;
304- let num_rows = msg. num_rows( ) ;
305-
306- let msg = datamsg_to_wsmsg( serialize( msg, client. config) ) ;
307-
308- // These metrics should be updated together,
309- // or not at all.
310- if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
311- WORKER_METRICS
312- . websocket_sent_num_rows
313- . with_label_values( & addr, & workload)
314- . observe( num_rows as f64 ) ;
315- WORKER_METRICS
316- . websocket_sent_msg_size
317- . with_label_values( & addr, & workload)
318- . observe( msg. len( ) as f64 ) ;
312+ let send_all = async {
313+ for msg in rx_buf. drain( ..n) {
314+ let workload = msg. workload( ) ;
315+ let num_rows = msg. num_rows( ) ;
316+
317+ // Serialize the message, report metrics,
318+ // and keep a handle to the buffer.
319+ let msg_data = serialize( serialize_buffer_pool, msg, client. config) ;
320+ report_ws_sent_metrics( & addr, workload, num_rows, & msg_data) ;
321+ let msg_alloc = msg_data. allocation( ) ;
322+
323+ // Buffer the message without necessarily sending it.
324+ ws. feed( datamsg_to_wsmsg( msg_data) ) . await ?;
325+
326+ // At this point,
327+ // the underlying allocation of `msg_data` should have a single referent
328+ // and this should be `msg_alloc`.
329+ // We can put this back into our pool.
330+ let msg_alloc = msg_alloc. try_into_mut( )
331+ . expect( "should have a unique referent to `msg_alloc`" ) ;
332+ serialize_buffer_pool. put( msg_alloc) ;
319333 }
320- // feed() buffers the message, but does not necessarily send it
321- ws. feed( msg) . await ?;
322- }
323- // now we flush all the messages to the socket
324- ws. flush( ) . await
325- } ;
334+ // now we flush all the messages to the socket
335+ ws. flush( ) . await
336+ } ;
326337 // Flush the websocket while continuing to poll the `handle_queue`,
327338 // to avoid deadlocks or delays due to enqueued futures holding resources.
328339 let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
@@ -393,10 +404,24 @@ async fn ws_client_actor_inner(
393404 if let Err ( e) = res {
394405 if let MessageHandleError :: Execution ( err) = e {
395406 log:: error!( "{err:#}" ) ;
396- let msg = serialize ( err, client. config ) ;
397- if let Err ( error) = ws. send ( datamsg_to_wsmsg ( msg) ) . await {
407+ // Serialize the message and keep a handle to the buffer.
408+ let msg_data = serialize ( serialize_buffer_pool, err, client. config ) ;
409+ let msg_alloc = msg_data. allocation ( ) ;
410+
411+ // Buffer the message without necessarily sending it.
412+ if let Err ( error) = ws. send ( datamsg_to_wsmsg ( msg_data) ) . await {
398413 log:: warn!( "Websocket send error: {error}" )
399414 }
415+
416+ // At this point,
417+ // the underlying allocation of `msg_data` should have a single referent
418+ // and this should be `msg_alloc`.
419+ // We can put this back into our pool.
420+ let msg_alloc = msg_alloc
421+ . try_into_mut ( )
422+ . expect ( "should have a unique referent to `msg_alloc`" ) ;
423+ serialize_buffer_pool. put ( msg_alloc) ;
424+
400425 continue ;
401426 }
402427 log:: debug!( "Client caused error on text message: {}" , e) ;
@@ -460,6 +485,27 @@ impl ClientMessage {
460485 }
461486}
462487
488+ /// Report metrics on sent rows and message sizes to a websocket client.
489+ fn report_ws_sent_metrics (
490+ addr : & Identity ,
491+ workload : Option < WorkloadType > ,
492+ num_rows : Option < usize > ,
493+ msg_ws : & DataMessage ,
494+ ) {
495+ // These metrics should be updated together,
496+ // or not at all.
497+ if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
498+ WORKER_METRICS
499+ . websocket_sent_num_rows
500+ . with_label_values ( addr, & workload)
501+ . observe ( num_rows as f64 ) ;
502+ WORKER_METRICS
503+ . websocket_sent_msg_size
504+ . with_label_values ( addr, & workload)
505+ . observe ( msg_ws. len ( ) as f64 ) ;
506+ }
507+ }
508+
463509fn datamsg_to_wsmsg ( msg : DataMessage ) -> WsMessage {
464510 match msg {
465511 DataMessage :: Text ( text) => WsMessage :: Text ( bytestring_to_utf8bytes ( text) ) ,
0 commit comments