@@ -23,8 +23,8 @@ use crate::worker_metrics::WORKER_METRICS;
2323use parking_lot:: RwLock ;
2424use prometheus:: IntGauge ;
2525use spacetimedb_client_api_messages:: websocket:: {
26- self as ws, BsatnFormat , Compression , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate ,
27- Unsubscribe , UnsubscribeMulti ,
26+ self as ws, BsatnFormat , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate , Unsubscribe ,
27+ UnsubscribeMulti ,
2828} ;
2929use spacetimedb_execution:: pipelined:: PipelinedProject ;
3030use spacetimedb_expr:: check:: parse_and_type_sub;
@@ -186,34 +186,10 @@ impl ModuleSubscriptions {
186186 let tx = DeltaTx :: from ( tx) ;
187187
188188 Ok ( match sender. config . protocol {
189- Protocol :: Binary => {
190- collect_table_update (
191- & plans,
192- table_id,
193- table_name. into ( ) ,
194- // We will compress the outer server message,
195- // after we release the tx lock.
196- // There's no need to compress the inner table update too.
197- Compression :: None ,
198- & tx,
199- update_type,
200- )
201- . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) )
202- }
203- Protocol :: Text => {
204- collect_table_update (
205- & plans,
206- table_id,
207- table_name. into ( ) ,
208- // We will compress the outer server message,
209- // after we release the tx lock,
210- // There's no need to compress the inner table update too.
211- Compression :: None ,
212- & tx,
213- update_type,
214- )
215- . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) )
216- }
189+ Protocol :: Binary => collect_table_update ( & plans, table_id, table_name. into ( ) , & tx, update_type)
190+ . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ,
191+ Protocol :: Text => collect_table_update ( & plans, table_id, table_name. into ( ) , & tx, update_type)
192+ . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ,
217193 } ?)
218194 }
219195
@@ -240,27 +216,11 @@ impl ModuleSubscriptions {
240216 let tx = DeltaTx :: from ( tx) ;
241217 match sender. config . protocol {
242218 Protocol :: Binary => {
243- let ( update, metrics) = execute_plans (
244- queries,
245- // We will compress the outer server message,
246- // after we release the tx lock.
247- // There's no need to compress the inner table updates too.
248- Compression :: None ,
249- & tx,
250- update_type,
251- ) ?;
219+ let ( update, metrics) = execute_plans ( queries, & tx, update_type) ?;
252220 Ok ( ( FormatSwitch :: Bsatn ( update) , metrics) )
253221 }
254222 Protocol :: Text => {
255- let ( update, metrics) = execute_plans (
256- queries,
257- // We will compress the outer server message,
258- // after we release the tx lock.
259- // There's no need to compress the inner table updates too.
260- Compression :: None ,
261- & tx,
262- update_type,
263- ) ?;
223+ let ( update, metrics) = execute_plans ( queries, & tx, update_type) ?;
264224 Ok ( ( FormatSwitch :: Json ( update) , metrics) )
265225 }
266226 }
@@ -650,26 +610,10 @@ impl ModuleSubscriptions {
650610
651611 let tx = DeltaTx :: from ( & * tx) ;
652612 let ( database_update, metrics) = match sender. config . protocol {
653- Protocol :: Binary => execute_plans (
654- & queries,
655- // We will compress the outer server message,
656- // after we release the tx lock.
657- // There's no need to compress the inner table updates too.
658- Compression :: None ,
659- & tx,
660- TableUpdateType :: Subscribe ,
661- )
662- . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ?,
663- Protocol :: Text => execute_plans (
664- & queries,
665- // We will compress the outer server message,
666- // after we release the tx lock.
667- // There's no need to compress the inner table updates too.
668- Compression :: None ,
669- & tx,
670- TableUpdateType :: Subscribe ,
671- )
672- . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ?,
613+ Protocol :: Binary => execute_plans ( & queries, & tx, TableUpdateType :: Subscribe )
614+ . map ( |( table_update, metrics) | ( FormatSwitch :: Bsatn ( table_update) , metrics) ) ?,
615+ Protocol :: Text => execute_plans ( & queries, & tx, TableUpdateType :: Subscribe )
616+ . map ( |( table_update, metrics) | ( FormatSwitch :: Json ( table_update) , metrics) ) ?,
673617 } ;
674618
675619 record_exec_metrics (
@@ -1539,7 +1483,9 @@ mod tests {
15391483 Ok ( ( ) )
15401484 }
15411485
1542- /// Test that we do not compress the results of an initial subscribe call
1486+ /// Test that we do not compress within a [SubscriptionMessage].
1487+ /// The message itself is compressed before being sent over the wire,
1488+ /// but we don't care about that for this test.
15431489 #[ tokio:: test]
15441490 async fn test_no_compression_for_subscribe ( ) -> anyhow:: Result < ( ) > {
15451491 // Establish a client connection with compression
@@ -1583,6 +1529,56 @@ mod tests {
15831529 Ok ( ( ) )
15841530 }
15851531
1532+ /// Test that we do not compress within a [TransactionUpdateMessage].
1533+ /// The message itself is compressed before being sent over the wire,
1534+ /// but we don't care about that for this test.
1535+ #[ tokio:: test]
1536+ async fn test_no_compression_for_update ( ) -> anyhow:: Result < ( ) > {
1537+ // Establish a client connection with compression
1538+ let ( tx, mut rx) = client_connection_with_compression ( client_id_from_u8 ( 1 ) , Compression :: Brotli ) ;
1539+
1540+ let db = relational_db ( ) ?;
1541+ let subs = module_subscriptions ( db. clone ( ) ) ;
1542+
1543+ let table_id = db. create_table_for_test ( "t" , & [ ( "x" , AlgebraicType :: U64 ) ] , & [ ] ) ?;
1544+
1545+ let mut inserts = vec ! [ ] ;
1546+
1547+ for i in 0 ..16_000u64 {
1548+ inserts. push ( ( table_id, product ! [ i] ) ) ;
1549+ }
1550+
1551+ // Subscribe to the entire table
1552+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1553+
1554+ // Wait to receive the initial subscription message
1555+ assert ! ( matches!( rx. recv( ) . await , Some ( SerializableMessage :: Subscription ( _) ) ) ) ;
1556+
1557+ // Insert a lot of rows into `t`.
1558+ // We want to insert enough to cross any threshold there might be for compression.
1559+ commit_tx ( & db, & subs, [ ] , inserts) ?;
1560+
1561+ // Assert the table updates within this message are all be uncompressed
1562+ match rx. recv ( ) . await {
1563+ Some ( SerializableMessage :: TxUpdate ( TransactionUpdateMessage {
1564+ database_update :
1565+ SubscriptionUpdateMessage {
1566+ database_update : FormatSwitch :: Bsatn ( ws:: DatabaseUpdate { tables } ) ,
1567+ ..
1568+ } ,
1569+ ..
1570+ } ) ) => {
1571+ assert ! ( tables. iter( ) . all( |TableUpdate { updates, .. } | updates
1572+ . iter( )
1573+ . all( |query_update| matches!( query_update, CompressableQueryUpdate :: Uncompressed ( _) ) ) ) ) ;
1574+ }
1575+ Some ( _) => panic ! ( "unexpected message from subscription" ) ,
1576+ None => panic ! ( "channel unexpectedly closed" ) ,
1577+ } ;
1578+
1579+ Ok ( ( ) )
1580+ }
1581+
15861582 /// In this test we subscribe to a join query, update the lhs table,
15871583 /// and assert that the server sends the correct delta to the client.
15881584 #[ tokio:: test]
0 commit comments