@@ -972,5 +972,142 @@ def test_walker_settles_differently_per_workload(self):
972972 f"for different workloads. Dense={ W1 } , Simple={ W2 } " )
973973
974974
975+ class TestCondvarStress (unittest .TestCase ):
976+ """Stress-test the condvar dispatch path under varying worker counts.
977+
978+ Designed to catch races in _PyGC_DispatchAndWait when the number of
979+ active workers changes between collections (as the proactive walker
980+ does). Particularly useful under TSAN.
981+ """
982+
983+ def setUp (self ):
984+ _setup_parallel_gc (self )
985+
986+ def tearDown (self ):
987+ _teardown_parallel_gc (self )
988+
989+ def test_rapid_collections_varying_heaps (self ):
990+ """200 rapid collections with wildly varying heap sizes.
991+
992+ The proactive walker changes adaptive_workers on ~20% of collections.
993+ With 200 collections, we expect ~40 worker-count changes, exercising
994+ the condvar wake/wait path with different participant counts.
995+ """
996+ import random
997+ rng = random .Random (42 )
998+ gc .enable_parallel (8 )
999+
1000+ for i in range (200 ):
1001+ # Alternate between tiny and large heaps to stress the
1002+ # dispatch path with different workload characteristics
1003+ size = rng .choice ([50 , 500 , 5_000 , 50_000 , 200_000 ])
1004+ objs = [{'ref' : None } for _ in range (size )]
1005+ # Create cycles so GC has work to do
1006+ for j in range (len (objs ) - 1 ):
1007+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
1008+ del objs
1009+ gc .collect ()
1010+
1011+ def test_enable_disable_cycles (self ):
1012+ """Rapid enable/disable/re-enable with collections in between.
1013+
1014+ Tests the full lifecycle: pool start -> dispatch -> pool stop,
1015+ repeated with different worker counts each time. This exercises
1016+ the condvar init/fini paths and catches races in shutdown.
1017+ """
1018+ for num_workers in [2 , 4 , 8 , 3 , 6 , 2 , 8 , 4 ]:
1019+ gc .enable_parallel (num_workers )
1020+ # Run a few collections at this worker count
1021+ for _ in range (10 ):
1022+ objs = [{'ref' : None } for _ in range (10_000 )]
1023+ for j in range (len (objs ) - 1 ):
1024+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
1025+ del objs
1026+ gc .collect ()
1027+ gc .disable_parallel ()
1028+ # Collect once with parallel disabled to test serial fallback
1029+ gc .collect ()
1030+
1031+ def test_concurrent_allocation_during_gc (self ):
1032+ """Run GC collections while other threads allocate objects.
1033+
1034+ This simulates real-world conditions where GC runs concurrently
1035+ with application threads. The condvar dispatch must not deadlock
1036+ when the GIL is contended.
1037+ """
1038+ import time
1039+
1040+ stop = threading .Event ()
1041+ errors = []
1042+
1043+ def allocator ():
1044+ """Continuously allocate and release cyclic garbage."""
1045+ try :
1046+ while not stop .is_set ():
1047+ objs = [{'ref' : None } for _ in range (1_000 )]
1048+ for j in range (len (objs ) - 1 ):
1049+ objs [j ]['ref' ] = objs [(j + 1 ) % len (objs )]
1050+ del objs
1051+ # Don't call gc.collect() -- let the main thread drive GC
1052+ except Exception as e :
1053+ errors .append (e )
1054+
1055+ gc .enable_parallel (8 )
1056+
1057+ # Start allocator threads
1058+ threads = []
1059+ for _ in range (4 ):
1060+ t = threading .Thread (target = allocator )
1061+ t .start ()
1062+ threads .append (t )
1063+
1064+ try :
1065+ # Run collections while allocators are running
1066+ for _ in range (100 ):
1067+ gc .collect ()
1068+ finally :
1069+ stop .set ()
1070+ for t in threads :
1071+ t .join (timeout = 10 )
1072+
1073+ self .assertEqual (errors , [], f"Allocator threads had errors: { errors } " )
1074+
1075+ def test_walker_transitions_through_range (self ):
1076+ """Force the walker through a wide range of worker counts.
1077+
1078+ Start with a large heap (walker climbs toward 8), then switch
1079+ to tiny heap (walker drops). This ensures the condvar dispatch
1080+ exercises N=2 through N=8 over the test run.
1081+ """
1082+ gc .enable_parallel (8 )
1083+ all_aw = set ()
1084+
1085+ # Phase 1: large heap -- walker should climb
1086+ for _ in range (50 ):
1087+ nodes = [{'id' : i , 'refs' : []} for i in range (100_000 )]
1088+ for i in range (0 , len (nodes ), 100 ):
1089+ nodes [i ]['refs' ].append (nodes [(i + 7 ) % len (nodes )])
1090+ del nodes
1091+ gc .collect ()
1092+ all_aw .add (gc .get_parallel_config ()['adaptive_workers' ])
1093+
1094+ # Phase 2: tiny heap -- walker should drop
1095+ for _ in range (50 ):
1096+ objs = [{'ref' : None } for _ in range (100 )]
1097+ if len (objs ) > 1 :
1098+ objs [0 ]['ref' ] = objs [1 ]
1099+ objs [1 ]['ref' ] = objs [0 ]
1100+ del objs
1101+ gc .collect ()
1102+ all_aw .add (gc .get_parallel_config ()['adaptive_workers' ])
1103+
1104+ # The walker should have visited at least 3 different worker counts
1105+ # across both phases. This proves the condvar dispatch path exercised
1106+ # different participant counts.
1107+ self .assertGreaterEqual (len (all_aw ), 3 ,
1108+ f"Walker only visited { all_aw } -- expected >=3 "
1109+ f"distinct values for condvar coverage" )
1110+
1111+
9751112if __name__ == '__main__' :
9761113 unittest .main ()
0 commit comments