@@ -148,8 +148,8 @@ pub(crate) async fn dispatch(test: &str) {
148148
149149 "should-fail" => exec_should_fail ( ) . await ,
150150
151- "reconnect-different-connection-id" => exec_reconnect_different_connection_id ( ) ,
152- "caller-always-notified" => exec_caller_always_notified ( ) ,
151+ "reconnect-different-connection-id" => exec_reconnect_different_connection_id ( ) . await ,
152+ "caller-always-notified" => exec_caller_always_notified ( ) . await ,
153153
154154 "subscribe-all-select-star" => exec_subscribe_all_select_star ( ) . await ,
155155 "caller-alice-receives-reducer-callback-but-not-bob" => {
@@ -449,7 +449,7 @@ async fn connect_with_then(
449449 conn
450450}
451451
452- fn connect_then (
452+ async fn connect_then (
453453 test_counter : & std:: sync:: Arc < TestCounter > ,
454454 callback : impl FnOnce ( & DbConnection ) + Send + ' static ,
455455) -> DbConnection {
@@ -2671,7 +2671,7 @@ async fn exec_two_different_compression_algos() {
26712671 // Connect with brotli, gzip, and no compression.
26722672 // One of them will insert and all of them will subscribe.
26732673 // All should get back `bytes`.
2674- fn connect_with_compression (
2674+ async fn connect_with_compression (
26752675 test_counter : & Arc < TestCounter > ,
26762676 compression_name : & str ,
26772677 compression : Compression ,
@@ -2709,7 +2709,8 @@ async fn exec_two_different_compression_algos() {
27092709 }
27102710 } )
27112711 } ,
2712- ) ;
2712+ )
2713+ . await ;
27132714 }
27142715 let test_counter: Arc < TestCounter > = TestCounter :: new ( ) ;
27152716 let barrier = Arc :: new ( Barrier :: new ( 3 ) ) ;
@@ -2735,7 +2736,7 @@ async fn test_parameterized_subscription() {
27352736 let update_0 = Some ( ctr_for_test. add_test ( "update_0" ) ) ;
27362737 let update_1 = Some ( ctr_for_test. add_test ( "update_1" ) ) ;
27372738
2738- fn subscribe_and_update (
2739+ async fn subscribe_and_update (
27392740 test_name : & str ,
27402741 old : i32 ,
27412742 new : i32 ,
@@ -2821,8 +2822,9 @@ async fn test_rls_subscription() {
28212822 let expected_identity = sender;
28222823 subscribe_these_then ( ctx, & [ "SELECT * FROM users" ] , move |ctx| {
28232824 put_result ( & mut record_sub, Ok ( ( ) ) ) ;
2824- // Wait to insert until both client connections have been made
2825- ctr_for_subs. wait_for_all ( ) ;
2825+ // Invoke the reducer only after this client's RLS-filtered subscription is
2826+ // active. As above, callback code must remain non-blocking so wasm can keep
2827+ // delivering websocket and row events on the same event loop.
28262828 ctx. reducers
28272829 . insert_user_then ( user_name, sender, reducer_callback_assert_committed ( "insert_user" ) )
28282830 . unwrap ( ) ;
0 commit comments