2828 python event_simulator.py [options]
2929
3030Environment Variables:
31- AZURE_EVENT_HUB_NAMESPACE_HOSTNAME - Azure Event Hub namespace (e.g., myeventhub.servicebus.windows.net)
31+ AZURE_EVENT_HUB_NAMESPACE_HOSTNAME - Azure Event Hub namespace
32+ (e.g., myeventhub.servicebus.windows.net)
3233 AZURE_EVENT_HUB_NAME - Name of the Event Hub
3334 ASSETS_CSV_PATH - Path to assets.csv file (default: infra/data/assets.csv)
3435 PRODUCTS_CSV_PATH - Path to products.csv file (default: infra/data/products.csv)
@@ -110,12 +111,14 @@ def _check_set_new_batch(self):
110111 def _create_event (self , anomaly : bool ) -> Event :
111112 self ._check_set_new_batch ()
112113
113- event = self .asset_type .create_random_event (asset_id = self .asset_id ,
114- product_id = self ._get_random_product (),
115- batch_id = self .current_batch_id ,
116- timestamp = datetime .now (timezone .utc ),
117- anomaly = anomaly ,
118- variation_multiplier = random .uniform (2 , 3 ))
114+ event = self .asset_type .create_random_event (
115+ asset_id = self .asset_id ,
116+ product_id = self ._get_random_product (),
117+ batch_id = self .current_batch_id ,
118+ timestamp = datetime .now (timezone .utc ),
119+ anomaly = anomaly ,
120+ variation_multiplier = random .uniform (2 , 3 )
121+ )
119122
120123 self .events_in_batch += 1
121124 return event
@@ -126,7 +129,10 @@ def start(self, interval_seconds: float):
126129 return
127130
128131 self .is_running = True
129- self .thread = threading .Thread (target = self ._simulation_loop , args = (interval_seconds ,))
132+ self .thread = threading .Thread (
133+ target = self ._simulation_loop ,
134+ args = (interval_seconds ,)
135+ )
130136 self .thread .daemon = True
131137 self .thread .start ()
132138 print (f"🚀 Started simulation for { self .asset_name } (ID: { self .asset_id } )" )
@@ -137,8 +143,11 @@ def stop(self):
137143 if self .thread :
138144 self .thread .join (timeout = 2 )
139145 total_normal = self .events_sent - self .anomaly_events_sent
140- print (f"⏹️ Stopped simulation for { self .asset_name } - { self .events_sent } total events "
141- f"(Normal: { total_normal } , Anomalies: { self .anomaly_events_sent } )" )
146+ print (
147+ f"⏹️ Stopped simulation for { self .asset_name } - "
148+ f"{ self .events_sent } total events "
149+ f"(Normal: { total_normal } , Anomalies: { self .anomaly_events_sent } )"
150+ )
142151
143152 def _simulation_loop (self , interval_seconds : float ):
144153 """Main simulation loop for this asset."""
@@ -209,7 +218,10 @@ def load_products(self, products_csv_path: str) -> List[Dict]:
209218 products .append (row )
210219 print (f"📦 Loaded { len (products )} products from { products_csv_path } " )
211220 except FileNotFoundError :
212- print (f"⚠️ Products file not found: { products_csv_path } (will use default)" )
221+ print (
222+ f"⚠️ Products file not found: { products_csv_path } "
223+ "(will use default)"
224+ )
213225 except Exception as e :
214226 print (f"⚠️ Error loading products: { e } (will use default)" )
215227 return products
@@ -230,7 +242,11 @@ def create_simulators(self, assets: List[Dict], products: List[Dict],
230242 self .simulators .append (simulator )
231243 print (f"🏭 Created simulators for { len (self .simulators )} assets" )
232244
233- def start_all_simulators (self , interval_seconds : float , max_runtime_seconds : Optional [int ] = None ):
245+ def start_all_simulators (
246+ self ,
247+ interval_seconds : float ,
248+ max_runtime_seconds : Optional [int ] = None
249+ ):
234250 """Start all asset simulators."""
235251 if not self .simulators :
236252 print ("❌ No simulators to start" )
@@ -267,8 +283,14 @@ def _start_command_interface(self):
267283
268284 # Show available commands
269285 print (f"\n 🎛️ Interactive Commands Available:" )
270- print (f" Type 'anomaly [#]' to switch to anomaly mode (all assets or specific asset #)" )
271- print (f" Type 'normal [#]' to switch to normal mode (all assets or specific asset #)" )
286+ print (
287+ f" Type 'anomaly [#]' to switch to anomaly mode "
288+ "(all assets or specific asset #)"
289+ )
290+ print (
291+ f" Type 'normal [#]' to switch to normal mode "
292+ "(all assets or specific asset #)"
293+ )
272294 print (f" Type 'status' to show current status" )
273295 print (f" Type 'stats' to show detailed statistics" )
274296 print (f" Type 'help' to show this help" )
@@ -281,15 +303,20 @@ def _command_loop(self):
281303 while self .is_running :
282304 try :
283305 # Simple blocking input - works on all platforms
284- print (f"\n 💬 Enter command (type 'help' for options): " , end = '' , flush = True )
306+ print (
307+ "\n 💬 Enter command (type 'help' for options): " ,
308+ end = '' ,
309+ flush = True
310+ )
285311 command = input ().strip ().lower ()
286312 if command : # Only process non-empty commands
287313 self ._handle_command (command )
288314
289315 except (EOFError , KeyboardInterrupt ):
290316 break
291317 except Exception :
292- # Silently handle command loop errors to not interrupt main simulation
318+ # Silently handle command loop errors
319+ # to not interrupt main simulation
293320 time .sleep (0.1 )
294321 continue
295322
@@ -318,15 +345,21 @@ def _handle_command(self, command: str):
318345 print (f"\n 🛑 Stopping simulation via command..." )
319346 self .stop_all_simulators ()
320347 elif command .strip ():
321- print (f"❓ Unknown command: '{ command } '. Type 'help' for available commands." )
348+ print (
349+ f"❓ Unknown command: '{ command } '. "
350+ "Type 'help' for available commands."
351+ )
322352
323353 def _switch_to_anomaly_mode (self , asset_num : Optional [int ] = None ):
324354 """Switch simulators to anomaly mode."""
325355 if asset_num is not None :
326356 if 1 <= asset_num <= len (self .simulators ):
327357 simulator = self .simulators [asset_num - 1 ]
328358 simulator .anomaly_mode = True
329- print (f"🚨 SWITCHED ASSET #{ asset_num } ({ simulator .asset_name } ) TO ANOMALY MODE" )
359+ print (
360+ f"🚨 SWITCHED ASSET #{ asset_num } "
361+ f"({ simulator .asset_name } ) TO ANOMALY MODE"
362+ )
330363 else :
331364 print (f"❌ Invalid asset number. Valid range: 1-{ len (self .simulators )} " )
332365 else :
@@ -340,7 +373,10 @@ def _switch_to_normal_mode(self, asset_num: Optional[int] = None):
340373 if 1 <= asset_num <= len (self .simulators ):
341374 simulator = self .simulators [asset_num - 1 ]
342375 simulator .anomaly_mode = False
343- print (f"✅ SWITCHED ASSET #{ asset_num } ({ simulator .asset_name } ) TO NORMAL MODE" )
376+ print (
377+ f"✅ SWITCHED ASSET #{ asset_num } "
378+ f"({ simulator .asset_name } ) TO NORMAL MODE"
379+ )
344380 else :
345381 print (f"❌ Invalid asset number. Valid range: 1-{ len (self .simulators )} " )
346382 else :
@@ -350,7 +386,11 @@ def _switch_to_normal_mode(self, asset_num: Optional[int] = None):
350386
351387 def _show_status (self ):
352388 """Show current simulation status."""
353- elapsed = (datetime .now () - self .start_time ).total_seconds () if self .start_time else 0
389+ elapsed = (
390+ (datetime .now () - self .start_time ).total_seconds ()
391+ if self .start_time
392+ else 0
393+ )
354394 total_events = sum (s .events_sent for s in self .simulators )
355395 total_anomalies = sum (s .anomaly_events_sent for s in self .simulators )
356396 total_normal = total_events - total_anomalies
@@ -363,8 +403,15 @@ def _show_status(self):
363403 print (f" Active Assets: { len (self .simulators )} " )
364404 print (f" Anomaly Mode: { len (anomaly_assets )} assets" )
365405 print (f" Normal Mode: { len (normal_assets )} assets" )
366- print (f" Total Events: { total_events } (Normal: { total_normal } , Anomalies: { total_anomalies } )" )
367- print (f" Events/sec: { total_events / elapsed :.2f} " if elapsed > 0 else " Events/sec: 0" )
406+ print (
407+ f" Total Events: { total_events } "
408+ f"(Normal: { total_normal } , Anomalies: { total_anomalies } )"
409+ )
410+ print (
411+ f" Events/sec: { total_events / elapsed :.2f} "
412+ if elapsed > 0
413+ else " Events/sec: 0"
414+ )
368415
369416 if anomaly_assets :
370417 print (f"\n Assets in Anomaly Mode:" )
@@ -374,7 +421,10 @@ def _show_status(self):
374421 def _show_detailed_stats (self ):
375422 """Show detailed per-asset statistics."""
376423 print (f"\n 📈 DETAILED STATISTICS" )
377- print (f"{ '#' :<3} { 'Asset Name' :<20} { 'Mode' :<8} { 'Total' :<8} { 'Normal' :<8} { 'Anomaly' :<8} { 'Anomaly %' :<10} " )
424+ print (
425+ f"{ '#' :<3} { 'Asset Name' :<20} { 'Mode' :<8} "
426+ f"{ 'Total' :<8} { 'Normal' :<8} { 'Anomaly' :<8} { 'Anomaly %' :<10} "
427+ )
378428 print ("-" * 75 )
379429
380430 for simulator in self .simulators :
@@ -384,13 +434,23 @@ def _show_detailed_stats(self):
384434 anomaly_pct = (anomalies / total * 100 ) if total > 0 else 0
385435 mode = "ANOMALY" if simulator .anomaly_mode else "NORMAL"
386436
387- print (f"{ simulator .index :<3} { simulator .asset_name :<20} { mode :<8} { total :<8} { normal :<8} { anomalies :<8} { anomaly_pct :<10.1f} %" )
437+ print (
438+ f"{ simulator .index :<3} { simulator .asset_name :<20} "
439+ f"{ mode :<8} { total :<8} { normal :<8} "
440+ f"{ anomalies :<8} { anomaly_pct :<10.1f} %"
441+ )
388442
389443 def _show_help (self ):
390444 """Show help for available commands."""
391445 print (f"\n 🎛️ AVAILABLE COMMANDS:" )
392- print (f" anomaly [#], a [#] - Switch to anomaly mode (all or specific asset)" )
393- print (f" normal [#], n [#] - Switch to normal mode (all or specific asset)" )
446+ print (
447+ f" anomaly [#], a [#] - "
448+ "Switch to anomaly mode (all or specific asset)"
449+ )
450+ print (
451+ f" normal [#], n [#] - "
452+ "Switch to normal mode (all or specific asset)"
453+ )
394454 print (f" status, s - Show current simulation status" )
395455 print (f" stats - Show detailed per-asset statistics" )
396456 print (f" help, h, ? - Show this help message" )
@@ -433,7 +493,11 @@ def stop_all_simulators(self):
433493 total_events = sum (s .events_sent for s in self .simulators )
434494 total_anomalies = sum (s .anomaly_events_sent for s in self .simulators )
435495 total_normal = total_events - total_anomalies
436- elapsed = (datetime .now () - self .start_time ).total_seconds () if self .start_time else 0
496+ elapsed = (
497+ (datetime .now () - self .start_time ).total_seconds ()
498+ if self .start_time
499+ else 0
500+ )
437501
438502 print ("\n " + "=" * 60 )
439503 print ("📊 SIMULATION SUMMARY" )
@@ -442,24 +506,50 @@ def stop_all_simulators(self):
442506 print (f"Total events sent: { total_events } " )
443507 print (f" Normal events: { total_normal } " )
444508 print (f" Anomaly events: { total_anomalies } " )
445- print (f" Anomaly rate: { total_anomalies / total_events * 100 :.1f} %" if total_events > 0 else " Anomaly rate: 0%" )
446- print (f"Events per second: { total_events / elapsed :.2f} " if elapsed > 0 else "Events per second: 0" )
509+ print (
510+ f" Anomaly rate: { total_anomalies / total_events * 100 :.1f} %"
511+ if total_events > 0
512+ else " Anomaly rate: 0%"
513+ )
514+ print (
515+ f"Events per second: { total_events / elapsed :.2f} "
516+ if elapsed > 0
517+ else "Events per second: 0"
518+ )
447519 print (f"Active assets: { len (self .simulators )} " )
448520 print ("\n Per-asset summary:" )
449521 for simulator in self .simulators :
450522 normal_events = simulator .events_sent - simulator .anomaly_events_sent
451- anomaly_pct = (simulator .anomaly_events_sent / simulator .events_sent * 100 ) if simulator .events_sent > 0 else 0
452- print (f" { simulator .asset_name } : { simulator .events_sent } total "
453- f"(Normal: { normal_events } , Anomalies: { simulator .anomaly_events_sent } , { anomaly_pct :.1f} %)" )
523+ anomaly_pct = (
524+ (simulator .anomaly_events_sent / simulator .events_sent * 100 )
525+ if simulator .events_sent > 0
526+ else 0
527+ )
528+ print (
529+ f" { simulator .asset_name } : { simulator .events_sent } total "
530+ f"(Normal: { normal_events } , "
531+ f"Anomalies: { simulator .anomaly_events_sent } , "
532+ f"{ anomaly_pct :.1f} %)"
533+ )
454534
455535
456536def main ():
457537 """Main function."""
458- parser = argparse .ArgumentParser (description = 'Manufacturing Event Simulator' )
459- parser .add_argument ('--interval' , type = float , default = 5.0 ,
460- help = 'Seconds between events per asset (default: 5.0)' )
461- parser .add_argument ('--max-runtime' , type = int , default = None ,
462- help = 'Maximum runtime in seconds (default: unlimited)' )
538+ parser = argparse .ArgumentParser (
539+ description = 'Manufacturing Event Simulator'
540+ )
541+ parser .add_argument (
542+ '--interval' ,
543+ type = float ,
544+ default = 5.0 ,
545+ help = 'Seconds between events per asset (default: 5.0)'
546+ )
547+ parser .add_argument (
548+ '--max-runtime' ,
549+ type = int ,
550+ default = None ,
551+ help = 'Maximum runtime in seconds (default: unlimited)'
552+ )
463553 parser .add_argument ('--assets-csv' , type = str , default = None ,
464554 help = 'Path to assets.csv file' )
465555 parser .add_argument ('--products-csv' , type = str , default = None ,
@@ -483,7 +573,11 @@ def main():
483573 'PRODUCTS_CSV_PATH' , data_dir / 'products.csv'
484574 )
485575 interval = args .interval or float (os .getenv ('SIMULATION_INTERVAL' , '5.0' ))
486- max_runtime = args .max_runtime or (int (os .getenv ('MAX_RUNTIME_SECONDS' )) if os .getenv ('MAX_RUNTIME_SECONDS' ) else None )
576+ max_runtime = args .max_runtime or (
577+ int (os .getenv ('MAX_RUNTIME_SECONDS' ))
578+ if os .getenv ('MAX_RUNTIME_SECONDS' )
579+ else None
580+ )
487581
488582 # Validate required configuration
489583 if not event_hub_namespace_fqdn :
@@ -493,17 +587,22 @@ def main():
493587 )
494588 print (
495589 "Set it using: export "
496- "AZURE_EVENT_HUB_NAMESPACE_HOSTNAME='your_namespace.servicebus.windows.net' "
590+ "AZURE_EVENT_HUB_NAMESPACE_HOSTNAME="
591+ "'your_namespace.servicebus.windows.net' "
497592 "or in Powershell: "
498- "$env:AZURE_EVENT_HUB_NAMESPACE_HOSTNAME='your_namespace.servicebus.windows.net'"
593+ "$env:AZURE_EVENT_HUB_NAMESPACE_HOSTNAME="
594+ "'your_namespace.servicebus.windows.net'"
499595 )
500596 sys .exit (1 )
501597
502598 if not event_hub_name :
503- print ("❌ ERROR: AZURE_EVENT_HUB_NAME environment variable is required" )
599+ print (
600+ "❌ ERROR: AZURE_EVENT_HUB_NAME environment variable is required"
601+ )
504602 print (
505603 "Set it using: export AZURE_EVENT_HUB_NAME='your_event_hub_name' "
506- "or in Powershell: $env:AZURE_EVENT_HUB_NAME='your_event_hub_name'"
604+ "or in Powershell: "
605+ "$env:AZURE_EVENT_HUB_NAME='your_event_hub_name'"
507606 )
508607 sys .exit (1 )
509608
0 commit comments