@@ -2488,119 +2488,6 @@ async fn exec_row_deduplication_r_join_s_and_r_join_t() {
24882488 assert_eq ! ( count_unique_u32_on_insert. load( Ordering :: SeqCst ) , 1 ) ;
24892489}
24902490
2491- #[ cfg( all( target_arch = "wasm32" , feature = "web" ) ) ]
2492- async fn exec_row_deduplication_r_join_s_and_r_join_t_async ( ) {
2493- use gloo_timers:: future:: TimeoutFuture ;
2494-
2495- let on_subscription_applied = Arc :: new ( AtomicBool :: new ( false ) ) ;
2496- let pk_u32_on_insert = Arc :: new ( AtomicBool :: new ( false ) ) ;
2497- let pk_u32_on_delete = Arc :: new ( AtomicBool :: new ( false ) ) ;
2498- let pk_u32_two_on_insert = Arc :: new ( AtomicBool :: new ( false ) ) ;
2499- let count_unique_u32_on_insert = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
2500-
2501- let name = db_name_or_panic ( ) ;
2502- let builder = DbConnection :: builder ( )
2503- . with_database_name ( name)
2504- . with_uri ( LOCALHOST )
2505- . on_connect ( {
2506- let on_subscription_applied = on_subscription_applied. clone ( ) ;
2507- let pk_u32_on_insert = pk_u32_on_insert. clone ( ) ;
2508- let pk_u32_on_delete = pk_u32_on_delete. clone ( ) ;
2509- let pk_u32_two_on_insert = pk_u32_two_on_insert. clone ( ) ;
2510- let count_unique_u32_on_insert = count_unique_u32_on_insert. clone ( ) ;
2511-
2512- move |ctx, _, _| {
2513- let queries = [
2514- "SELECT * FROM pk_u_32;" ,
2515- "SELECT * FROM pk_u_32_two;" ,
2516- "SELECT unique_u_32.* FROM unique_u_32 JOIN pk_u_32 ON unique_u_32.n = pk_u_32.n;" ,
2517- "SELECT unique_u_32.* FROM unique_u_32 JOIN pk_u_32_two ON unique_u_32.n = pk_u_32_two.n;" ,
2518- ] ;
2519-
2520- const KEY : u32 = 42 ;
2521- const DATA : i32 = 0xbeef ;
2522-
2523- UniqueU32 :: insert ( ctx, KEY , DATA ) ;
2524-
2525- subscribe_these_then ( ctx, & queries, {
2526- let on_subscription_applied = on_subscription_applied. clone ( ) ;
2527- move |ctx| {
2528- PkU32 :: insert ( ctx, KEY , DATA ) ;
2529- assert_all_tables_empty ( ctx) . unwrap ( ) ;
2530- on_subscription_applied. store ( true , Ordering :: SeqCst ) ;
2531- }
2532- } ) ;
2533- PkU32 :: on_insert ( ctx, {
2534- let pk_u32_on_insert = pk_u32_on_insert. clone ( ) ;
2535- move |ctx, val| {
2536- assert_eq ! ( val, & PkU32 { n: KEY , data: DATA } ) ;
2537- pk_u32_on_insert. store ( true , Ordering :: SeqCst ) ;
2538- ctx. reducers
2539- . delete_pk_u_32_insert_pk_u_32_two_then (
2540- KEY ,
2541- DATA ,
2542- reducer_callback_assert_committed ( "delete_pk_u_32_insert_pk_u_32_two" ) ,
2543- )
2544- . unwrap ( ) ;
2545- }
2546- } ) ;
2547- PkU32Two :: on_insert ( ctx, {
2548- let pk_u32_two_on_insert = pk_u32_two_on_insert. clone ( ) ;
2549- move |_, val| {
2550- assert_eq ! ( val, & PkU32Two { n: KEY , data: DATA } ) ;
2551- pk_u32_two_on_insert. store ( true , Ordering :: SeqCst ) ;
2552- }
2553- } ) ;
2554- PkU32 :: on_delete ( ctx, {
2555- let pk_u32_on_delete = pk_u32_on_delete. clone ( ) ;
2556- move |_, val| {
2557- assert_eq ! ( val, & PkU32 { n: KEY , data: DATA } ) ;
2558- pk_u32_on_delete. store ( true , Ordering :: SeqCst ) ;
2559- }
2560- } ) ;
2561- UniqueU32 :: on_insert ( ctx, {
2562- let count_unique_u32_on_insert = count_unique_u32_on_insert. clone ( ) ;
2563- move |_, _| {
2564- count_unique_u32_on_insert. fetch_add ( 1 , Ordering :: SeqCst ) ;
2565- }
2566- } ) ;
2567- UniqueU32 :: on_delete ( ctx, move |_, _| panic ! ( ) ) ;
2568- PkU32Two :: on_delete ( ctx, move |_, _| panic ! ( ) ) ;
2569- }
2570- } )
2571- . on_connect_error ( |_ctx, error| panic ! ( "Connect errored: {error:?}" ) ) ;
2572-
2573- let conn = ManagedConnection :: new ( builder. build ( ) . await . unwrap ( ) ) ;
2574- conn. run_background_task ( ) ;
2575-
2576- const WAIT_INTERVAL_MS : u32 = 10 ;
2577- const MAX_WAIT_ITERATIONS : u32 = 3000 ;
2578- let all_callbacks_observed = || {
2579- on_subscription_applied. load ( Ordering :: SeqCst )
2580- && pk_u32_on_insert. load ( Ordering :: SeqCst )
2581- && pk_u32_on_delete. load ( Ordering :: SeqCst )
2582- && pk_u32_two_on_insert. load ( Ordering :: SeqCst )
2583- } ;
2584- for _ in 0 ..MAX_WAIT_ITERATIONS {
2585- if all_callbacks_observed ( ) {
2586- break ;
2587- }
2588- TimeoutFuture :: new ( WAIT_INTERVAL_MS ) . await ;
2589- }
2590- if !all_callbacks_observed ( ) {
2591- panic ! (
2592- "Timeout waiting for callbacks: on_subscription_applied={}, pk_u32_on_insert={}, pk_u32_on_delete={}, pk_u32_two_on_insert={}" ,
2593- on_subscription_applied. load( Ordering :: SeqCst ) ,
2594- pk_u32_on_insert. load( Ordering :: SeqCst ) ,
2595- pk_u32_on_delete. load( Ordering :: SeqCst ) ,
2596- pk_u32_two_on_insert. load( Ordering :: SeqCst ) ,
2597- ) ;
2598- }
2599-
2600- assert_eq ! ( count_unique_u32_on_insert. load( Ordering :: SeqCst ) , 1 ) ;
2601- conn. disconnect ( ) . unwrap ( ) ;
2602- }
2603-
26042491/// This test asserts that the correct callbacks are invoked when updating the lhs table of a join
26052492async fn test_lhs_join_update ( ) {
26062493 let insert_counter = TestCounter :: new ( ) ;
0 commit comments