@@ -712,7 +712,7 @@ impl ModuleSubscriptions {
712712 caller : Option < & ClientConnectionSender > ,
713713 mut event : ModuleEvent ,
714714 tx : MutTx ,
715- ) -> Result < Result < Arc < ModuleEvent > , WriteConflict > , DBError > {
715+ ) -> Result < Result < ( Arc < ModuleEvent > , ExecutionMetrics ) , WriteConflict > , DBError > {
716716 // Take a read lock on `subscriptions` before committing tx
717717 // else it can result in subscriber receiving duplicate updates.
718718 let subscriptions = self . subscriptions . read ( ) ;
@@ -742,10 +742,16 @@ impl ModuleSubscriptions {
742742 . unwrap_or_else ( || DeltaTx :: from ( & * read_tx) ) ;
743743
744744 let event = Arc :: new ( event) ;
745+ let mut metrics = ExecutionMetrics :: default ( ) ;
745746
746747 match & event. status {
747748 EventStatus :: Committed ( _) => {
748- subscriptions. eval_updates ( & read_tx, event. clone ( ) , caller, & self . relational_db . database_identity ( ) )
749+ metrics. merge ( subscriptions. eval_updates (
750+ & read_tx,
751+ event. clone ( ) ,
752+ caller,
753+ & self . relational_db . database_identity ( ) ,
754+ ) ) ;
749755 }
750756 EventStatus :: Failed ( _) => {
751757 if let Some ( client) = caller {
@@ -761,7 +767,7 @@ impl ModuleSubscriptions {
761767 EventStatus :: OutOfEnergy => { } // ?
762768 }
763769
764- Ok ( Ok ( event) )
770+ Ok ( Ok ( ( event, metrics ) ) )
765771 }
766772}
767773
@@ -798,6 +804,7 @@ mod tests {
798804 use spacetimedb_lib:: bsatn:: ToBsatn ;
799805 use spacetimedb_lib:: db:: auth:: StAccess ;
800806 use spacetimedb_lib:: identity:: AuthCtx ;
807+ use spacetimedb_lib:: metrics:: ExecutionMetrics ;
801808 use spacetimedb_lib:: { bsatn, ConnectionId , ProductType , ProductValue , Timestamp } ;
802809 use spacetimedb_lib:: { error:: ResultTest , AlgebraicType , Identity } ;
803810 use spacetimedb_primitives:: TableId ;
@@ -1079,19 +1086,19 @@ mod tests {
10791086 subs : & ModuleSubscriptions ,
10801087 deletes : impl IntoIterator < Item = ( TableId , ProductValue ) > ,
10811088 inserts : impl IntoIterator < Item = ( TableId , ProductValue ) > ,
1082- ) -> anyhow:: Result < ( ) > {
1089+ ) -> anyhow:: Result < ExecutionMetrics > {
10831090 let mut tx = db. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: ForTests ) ;
10841091 for ( table_id, row) in deletes {
10851092 tx. delete_product_value ( table_id, & row) ?;
10861093 }
10871094 for ( table_id, row) in inserts {
10881095 db. insert ( & mut tx, table_id, & bsatn:: to_vec ( & row) ?) ?;
10891096 }
1090- assert ! ( matches! (
1091- subs. commit_and_broadcast_event( None , module_event( ) , tx) ,
1092- Ok ( Ok ( _ ) )
1093- ) ) ;
1094- Ok ( ( ) )
1097+
1098+ let Ok ( Ok ( ( _ , metrics ) ) ) = subs. commit_and_broadcast_event ( None , module_event ( ) , tx) else {
1099+ panic ! ( "Encountered an error in `commit_and_broadcast_event`" ) ;
1100+ } ;
1101+ Ok ( metrics )
10951102 }
10961103
10971104 #[ test]
@@ -1689,6 +1696,119 @@ mod tests {
16891696 Ok ( ( ) )
16901697 }
16911698
1699+ /// Test that we do not evaluate queries that we know will not match table update rows
1700+ #[ tokio:: test]
1701+ async fn test_query_pruning ( ) -> anyhow:: Result < ( ) > {
1702+ // Establish a connection for each client
1703+ let ( tx_for_a, mut rx_for_a) = client_connection ( client_id_from_u8 ( 1 ) ) ;
1704+ let ( tx_for_b, mut rx_for_b) = client_connection ( client_id_from_u8 ( 2 ) ) ;
1705+
1706+ let db = relational_db ( ) ?;
1707+ let subs = module_subscriptions ( db. clone ( ) ) ;
1708+
1709+ let u_id = db. create_table_for_test (
1710+ "u" ,
1711+ & [
1712+ ( "i" , AlgebraicType :: U64 ) ,
1713+ ( "a" , AlgebraicType :: U64 ) ,
1714+ ( "b" , AlgebraicType :: U64 ) ,
1715+ ] ,
1716+ & [ 0 . into ( ) ] ,
1717+ ) ?;
1718+ let v_id = db. create_table_for_test (
1719+ "v" ,
1720+ & [
1721+ ( "i" , AlgebraicType :: U64 ) ,
1722+ ( "x" , AlgebraicType :: U64 ) ,
1723+ ( "y" , AlgebraicType :: U64 ) ,
1724+ ] ,
1725+ & [ 0 . into ( ) , 1 . into ( ) ] ,
1726+ ) ?;
1727+
1728+ commit_tx (
1729+ & db,
1730+ & subs,
1731+ [ ] ,
1732+ [
1733+ ( u_id, product ! [ 0u64 , 1u64 , 1u64 ] ) ,
1734+ ( u_id, product ! [ 1u64 , 2u64 , 2u64 ] ) ,
1735+ ( u_id, product ! [ 2u64 , 3u64 , 3u64 ] ) ,
1736+ ( v_id, product ! [ 0u64 , 4u64 , 4u64 ] ) ,
1737+ ( v_id, product ! [ 1u64 , 5u64 , 5u64 ] ) ,
1738+ ] ,
1739+ ) ?;
1740+
1741+ let mut query_ids = 0 ;
1742+
1743+ // Returns (i: 0, a: 1, b: 1)
1744+ subscribe_multi (
1745+ & subs,
1746+ & [
1747+ "select u.* from u join v on u.i = v.i where v.x = 4" ,
1748+ "select u.* from u join v on u.i = v.i where v.x = 6" ,
1749+ ] ,
1750+ tx_for_a,
1751+ & mut query_ids,
1752+ ) ?;
1753+
1754+ // Returns (i: 1, a: 2, b: 2)
1755+ subscribe_multi (
1756+ & subs,
1757+ & [
1758+ "select u.* from u join v on u.i = v.i where v.x = 5" ,
1759+ "select u.* from u join v on u.i = v.i where v.x = 7" ,
1760+ ] ,
1761+ tx_for_b,
1762+ & mut query_ids,
1763+ ) ?;
1764+
1765+ // Wait for both subscriptions
1766+ assert ! ( matches!(
1767+ rx_for_a. recv( ) . await ,
1768+ Some ( SerializableMessage :: Subscription ( SubscriptionMessage {
1769+ result: SubscriptionResult :: SubscribeMulti ( _) ,
1770+ ..
1771+ } ) )
1772+ ) ) ;
1773+ assert ! ( matches!(
1774+ rx_for_b. recv( ) . await ,
1775+ Some ( SerializableMessage :: Subscription ( SubscriptionMessage {
1776+ result: SubscriptionResult :: SubscribeMulti ( _) ,
1777+ ..
1778+ } ) )
1779+ ) ) ;
1780+
1781+ // Modify a single row in `v`
1782+ let metrics = commit_tx (
1783+ & db,
1784+ & subs,
1785+ [ ( v_id, product ! [ 1u64 , 5u64 , 5u64 ] ) ] ,
1786+ [ ( v_id, product ! [ 1u64 , 5u64 , 6u64 ] ) ] ,
1787+ ) ?;
1788+
1789+ // We should only have evaluated a single query
1790+ assert_eq ! ( metrics. delta_queries_evaluated, 1 ) ;
1791+ assert_eq ! ( metrics. delta_queries_matched, 1 ) ;
1792+
1793+ // Insert a new row into `v`
1794+ let metrics = commit_tx ( & db, & subs, [ ] , [ ( v_id, product ! [ 2u64 , 6u64 , 6u64 ] ) ] ) ?;
1795+
1796+ assert_tx_update_for_table (
1797+ & mut rx_for_a,
1798+ u_id,
1799+ & ProductType :: from ( [ AlgebraicType :: U64 , AlgebraicType :: U64 , AlgebraicType :: U64 ] ) ,
1800+ [ product ! [ 2u64 , 3u64 , 3u64 ] ] ,
1801+ [ ] ,
1802+ )
1803+ . await ;
1804+
1805+ // We should only have evaluated a single query
1806+ assert_eq ! ( metrics. delta_queries_evaluated, 1 ) ;
1807+ assert_eq ! ( metrics. delta_queries_matched, 1 ) ;
1808+
1809+ Ok ( ( ) )
1810+ }
1811+
16921812 /// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
16931813 #[ test]
16941814 fn test_tx_subscription_ordering ( ) -> ResultTest < ( ) > {
0 commit comments