Skip to content

Commit d20db55

Browse files
committed
Add SharedDict concurrent tests and benchmark
- test_shared_dict_concurrent_same_key: verifies data integrity under concurrent writes/reads to the same key from multiple processes - test_shared_dict_concurrent_different_keys: tests concurrent access to different keys, ensuring no cross-key corruption - test_shared_dict_concurrent_mixed: mixed read/write operations on shared and unique keys - test_shared_dict_benchmark: measures ops/sec for single and multi-process scenarios
1 parent a46933c commit d20db55

1 file changed

Lines changed: 248 additions & 2 deletions

File tree

test/py_SUITE.erl

Lines changed: 248 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@
6868
test_shared_dict_types/1,
6969
test_shared_dict_process_death/1,
7070
test_shared_dict_python_access/1,
71-
test_shared_dict_destroy/1
71+
test_shared_dict_destroy/1,
72+
%% SharedDict concurrent tests
73+
test_shared_dict_concurrent_same_key/1,
74+
test_shared_dict_concurrent_different_keys/1,
75+
test_shared_dict_concurrent_mixed/1,
76+
test_shared_dict_benchmark/1
7277
]).
7378

7479
all() ->
@@ -131,7 +136,12 @@ all() ->
131136
test_shared_dict_types,
132137
test_shared_dict_process_death,
133138
test_shared_dict_python_access,
134-
test_shared_dict_destroy
139+
test_shared_dict_destroy,
140+
%% SharedDict concurrent tests
141+
test_shared_dict_concurrent_same_key,
142+
test_shared_dict_concurrent_different_keys,
143+
test_shared_dict_concurrent_mixed,
144+
test_shared_dict_benchmark
135145
].
136146

137147
init_per_suite(Config) ->
@@ -1591,3 +1601,239 @@ test_shared_dict_destroy(_Config) ->
15911601

15921602
ct:pal("SharedDict destroy test passed~n"),
15931603
ok.
1604+
1605+
%%% ============================================================================
1606+
%%% SharedDict Concurrent Tests
1607+
%%% ============================================================================
1608+
1609+
%% Test concurrent access to the same key from multiple processes
1610+
%% Verifies mutex protection - individual operations are atomic, data remains consistent
1611+
%% Note: SharedDict provides per-operation atomicity, not transactional atomicity
1612+
test_shared_dict_concurrent_same_key(_Config) ->
1613+
{ok, SD} = py:shared_dict_new(),
1614+
Parent = self(),
1615+
NumProcs = 10,
1616+
WritesPerProc = 100,
1617+
1618+
%% Test 1: Concurrent writes to same key - verify no corruption
1619+
%% Each process writes its process number repeatedly
1620+
Pids1 = [spawn_link(fun() ->
1621+
lists:foreach(fun(_) ->
1622+
ok = py:shared_dict_set(SD, <<"shared_key">>, N)
1623+
end, lists:seq(1, WritesPerProc)),
1624+
Parent ! {done_write, self()}
1625+
end) || N <- lists:seq(1, NumProcs)],
1626+
1627+
[receive {done_write, Pid} -> ok after 30000 -> ct:fail({timeout, Pid}) end || Pid <- Pids1],
1628+
1629+
%% Final value should be a valid process number (1-NumProcs)
1630+
FinalWrite = py:shared_dict_get(SD, <<"shared_key">>),
1631+
ct:pal("Concurrent writes - final value: ~p (valid if 1-~p)~n", [FinalWrite, NumProcs]),
1632+
true = is_integer(FinalWrite),
1633+
true = FinalWrite >= 1 andalso FinalWrite =< NumProcs,
1634+
1635+
%% Test 2: Concurrent reads while writing - verify no crashes/corruption
1636+
ok = py:shared_dict_set(SD, <<"rw_key">>, 0),
1637+
1638+
%% Half writers, half readers
1639+
WriterPids = [spawn_link(fun() ->
1640+
lists:foreach(fun(I) ->
1641+
ok = py:shared_dict_set(SD, <<"rw_key">>, I)
1642+
end, lists:seq(1, WritesPerProc)),
1643+
Parent ! {done_writer, self()}
1644+
end) || _ <- lists:seq(1, NumProcs div 2)],
1645+
1646+
ReaderPids = [spawn_link(fun() ->
1647+
Reads = [py:shared_dict_get(SD, <<"rw_key">>) || _ <- lists:seq(1, WritesPerProc)],
1648+
%% All reads should return integers (no corruption)
1649+
true = lists:all(fun is_integer/1, Reads),
1650+
Parent ! {done_reader, self()}
1651+
end) || _ <- lists:seq(1, NumProcs div 2)],
1652+
1653+
[receive {done_writer, Pid} -> ok after 30000 -> ct:fail({timeout, Pid}) end || Pid <- WriterPids],
1654+
[receive {done_reader, Pid} -> ok after 30000 -> ct:fail({timeout, Pid}) end || Pid <- ReaderPids],
1655+
1656+
%% Test 3: Rapid key updates - verify latest value is consistent type
1657+
lists:foreach(fun(I) ->
1658+
ok = py:shared_dict_set(SD, <<"type_key">>, I)
1659+
end, lists:seq(1, 1000)),
1660+
1661+
TypeVal = py:shared_dict_get(SD, <<"type_key">>),
1662+
true = is_integer(TypeVal),
1663+
1664+
ct:pal("SharedDict concurrent same-key test passed~n"),
1665+
ok.
1666+
1667+
%% Test concurrent access to different keys from multiple processes
1668+
%% Verifies no data corruption across keys
1669+
test_shared_dict_concurrent_different_keys(_Config) ->
1670+
{ok, SD} = py:shared_dict_new(),
1671+
Parent = self(),
1672+
NumProcs = 10,
1673+
NumOps = 50,
1674+
1675+
%% Spawn processes, each writing to its own key
1676+
Pids = [spawn_link(fun() ->
1677+
Key = list_to_binary("key_" ++ integer_to_list(N)),
1678+
%% Write operations
1679+
lists:foreach(fun(I) ->
1680+
ok = py:shared_dict_set(SD, Key, I)
1681+
end, lists:seq(1, NumOps)),
1682+
%% Final value should be NumOps
1683+
Final = py:shared_dict_get(SD, Key),
1684+
Parent ! {done, self(), Key, Final}
1685+
end) || N <- lists:seq(1, NumProcs)],
1686+
1687+
%% Wait for all and collect results
1688+
Results = [receive
1689+
{done, Pid, Key, Final} -> {Key, Final}
1690+
after 30000 ->
1691+
ct:fail({timeout, Pid})
1692+
end || Pid <- Pids],
1693+
1694+
%% Verify all keys have correct final value
1695+
lists:foreach(fun({Key, Final}) ->
1696+
case Final of
1697+
NumOps -> ok;
1698+
Other ->
1699+
ct:fail({wrong_value, Key, expected, NumOps, got, Other})
1700+
end
1701+
end, Results),
1702+
1703+
%% Verify all keys are present
1704+
Keys = py:shared_dict_keys(SD),
1705+
NumProcs = length(Keys),
1706+
1707+
ct:pal("SharedDict concurrent different-keys test passed~n"),
1708+
ok.
1709+
1710+
%% Test mixed read/write operations on shared and unique keys
1711+
test_shared_dict_concurrent_mixed(_Config) ->
1712+
{ok, SD} = py:shared_dict_new(),
1713+
Parent = self(),
1714+
NumProcs = 10,
1715+
NumOps = 50,
1716+
1717+
%% Initialize shared keys
1718+
ok = py:shared_dict_set(SD, <<"shared_1">>, 0),
1719+
ok = py:shared_dict_set(SD, <<"shared_2">>, 0),
1720+
1721+
%% Spawn processes that do mixed operations
1722+
Pids = [spawn_link(fun() ->
1723+
UniqueKey = list_to_binary("unique_" ++ integer_to_list(N)),
1724+
mixed_operations(SD, UniqueKey, NumOps),
1725+
Parent ! {done, self(), UniqueKey}
1726+
end) || N <- lists:seq(1, NumProcs)],
1727+
1728+
%% Wait for all processes
1729+
UniqueKeys = [receive
1730+
{done, Pid, Key} -> Key
1731+
after 30000 ->
1732+
ct:fail({timeout, Pid})
1733+
end || Pid <- Pids],
1734+
1735+
%% Verify all unique keys exist with valid values
1736+
lists:foreach(fun(Key) ->
1737+
Value = py:shared_dict_get(SD, Key),
1738+
true = is_integer(Value) andalso Value >= 0
1739+
end, UniqueKeys),
1740+
1741+
%% Verify shared keys are integers (may have any value due to races)
1742+
Shared1 = py:shared_dict_get(SD, <<"shared_1">>),
1743+
Shared2 = py:shared_dict_get(SD, <<"shared_2">>),
1744+
true = is_integer(Shared1),
1745+
true = is_integer(Shared2),
1746+
1747+
ct:pal("Mixed operations - shared_1=~p, shared_2=~p~n", [Shared1, Shared2]),
1748+
ct:pal("SharedDict concurrent mixed test passed~n"),
1749+
ok.
1750+
1751+
%% Helper for mixed operations
1752+
mixed_operations(_SD, _UniqueKey, 0) -> ok;
1753+
mixed_operations(SD, UniqueKey, N) ->
1754+
%% Mix of operations
1755+
case N rem 4 of
1756+
0 ->
1757+
%% Write to unique key
1758+
ok = py:shared_dict_set(SD, UniqueKey, N);
1759+
1 ->
1760+
%% Read from unique key
1761+
_ = py:shared_dict_get(SD, UniqueKey, 0);
1762+
2 ->
1763+
%% Increment shared key
1764+
Val = py:shared_dict_get(SD, <<"shared_1">>, 0),
1765+
ok = py:shared_dict_set(SD, <<"shared_1">>, Val + 1);
1766+
3 ->
1767+
%% Read all keys
1768+
_ = py:shared_dict_keys(SD)
1769+
end,
1770+
mixed_operations(SD, UniqueKey, N - 1).
1771+
1772+
%% Benchmark SharedDict operations
1773+
test_shared_dict_benchmark(_Config) ->
1774+
{ok, SD} = py:shared_dict_new(),
1775+
1776+
%% Warmup
1777+
lists:foreach(fun(I) ->
1778+
Key = list_to_binary("warmup_" ++ integer_to_list(I)),
1779+
ok = py:shared_dict_set(SD, Key, I),
1780+
I = py:shared_dict_get(SD, Key)
1781+
end, lists:seq(1, 100)),
1782+
1783+
%% Single process benchmark
1784+
NumOps = 1000,
1785+
1786+
%% Benchmark SET operations
1787+
SetStart = erlang:monotonic_time(microsecond),
1788+
lists:foreach(fun(I) ->
1789+
Key = list_to_binary("bench_" ++ integer_to_list(I rem 100)),
1790+
ok = py:shared_dict_set(SD, Key, I)
1791+
end, lists:seq(1, NumOps)),
1792+
SetEnd = erlang:monotonic_time(microsecond),
1793+
SetDuration = SetEnd - SetStart,
1794+
SetOpsPerSec = (NumOps * 1000000) div max(1, SetDuration),
1795+
1796+
%% Benchmark GET operations
1797+
GetStart = erlang:monotonic_time(microsecond),
1798+
lists:foreach(fun(I) ->
1799+
Key = list_to_binary("bench_" ++ integer_to_list(I rem 100)),
1800+
_ = py:shared_dict_get(SD, Key)
1801+
end, lists:seq(1, NumOps)),
1802+
GetEnd = erlang:monotonic_time(microsecond),
1803+
GetDuration = GetEnd - GetStart,
1804+
GetOpsPerSec = (NumOps * 1000000) div max(1, GetDuration),
1805+
1806+
ct:pal("~n=== SharedDict Single-Process Benchmark ===~n"),
1807+
ct:pal("SET: ~p ops in ~p us (~p ops/sec)~n", [NumOps, SetDuration, SetOpsPerSec]),
1808+
ct:pal("GET: ~p ops in ~p us (~p ops/sec)~n", [NumOps, GetDuration, GetOpsPerSec]),
1809+
1810+
%% Multi-process concurrent benchmark
1811+
Parent = self(),
1812+
NumProcs = 4,
1813+
OpsPerProc = 500,
1814+
1815+
ConcStart = erlang:monotonic_time(microsecond),
1816+
Pids = [spawn_link(fun() ->
1817+
lists:foreach(fun(I) ->
1818+
Key = list_to_binary("conc_" ++ integer_to_list(I rem 50)),
1819+
ok = py:shared_dict_set(SD, Key, I),
1820+
_ = py:shared_dict_get(SD, Key)
1821+
end, lists:seq(1, OpsPerProc)),
1822+
Parent ! {done, self()}
1823+
end) || _ <- lists:seq(1, NumProcs)],
1824+
1825+
[receive {done, Pid} -> ok after 30000 -> ct:fail({timeout, Pid}) end || Pid <- Pids],
1826+
ConcEnd = erlang:monotonic_time(microsecond),
1827+
ConcDuration = ConcEnd - ConcStart,
1828+
1829+
TotalConcOps = NumProcs * OpsPerProc * 2, % SET + GET per iteration
1830+
ConcOpsPerSec = (TotalConcOps * 1000000) div max(1, ConcDuration),
1831+
1832+
ct:pal("~n=== SharedDict Multi-Process Benchmark (~p procs) ===~n", [NumProcs]),
1833+
ct:pal("TOTAL: ~p ops in ~p us (~p ops/sec)~n", [TotalConcOps, ConcDuration, ConcOpsPerSec]),
1834+
1835+
%% Cleanup
1836+
ok = py:shared_dict_destroy(SD),
1837+
1838+
ct:pal("~nSharedDict benchmark completed~n"),
1839+
ok.

0 commit comments

Comments
 (0)