1+ """ML optimization using Gaussian Process regression."""
2+ import warnings
3+ import mesa
4+ import numpy as np
5+ import matplotlib .pyplot as plt
6+ from sklearn .gaussian_process import GaussianProcessRegressor
7+ from sklearn .gaussian_process .kernels import RBF
8+ from sklearn .preprocessing import StandardScaler
9+ from .agents import SimpleAgent
10+ warnings .filterwarnings ('ignore' ,category = UserWarning )
11+
class MLDemoModel (mesa .Model ):
    """Simple segregation model for testing parameter optimization.

    A Schelling-style model: ``num_agents`` SimpleAgents are scattered on a
    non-toroidal MultiGrid and stepped with random activation.  The method
    :meth:`optimize_parameters` tunes (num_agents, ratio, exploration_bias)
    against the noisy :meth:`segregation_score`, first with random search and
    then with Gaussian-Process Bayesian optimization (UCB acquisition).
    """

    def __init__ (self ,width = 20 ,height = 20 ,num_agents = 30 ,ratio = 0.5 ,
            exploration_bias = 0.5 ,noise_level = 0.05 ,seed = None ):
        """Build the grid, place the agents, and set up data collection.

        Args:
            width, height: grid dimensions (torus disabled).
            num_agents: number of SimpleAgents placed on random cells.
            ratio: stored type-mix parameter (presumably read by
                SimpleAgent — TODO confirm against agents module).
            exploration_bias: stored movement parameter (presumably read by
                SimpleAgent — TODO confirm against agents module).
            noise_level: std-dev of Gaussian noise added to the score.
            seed: RNG seed forwarded to mesa.Model for reproducibility.
        """
        super ().__init__ (seed = seed )
        self .width = width
        self .height = height
        self .num_agents = num_agents
        self .ratio = ratio
        self .exploration_bias = exploration_bias
        self .noise_level = noise_level
        self .grid = mesa .space .MultiGrid (width ,height ,torus = False )
        self .schedule = mesa .time .RandomActivation (self )
        for i in range (num_agents ):
            agent = SimpleAgent (i ,self )
            self .schedule .add (agent )
            # Drop each agent on a uniformly random cell (MultiGrid allows
            # several agents per cell, so no occupancy check is needed).
            x = self .random .randrange (width )
            y = self .random .randrange (height )
            self .grid .place_agent (agent ,(x ,y ))
        # Collected every step (see step()); "Segregation" re-samples the
        # noisy score, so successive readings differ even on a frozen grid.
        self .datacollector = mesa .DataCollector (
            model_reporters = {
                "Segregation" :self .segregation_score ,
                "AgentCount" :lambda m :len (m .schedule .agents ),
            }
        )

    def step (self ):
        """Advance model by one step and record model-level metrics."""
        self .schedule .step ()
        self .datacollector .collect (self )

    def segregation_score (self ):
        """Count how many neighbors match agent type (0-1 score).

        Computes the fraction of Moore-neighborhood neighbor pairs with
        matching ``type``, adds Gaussian observation noise with std-dev
        ``self.noise_level``, and clips the result back into [0, 1].
        Returns 0 when no agent has any neighbor.
        """
        similar = 0
        total = 0
        for agent in self .schedule .agents :
            neighbors = self .grid .get_neighbors (agent .pos ,moore = True )
            for neighbor in neighbors :
                if neighbor .type == agent .type :
                    similar += 1
                total += 1
        base_score = similar / total if total > 0 else 0
        # Simulate a noisy measurement, then clamp into the valid range.
        noise = self .random .gauss (0 ,self .noise_level )
        noisy_score = np .clip (base_score + noise ,0 ,1 )
        return noisy_score

    def optimize_parameters (self ,n_iterations = 30 ):
        """Try random parameters then use ML to find better ones.

        Phase 1 evaluates 10 uniformly random parameter sets; Phase 2 runs
        20 rounds of Bayesian optimization, fitting a GP surrogate to all
        observations and picking the candidate maximizing a UCB acquisition.

        Args:
            n_iterations: NOTE(review) currently unused — the phase sizes
                are hard-coded to 10 + 20 below.

        Returns:
            dict with best_params, best_score, and per-phase score lists.
        """
        X = []      # parameter vectors [num_agents, ratio, exploration_bias]
        y = []      # observed (noisy) segregation scores
        phase = []  # 1 = random search, 2 = Bayesian optimization
        print ("\n Phase 1: Random exploration (10 samples)" )
        for i in range (10 ):
            # Sample a candidate uniformly from the search box.
            params = {
                "num_agents" :self .random .randint (10 ,60 ),
                "ratio" :self .random .uniform (0.3 ,0.8 ),
                "exploration_bias" :self .random .uniform (0.0 ,1.0 ),
            }
            model = MLDemoModel (
                width = self .width , height = self .height , ** params , seed = None
            )
            # Evaluate the candidate: simulate 100 steps, then score once.
            for _ in range (100 ):
                model .step ()
            score = model .segregation_score ()
            X .append ([params ["num_agents" ], params ["ratio" ], params ["exploration_bias" ]])
            y .append (score )
            phase .append (1 )
            print (f" { i + 1 :2d} . agents={ params ['num_agents' ]:2d} , "
                f"ratio={ params ['ratio' ]:.2f} , "
                f"bias={ params ['exploration_bias' ]:.2f} score={ score :.4f} " )
        # Incumbent from random search; Phase 2 tries to beat it.
        best_score = max (y )
        best_idx = np .argmax (y )
        best_params = {
            "num_agents" :int (X [best_idx ][0 ]),
            "ratio" :float (X [best_idx ][1 ]),
            "exploration_bias" :float (X [best_idx ][2 ]),
        }
        print ("\n Phase 2: Bayesian Optimization (20 iterations)" )
        kernel = RBF (length_scale = 1.0 ,length_scale_bounds = (1e-2 ,1e3 ))
        gp = GaussianProcessRegressor (
            kernel = kernel ,random_state = 42 ,n_restarts_optimizer = 10
        )
        # Inputs are standardized so the single RBF length-scale is shared
        # sensibly across the very different parameter ranges.
        scaler = StandardScaler ()
        X_normalized = scaler .fit_transform (X )
        for i in range (20 ):
            # Refit the surrogate on every observation gathered so far.
            gp .fit (X_normalized ,y )
            # Acquisition: score 100 random candidates with the surrogate
            # and pick the Upper Confidence Bound (mean + 1.96*std) winner.
            candidates = []
            for _ in range (100 ):
                num = self .random .randint (10 ,60 )
                ratio = self .random .uniform (0.3 ,0.8 )
                bias = self .random .uniform (0.0 ,1.0 )
                candidates .append ([num ,ratio ,bias ])
            candidates_normalized = scaler .transform (candidates )
            preds ,stds = gp .predict (candidates_normalized ,return_std = True )
            ucb = preds + 1.96 * stds
            best_candidate_idx = np .argmax (ucb )
            params = {
                "num_agents" :int (candidates [best_candidate_idx ][0 ]),
                "ratio" :float (candidates [best_candidate_idx ][1 ]),
                "exploration_bias" :float (candidates [best_candidate_idx ][2 ]),
            }
            # Evaluate the chosen candidate the same way as in Phase 1.
            model = MLDemoModel (
                width = self .width , height = self .height , ** params , seed = None
            )
            for _ in range (100 ):
                model .step ()
            score = model .segregation_score ()
            X .append ([params ["num_agents" ], params ["ratio" ], params ["exploration_bias" ]])
            y .append (score )
            phase .append (2 )
            if score > best_score :
                best_score = score
                best_params = params
                print (f" { i + 1 :2d} . NEW BEST - agents={ params ['num_agents' ]:2d} , "
                    f"ratio={ params ['ratio' ]:.2f} , "
                    f"bias={ params ['exploration_bias' ]:.2f} , score={ score :.4f} " )
            else :
                print (f" { i + 1 :2d} . agents={ params ['num_agents' ]:2d} , "
                    f"ratio={ params ['ratio' ]:.2f} , "
                    f"bias={ params ['exploration_bias' ]:.2f} , score={ score :.4f} " )
            # Re-fit the scaler on the grown dataset so the next gp.fit
            # sees all points in a consistent normalized space.
            X_normalized = scaler .fit_transform (X )
        print ("\n Optimization Results" )
        y = np .array (y )
        phase = np .array (phase )
        random_scores = y [phase == 1 ]
        ml_scores = y [phase == 2 ]
        print ("\n Phase 1 (Random Search):" )
        print (f" Best: { np .max (random_scores ):.4f} " )
        print (f" Mean: { np .mean (random_scores ):.4f} +/- { np .std (random_scores ):.4f} " )
        print ("\n Phase 2 (Bayesian Optimization):" )
        print (f" Best: { np .max (ml_scores ):.4f} " )
        print (f" Mean: { np .mean (ml_scores ):.4f} +/- { np .std (ml_scores ):.4f} " )
        # NOTE(review): divides by the Phase-1 best, which can be 0.0 since
        # scores are clipped to [0, 1] — would raise ZeroDivisionError.
        improvement_pct = ((best_score - np .max (random_scores ))/ np .max (random_scores )* 100 )
        print (f"\n Improvement: { improvement_pct :+.2f} %" )
        print ("\n Best Parameters:" )
        print (f" agents: { best_params ['num_agents' ]} " )
        print (f" ratio: { best_params ['ratio' ]:.4f} " )
        print (f" exploration_bias: { best_params ['exploration_bias' ]:.4f} " )
        print (f" score: { best_score :.4f} \n " )
        return {
            "best_params" :best_params ,
            "best_score" :best_score ,
            "all_scores" :y .tolist (),
            "all_phases" :phase .tolist (),
            "random_scores" :random_scores .tolist (),
            "ml_scores" :ml_scores .tolist (),
        }
156+
def visualize_optimization (result ,save_path = None ):
    """Plot the optimization results.

    Left panel: per-iteration raw scores, colored by phase.
    Right panel: running best score over the whole run (monotone).

    Args:
        result: dict produced by MLDemoModel.optimize_parameters()
            (uses "all_scores" and "all_phases").
        save_path: optional file path; when given the figure is also
            saved there at 300 dpi before being shown.
    """
    all_scores = np .array (result ["all_scores" ])
    phases = np .array (result ["all_phases" ])
    fig ,axes = plt .subplots (1 ,2 ,figsize = (14 ,5 ))

    # --- Panel 1: raw score of every evaluation, split by phase ---
    ax = axes [0 ]
    random_idx = phases == 1
    ml_idx = phases == 2
    ax .scatter (np .where (random_idx )[0 ],all_scores [random_idx ],
        color = 'red' , s = 100 , alpha = 0.6 , label = 'Random Search' , marker = 'o' )
    ax .scatter (np .where (ml_idx )[0 ],all_scores [ml_idx ],
        color = 'blue' , s = 100 , alpha = 0.6 , label = 'Bayesian Opt' , marker = 's' )
    ax .set_xlabel ('Iteration' ,fontsize = 12 ,fontweight = 'bold' )
    ax .set_ylabel ('Score' ,fontsize = 12 ,fontweight = 'bold' )
    ax .set_title ('Optimization Performance: Random vs Bayesian' ,fontsize = 13 ,fontweight = 'bold' )
    ax .grid (True ,alpha = 0.3 )
    ax .legend (fontsize = 11 )

    # --- Panel 2: convergence (best score found so far) ---
    ax = axes [1 ]
    # BUG FIX: the running best must accumulate over the WHOLE run.
    # Accumulating each phase separately let the Bayesian curve start
    # below the best score already found during random search, making
    # the "Best Score Over Time" plot non-monotone and misleading.
    running_best = np .maximum .accumulate (all_scores )
    # Generalized from the hard-coded 10/5/20 positions: derive the
    # phase boundary from the data (10 for the default configuration).
    n_random = int (np .count_nonzero (random_idx ))
    ax .plot (range (n_random ),running_best [:n_random ],
        'o-' ,color = 'red' ,linewidth = 2 ,markersize = 6 ,label = 'Random Search' )
    ax .plot (range (n_random , len (running_best )),
        running_best [n_random :], 's-' , color = 'blue' , linewidth = 2 , markersize = 6 , label = 'Bayesian Opt' )
    ax .axvline (x = n_random ,color = 'gray' ,linestyle = '--' ,alpha = 0.5 ,linewidth = 1 )
    # Phase labels centered over each phase's span of the x-axis.
    ax .text (n_random / 2 ,ax .get_ylim ()[1 ]* 0.95 ,'Phase 1' ,ha = 'center' ,fontsize = 10 ,style = 'italic' )
    ax .text ((n_random + len (running_best ))/ 2 ,ax .get_ylim ()[1 ]* 0.95 ,'Phase 2' ,ha = 'center' ,fontsize = 10 ,style = 'italic' )
    ax .set_xlabel ('Iteration' ,fontsize = 12 ,fontweight = 'bold' )
    ax .set_ylabel ('Best Score Found' ,fontsize = 12 ,fontweight = 'bold' )
    ax .set_title ('Convergence: Best Score Over Time' ,fontsize = 13 ,fontweight = 'bold' )
    ax .grid (True ,alpha = 0.3 )
    ax .legend (fontsize = 11 )

    plt .tight_layout ()
    if save_path :
        plt .savefig (save_path ,dpi = 300 ,bbox_inches = 'tight' )
        print (f"Visualization saved to: { save_path } " )
    plt .show ()
194+
def run_statistical_validation (num_runs = 15 ):
    """Run it multiple times and see if results are consistent.

    Args:
        num_runs: number of independent optimization runs to aggregate.

    Returns:
        dict with per-run improvements and best scores plus summary
        statistics (mean/std improvement and consistency percentage).
    """
    print (f"\n Running { num_runs } optimization runs...\n " )
    all_improvements = []
    all_best_scores = []
    for run in range (num_runs ):
        print (f"Run { run + 1 } /{ num_runs } ..." , end = " " , flush = True )
        model = MLDemoModel ()
        result = model .optimize_parameters ()
        random_best = max (result ["random_scores" ])
        # BUG FIX: random_best can be 0.0 (scores are clipped into [0, 1]),
        # which previously raised ZeroDivisionError here.  A zero baseline
        # is reported as 0% improvement rather than crashing the run.
        if random_best > 0 :
            improvement = ((result ["best_score" ]- random_best )/ random_best * 100 )
        else :
            improvement = 0.0
        all_improvements .append (improvement )
        all_best_scores .append (result ["best_score" ])
        print (f"improvement: { improvement :+.2f} %" )
    # Fraction of runs where Bayesian optimization at least matched random search.
    consistency = sum (1 for x in all_improvements if x >= 0 )/ num_runs * 100
    print ("\n Statistical Summary:" )
    print (f" Mean improvement: { np .mean (all_improvements ):+.2f} %" )
    print (f" Std deviation: { np .std (all_improvements ):.2f} %" )
    print (f" Min: { np .min (all_improvements ):+.2f} %" )
    print (f" Max: { np .max (all_improvements ):+.2f} %" )
    print (f" Consistency: { consistency :.1f} % of runs beat random search\n " )
    return {
        "improvements" :all_improvements ,
        "best_scores" :all_best_scores ,
        "mean_improvement" :np .mean (all_improvements ),
        "std_improvement" :np .std (all_improvements ),
        "consistency" :consistency ,
    }
223+
def run_optimization ():
    """Run one optimization and show the plot."""
    demo = MLDemoModel ()
    outcome = demo .optimize_parameters ()
    visualize_optimization (outcome )
    return outcome
230+
def run_full_analysis (num_runs = 15 ):
    """Run multiple times then show a plot from the final run."""
    summary = run_statistical_validation (num_runs = num_runs )
    print ("Creating visualization from final run..." )
    # One extra run, used only to produce the figure.
    demo = MLDemoModel ()
    final_result = demo .optimize_parameters ()
    visualize_optimization (final_result )
    return summary , final_result
240+
241+
if __name__ == "__main__" :
    # Script entry point: full statistical analysis over 15 runs,
    # followed by a visualization of the final run.
    stats , final_result = run_full_analysis (num_runs = 15 )