|
| 1 | +-- 'Test concurrent write lock during payload apply' |
| 2 | +-- NOTE: The lock-contention portion requires dblink with table access. |
| 3 | +-- On environments where dblink cannot lock the table (e.g. Supabase), |
| 4 | +-- the lock test is skipped and only apply + consistency are verified. |
| 5 | + |
| 6 | +\set testid '39' |
| 7 | +\ir helper_test_init.sql |
| 8 | + |
| 9 | +\connect postgres |
| 10 | +\ir helper_psql_conn_setup.sql |
| 11 | +DROP DATABASE IF EXISTS cloudsync_test_39_a; |
| 12 | +DROP DATABASE IF EXISTS cloudsync_test_39_b; |
| 13 | +CREATE DATABASE cloudsync_test_39_a; |
| 14 | +CREATE DATABASE cloudsync_test_39_b; |
| 15 | + |
| 16 | +-- Setup db_a |
| 17 | +\connect cloudsync_test_39_a |
| 18 | +\ir helper_psql_conn_setup.sql |
| 19 | +CREATE EXTENSION IF NOT EXISTS cloudsync; |
| 20 | +CREATE TABLE concurrent_tbl (id TEXT PRIMARY KEY, val TEXT); |
| 21 | +SELECT cloudsync_init('concurrent_tbl', 'CLS', true) AS _init_a \gset |
| 22 | + |
| 23 | +-- Setup db_b |
| 24 | +\connect cloudsync_test_39_b |
| 25 | +\ir helper_psql_conn_setup.sql |
| 26 | +CREATE EXTENSION IF NOT EXISTS cloudsync; |
| 27 | +CREATE TABLE concurrent_tbl (id TEXT PRIMARY KEY, val TEXT); |
| 28 | +SELECT cloudsync_init('concurrent_tbl', 'CLS', true) AS _init_b \gset |
| 29 | + |
| 30 | +-- Insert row1 on db_a and sync to db_b |
| 31 | +\connect cloudsync_test_39_a |
| 32 | +INSERT INTO concurrent_tbl VALUES ('row1', 'val_a'); |
| 33 | + |
| 34 | +SELECT CASE WHEN payload IS NULL OR octet_length(payload) = 0 |
| 35 | + THEN '' |
| 36 | + ELSE '\x' || encode(payload, 'hex') |
| 37 | + END AS payload_init, |
| 38 | + (payload IS NOT NULL AND octet_length(payload) > 0) AS payload_init_ok |
| 39 | +FROM ( |
| 40 | + SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, |
| 41 | + db_version, site_id, cl, seq) AS payload |
| 42 | + FROM cloudsync_changes WHERE site_id = cloudsync_siteid() |
| 43 | +) AS p \gset |
| 44 | + |
| 45 | +\connect cloudsync_test_39_b |
| 46 | +\if :payload_init_ok |
| 47 | +SELECT cloudsync_payload_apply(decode(substr(:'payload_init', 3), 'hex')) AS _apply_init \gset |
| 48 | +\endif |
| 49 | + |
| 50 | +-- Update row1 on db_a |
| 51 | +\connect cloudsync_test_39_a |
| 52 | +UPDATE concurrent_tbl SET val = 'val_a_updated' WHERE id = 'row1'; |
| 53 | + |
| 54 | +SELECT CASE WHEN payload IS NULL OR octet_length(payload) = 0 |
| 55 | + THEN '' |
| 56 | + ELSE '\x' || encode(payload, 'hex') |
| 57 | + END AS payload_upd, |
| 58 | + (payload IS NOT NULL AND octet_length(payload) > 0) AS payload_upd_ok |
| 59 | +FROM ( |
| 60 | + SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, |
| 61 | + db_version, site_id, cl, seq) AS payload |
| 62 | + FROM cloudsync_changes WHERE site_id = cloudsync_siteid() |
| 63 | +) AS p \gset |
| 64 | + |
| 65 | +-- Try to set up dblink and acquire a table lock |
| 66 | +\connect cloudsync_test_39_b |
| 67 | +CREATE EXTENSION IF NOT EXISTS dblink; |
| 68 | + |
| 69 | +SELECT dblink_connect('locker', 'dbname=cloudsync_test_39_b') AS _conn \gset |
| 70 | +SELECT dblink_exec('locker', 'BEGIN') AS _begin \gset |
| 71 | + |
| 72 | +-- Try to acquire EXCLUSIVE lock — if this fails (e.g. permission denied on |
| 73 | +-- Supabase), _lock won't be set and we skip the lock-contention test |
| 74 | +\unset _lock |
| 75 | +SELECT dblink_exec('locker', 'LOCK TABLE concurrent_tbl IN EXCLUSIVE MODE') AS _lock \gset |
| 76 | + |
| 77 | +\if :{?_lock} |
| 78 | +-- ===== Lock acquired — run lock-contention test ===== |
| 79 | + |
| 80 | +BEGIN; |
| 81 | +\set ON_ERROR_ROLLBACK on |
| 82 | +SET LOCAL lock_timeout = '500ms'; |
| 83 | + |
| 84 | +\if :payload_upd_ok |
| 85 | +SELECT cloudsync_payload_apply(decode(substr(:'payload_upd', 3), 'hex')) AS _blocked_apply \gset |
| 86 | +\endif |
| 87 | + |
| 88 | +COMMIT; |
| 89 | +\set ON_ERROR_ROLLBACK off |
| 90 | + |
| 91 | +-- row1 should still have the OLD value because the apply was blocked |
| 92 | +SELECT val AS row1_val_check FROM concurrent_tbl WHERE id = 'row1' \gset |
| 93 | +SELECT (:'row1_val_check' = 'val_a') AS blocked_ok \gset |
| 94 | +\if :blocked_ok |
| 95 | +\echo [PASS] (:testid) Apply correctly blocked by concurrent table lock |
| 96 | +\else |
| 97 | +\echo [FAIL] (:testid) Expected val_a (blocked), got :'row1_val_check' |
| 98 | +SELECT (:fail::int + 1) AS fail \gset |
| 99 | +\endif |
| 100 | + |
| 101 | +-- Release the table lock |
| 102 | +SELECT dblink_exec('locker', 'COMMIT') AS _release \gset |
| 103 | +SELECT dblink_disconnect('locker') AS _disconn \gset |
| 104 | + |
| 105 | +-- Retry apply — should succeed now |
| 106 | +\if :payload_upd_ok |
| 107 | +SELECT cloudsync_payload_apply(decode(substr(:'payload_upd', 3), 'hex')) AS _apply_retry \gset |
| 108 | +\endif |
| 109 | + |
| 110 | +SELECT val AS row1_val FROM concurrent_tbl WHERE id = 'row1' \gset |
| 111 | +SELECT (:'row1_val' = 'val_a_updated') AS retry_ok \gset |
| 112 | +\if :retry_ok |
| 113 | +\echo [PASS] (:testid) Apply succeeded after lock released |
| 114 | +\else |
| 115 | +\echo [FAIL] (:testid) Apply after unlock - expected val_a_updated, got :'row1_val' |
| 116 | +SELECT (:fail::int + 1) AS fail \gset |
| 117 | +\endif |
| 118 | + |
| 119 | +\else |
| 120 | +-- ===== Lock failed — skip contention test, apply directly ===== |
| 121 | +\echo [SKIP] (:testid) Lock-contention test skipped (dblink cannot lock table) |
| 122 | + |
| 123 | +-- Clean up the dblink connection (transaction is aborted) |
| 124 | +SELECT dblink_exec('locker', 'ROLLBACK') AS _rollback \gset |
| 125 | +SELECT dblink_disconnect('locker') AS _disconn \gset |
| 126 | + |
| 127 | +\if :payload_upd_ok |
| 128 | +SELECT cloudsync_payload_apply(decode(substr(:'payload_upd', 3), 'hex')) AS _apply_direct \gset |
| 129 | +\endif |
| 130 | + |
| 131 | +SELECT val AS row1_val FROM concurrent_tbl WHERE id = 'row1' \gset |
| 132 | +SELECT (:'row1_val' = 'val_a_updated') AS direct_ok \gset |
| 133 | +\if :direct_ok |
| 134 | +\echo [PASS] (:testid) Apply succeeded (no lock contention) |
| 135 | +\else |
| 136 | +\echo [FAIL] (:testid) Apply failed - expected val_a_updated, got :'row1_val' |
| 137 | +SELECT (:fail::int + 1) AS fail \gset |
| 138 | +\endif |
| 139 | + |
| 140 | +\endif |
| 141 | + |
| 142 | +-- Full cross-sync for consistency |
| 143 | +SELECT CASE WHEN payload IS NULL OR octet_length(payload) = 0 |
| 144 | + THEN '' |
| 145 | + ELSE '\x' || encode(payload, 'hex') |
| 146 | + END AS payload_b_final, |
| 147 | + (payload IS NOT NULL AND octet_length(payload) > 0) AS payload_b_final_ok |
| 148 | +FROM ( |
| 149 | + SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, |
| 150 | + db_version, site_id, cl, seq) AS payload |
| 151 | + FROM cloudsync_changes WHERE site_id = cloudsync_siteid() |
| 152 | +) AS p \gset |
| 153 | + |
| 154 | +\connect cloudsync_test_39_a |
| 155 | +\if :payload_b_final_ok |
| 156 | +SELECT cloudsync_payload_apply(decode(substr(:'payload_b_final', 3), 'hex')) AS _apply_final \gset |
| 157 | +\endif |
| 158 | + |
| 159 | +SELECT md5(COALESCE(string_agg(id || ':' || COALESCE(val, ''), ',' ORDER BY id), '')) AS hash_a |
| 160 | +FROM concurrent_tbl \gset |
| 161 | + |
| 162 | +\connect cloudsync_test_39_b |
| 163 | +SELECT md5(COALESCE(string_agg(id || ':' || COALESCE(val, ''), ',' ORDER BY id), '')) AS hash_b |
| 164 | +FROM concurrent_tbl \gset |
| 165 | + |
| 166 | +SELECT (:'hash_a' = :'hash_b') AS consistency_ok \gset |
| 167 | +\if :consistency_ok |
| 168 | +\echo [PASS] (:testid) Cross-database consistency verified |
| 169 | +\else |
| 170 | +\echo [FAIL] (:testid) Consistency failed (hash_a=:'hash_a' hash_b=:'hash_b') |
| 171 | +SELECT (:fail::int + 1) AS fail \gset |
| 172 | +\endif |
| 173 | + |
| 174 | +-- Cleanup |
| 175 | +\ir helper_test_cleanup.sql |
| 176 | +\if :should_cleanup |
| 177 | +DROP DATABASE IF EXISTS cloudsync_test_39_a; |
| 178 | +DROP DATABASE IF EXISTS cloudsync_test_39_b; |
| 179 | +\endif |
0 commit comments