@@ -23,8 +23,8 @@ use crate::worker_metrics::WORKER_METRICS;
2323use parking_lot:: RwLock ;
2424use prometheus:: IntGauge ;
2525use spacetimedb_client_api_messages:: websocket:: {
26- self as ws, BsatnFormat , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate , Unsubscribe ,
27- UnsubscribeMulti ,
26+ self as ws, BsatnFormat , Compression , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate ,
27+ Unsubscribe , UnsubscribeMulti ,
2828} ;
2929use spacetimedb_execution:: pipelined:: PipelinedProject ;
3030use spacetimedb_expr:: check:: parse_and_type_sub;
@@ -170,7 +170,6 @@ impl ModuleSubscriptions {
170170 auth,
171171 ) ?;
172172
173- let comp = sender. config . compression ;
174173 let table_id = query. subscribed_table_id ( ) ;
175174 let table_name = query. subscribed_table_name ( ) ;
176175
@@ -187,10 +186,34 @@ impl ModuleSubscriptions {
187186 let tx = DeltaTx :: from ( tx) ;
188187
189188 Ok ( match sender. config . protocol {
190- Protocol :: Binary => collect_table_update ( & plans, table_id, table_name. into ( ) , comp, & tx, update_type)
191- . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ,
192- Protocol :: Text => collect_table_update ( & plans, table_id, table_name. into ( ) , comp, & tx, update_type)
193- . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ,
189+ Protocol :: Binary => {
190+ collect_table_update (
191+ & plans,
192+ table_id,
193+ table_name. into ( ) ,
194+ // We will compress the outer server message,
195+ // after we release the tx lock.
196+ // There's no need to compress the inner table update too.
197+ Compression :: None ,
198+ & tx,
199+ update_type,
200+ )
201+ . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) )
202+ }
203+ Protocol :: Text => {
204+ collect_table_update (
205+ & plans,
206+ table_id,
207+ table_name. into ( ) ,
208+ // We will compress the outer server message,
209+ // after we release the tx lock,
210+ // There's no need to compress the inner table update too.
211+ Compression :: None ,
212+ & tx,
213+ update_type,
214+ )
215+ . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) )
216+ }
194217 } ?)
195218 }
196219
@@ -213,16 +236,31 @@ impl ModuleSubscriptions {
213236 } ,
214237 auth,
215238 ) ?;
216- let comp = sender. config . compression ;
217239
218240 let tx = DeltaTx :: from ( tx) ;
219241 match sender. config . protocol {
220242 Protocol :: Binary => {
221- let ( update, metrics) = execute_plans ( queries, comp, & tx, update_type) ?;
243+ let ( update, metrics) = execute_plans (
244+ queries,
245+ // We will compress the outer server message,
246+ // after we release the tx lock.
247+ // There's no need to compress the inner table updates too.
248+ Compression :: None ,
249+ & tx,
250+ update_type,
251+ ) ?;
222252 Ok ( ( FormatSwitch :: Bsatn ( update) , metrics) )
223253 }
224254 Protocol :: Text => {
225- let ( update, metrics) = execute_plans ( queries, comp, & tx, update_type) ?;
255+ let ( update, metrics) = execute_plans (
256+ queries,
257+ // We will compress the outer server message,
258+ // after we release the tx lock.
259+ // There's no need to compress the inner table updates too.
260+ Compression :: None ,
261+ & tx,
262+ update_type,
263+ ) ?;
226264 Ok ( ( FormatSwitch :: Json ( update) , metrics) )
227265 }
228266 }
@@ -598,8 +636,6 @@ impl ModuleSubscriptions {
598636
599637 drop ( guard) ;
600638
601- let comp = sender. config . compression ;
602-
603639 check_row_limit (
604640 & queries,
605641 & self . relational_db ,
@@ -614,10 +650,26 @@ impl ModuleSubscriptions {
614650
615651 let tx = DeltaTx :: from ( & * tx) ;
616652 let ( database_update, metrics) = match sender. config . protocol {
617- Protocol :: Binary => execute_plans ( & queries, comp, & tx, TableUpdateType :: Subscribe )
618- . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ?,
619- Protocol :: Text => execute_plans ( & queries, comp, & tx, TableUpdateType :: Subscribe )
620- . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ?,
653+ Protocol :: Binary => execute_plans (
654+ & queries,
655+ // We will compress the outer server message,
656+ // after we release the tx lock.
657+ // There's no need to compress the inner table updates too.
658+ Compression :: None ,
659+ & tx,
660+ TableUpdateType :: Subscribe ,
661+ )
662+ . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ?,
663+ Protocol :: Text => execute_plans (
664+ & queries,
665+ // We will compress the outer server message,
666+ // after we release the tx lock.
667+ // There's no need to compress the inner table updates too.
668+ Compression :: None ,
669+ & tx,
670+ TableUpdateType :: Subscribe ,
671+ )
672+ . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ?,
621673 } ;
622674
623675 record_exec_metrics (
@@ -719,8 +771,8 @@ pub struct WriteConflict;
719771mod tests {
720772 use super :: { AssertTxFn , ModuleSubscriptions } ;
721773 use crate :: client:: messages:: {
722- SerializableMessage , SubscriptionError , SubscriptionMessage , SubscriptionResult , SubscriptionUpdateMessage ,
723- TransactionUpdateMessage ,
774+ SerializableMessage , SubscriptionData , SubscriptionError , SubscriptionMessage , SubscriptionResult ,
775+ SubscriptionUpdateMessage , TransactionUpdateMessage ,
724776 } ;
725777 use crate :: client:: { ClientActorId , ClientConfig , ClientConnectionSender , ClientName , Protocol } ;
726778 use crate :: db:: datastore:: system_tables:: { StRowLevelSecurityRow , ST_ROW_LEVEL_SECURITY_ID } ;
@@ -740,7 +792,7 @@ mod tests {
740792 use spacetimedb_client_api_messages:: energy:: EnergyQuanta ;
741793 use spacetimedb_client_api_messages:: websocket:: {
742794 CompressableQueryUpdate , Compression , FormatSwitch , QueryId , Subscribe , SubscribeMulti , SubscribeSingle ,
743- Unsubscribe , UnsubscribeMulti ,
795+ TableUpdate , Unsubscribe , UnsubscribeMulti ,
744796 } ;
745797 use spacetimedb_execution:: dml:: MutDatastore ;
746798 use spacetimedb_lib:: bsatn:: ToBsatn ;
@@ -856,19 +908,27 @@ mod tests {
856908 }
857909 }
858910
859- /// Instantiate a client connection
860- fn client_connection ( client_id : ClientActorId ) -> ( Arc < ClientConnectionSender > , Receiver < SerializableMessage > ) {
911+ /// Instantiate a client connection with compression
912+ fn client_connection_with_compression (
913+ client_id : ClientActorId ,
914+ compression : Compression ,
915+ ) -> ( Arc < ClientConnectionSender > , Receiver < SerializableMessage > ) {
861916 let ( sender, rx) = ClientConnectionSender :: dummy_with_channel (
862917 client_id,
863918 ClientConfig {
864919 protocol : Protocol :: Binary ,
865- compression : Compression :: None ,
920+ compression,
866921 tx_update_full : true ,
867922 } ,
868923 ) ;
869924 ( Arc :: new ( sender) , rx)
870925 }
871926
927+ /// Instantiate a client connection
928+ fn client_connection ( client_id : ClientActorId ) -> ( Arc < ClientConnectionSender > , Receiver < SerializableMessage > ) {
929+ client_connection_with_compression ( client_id, Compression :: None )
930+ }
931+
872932 /// Insert rules into the RLS system table
873933 fn insert_rls_rules (
874934 db : & RelationalDB ,
@@ -1472,6 +1532,50 @@ mod tests {
14721532 Ok ( ( ) )
14731533 }
14741534
1535+ /// Test that we do not compress the results of an initial subscribe call
1536+ #[ tokio:: test]
1537+ async fn test_no_compression_for_subscribe ( ) -> anyhow:: Result < ( ) > {
1538+ // Establish a client connection with compression
1539+ let ( tx, mut rx) = client_connection_with_compression ( client_id_from_u8 ( 1 ) , Compression :: Brotli ) ;
1540+
1541+ let db = relational_db ( ) ?;
1542+ let subs = module_subscriptions ( db. clone ( ) ) ;
1543+
1544+ let table_id = db. create_table_for_test ( "t" , & [ ( "x" , AlgebraicType :: U64 ) ] , & [ ] ) ?;
1545+
1546+ let mut inserts = vec ! [ ] ;
1547+
1548+ for i in 0 ..16_000u64 {
1549+ inserts. push ( ( table_id, product ! [ i] ) ) ;
1550+ }
1551+
1552+ // Insert a lot of rows into `t`.
1553+ // We want to insert enough to cross any threshold there might be for compression.
1554+ commit_tx ( & db, & subs, [ ] , inserts) ?;
1555+
1556+ // Subscribe to the entire table
1557+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1558+
1559+ // Assert the table updates within this message are all be uncompressed
1560+ match rx. recv ( ) . await {
1561+ Some ( SerializableMessage :: Subscription ( SubscriptionMessage {
1562+ result :
1563+ SubscriptionResult :: SubscribeMulti ( SubscriptionData {
1564+ data : FormatSwitch :: Bsatn ( ws:: DatabaseUpdate { tables } ) ,
1565+ } ) ,
1566+ ..
1567+ } ) ) => {
1568+ assert ! ( tables. iter( ) . all( |TableUpdate { updates, .. } | updates
1569+ . iter( )
1570+ . all( |query_update| matches!( query_update, CompressableQueryUpdate :: Uncompressed ( _) ) ) ) ) ;
1571+ }
1572+ Some ( _) => panic ! ( "unexpected message from subscription" ) ,
1573+ None => panic ! ( "channel unexpectedly closed" ) ,
1574+ } ;
1575+
1576+ Ok ( ( ) )
1577+ }
1578+
14751579 /// In this test we subscribe to a join query, update the lhs table,
14761580 /// and assert that the server sends the correct delta to the client.
14771581 #[ tokio:: test]
0 commit comments