@@ -963,5 +963,142 @@ def test_walker_settles_differently_per_workload(self):
963963 f"for different workloads. Dense={ W1 } , Simple={ W2 } " )
964964
965965
966+ class TestCondvarStress (unittest .TestCase ):
967+ """Stress-test the condvar dispatch path under varying worker counts.
968+
969+ Designed to catch races in _PyGC_DispatchAndWait when the number of
970+ active workers changes between collections (as the proactive walker
971+ does). Particularly useful under TSAN.
972+ """
973+
974+ def setUp (self ):
975+ _setup_parallel_gc (self )
976+
977+ def tearDown (self ):
978+ _teardown_parallel_gc (self )
979+
980+ def test_rapid_collections_varying_heaps (self ):
981+ """200 rapid collections with wildly varying heap sizes.
982+
983+ The proactive walker changes adaptive_workers on ~20% of collections.
984+ With 200 collections, we expect ~40 worker-count changes, exercising
985+ the condvar wake/wait path with different participant counts.
986+ """
987+ import random
988+ rng = random .Random (42 )
989+ gc .enable_parallel (8 )
990+
991+ for i in range (200 ):
992+ # Alternate between tiny and large heaps to stress the
993+ # dispatch path with different workload characteristics
994+ size = rng .choice ([50 , 500 , 5_000 , 50_000 , 200_000 ])
995+ objs = [{'ref' : None } for _ in range (size )]
996+ # Create cycles so GC has work to do
997+ for j in range (len (objs ) - 1 ):
998+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
999+ del objs
1000+ gc .collect ()
1001+
1002+ def test_enable_disable_cycles (self ):
1003+ """Rapid enable/disable/re-enable with collections in between.
1004+
1005+ Tests the full lifecycle: pool start -> dispatch -> pool stop,
1006+ repeated with different worker counts each time. This exercises
1007+ the condvar init/fini paths and catches races in shutdown.
1008+ """
1009+ for num_workers in [2 , 4 , 8 , 3 , 6 , 2 , 8 , 4 ]:
1010+ gc .enable_parallel (num_workers )
1011+ # Run a few collections at this worker count
1012+ for _ in range (10 ):
1013+ objs = [{'ref' : None } for _ in range (10_000 )]
1014+ for j in range (len (objs ) - 1 ):
1015+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
1016+ del objs
1017+ gc .collect ()
1018+ gc .disable_parallel ()
1019+ # Collect once with parallel disabled to test serial fallback
1020+ gc .collect ()
1021+
1022+ def test_concurrent_allocation_during_gc (self ):
1023+ """Run GC collections while other threads allocate objects.
1024+
1025+ This simulates real-world conditions where GC runs concurrently
1026+ with application threads. The condvar dispatch must not deadlock
1027+ when the GIL is contended.
1028+ """
1029+ import time
1030+
1031+ stop = threading .Event ()
1032+ errors = []
1033+
1034+ def allocator ():
1035+ """Continuously allocate and release cyclic garbage."""
1036+ try :
1037+ while not stop .is_set ():
1038+ objs = [{'ref' : None } for _ in range (1_000 )]
1039+ for j in range (len (objs ) - 1 ):
1040+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
1041+ del objs
1042+ # Don't call gc.collect() -- let the main thread drive GC
1043+ except Exception as e :
1044+ errors .append (e )
1045+
1046+ gc .enable_parallel (8 )
1047+
1048+ # Start allocator threads
1049+ threads = []
1050+ for _ in range (4 ):
1051+ t = threading .Thread (target = allocator )
1052+ t .start ()
1053+ threads .append (t )
1054+
1055+ try :
1056+ # Run collections while allocators are running
1057+ for _ in range (100 ):
1058+ gc .collect ()
1059+ finally :
1060+ stop .set ()
1061+ for t in threads :
1062+ t .join (timeout = 10 )
1063+
1064+ self .assertEqual (errors , [], f"Allocator threads had errors: { errors } " )
1065+
1066+ def test_walker_transitions_through_range (self ):
1067+ """Force the walker through a wide range of worker counts.
1068+
1069+ Start with a large heap (walker climbs toward 8), then switch
1070+ to tiny heap (walker drops). This ensures the condvar dispatch
1071+ exercises N=2 through N=8 over the test run.
1072+ """
1073+ gc .enable_parallel (8 )
1074+ all_aw = set ()
1075+
1076+ # Phase 1: large heap -- walker should climb
1077+ for _ in range (50 ):
1078+ nodes = [{'id' : i , 'refs' : []} for i in range (100_000 )]
1079+ for i in range (0 , len (nodes ), 100 ):
1080+ nodes [i ]['refs' ].append (nodes [(i + 7 ) % len (nodes )])
1081+ del nodes
1082+ gc .collect ()
1083+ all_aw .add (gc .get_parallel_config ()['adaptive_workers' ])
1084+
1085+ # Phase 2: tiny heap -- walker should drop
1086+ for _ in range (50 ):
1087+ objs = [{'ref' : None } for _ in range (100 )]
1088+ if len (objs ) > 1 :
1089+ objs [0 ]['ref' ] = objs [1 ]
1090+ objs [1 ]['ref' ] = objs [0 ]
1091+ del objs
1092+ gc .collect ()
1093+ all_aw .add (gc .get_parallel_config ()['adaptive_workers' ])
1094+
1095+ # The walker should have visited at least 3 different worker counts
1096+ # across both phases. This proves the condvar dispatch path exercised
1097+ # different participant counts.
1098+ self .assertGreaterEqual (len (all_aw ), 3 ,
1099+ f"Walker only visited { all_aw } -- expected >=3 "
1100+ f"distinct values for condvar coverage" )
1101+
1102+
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
0 commit comments