1717- Graceful shutdown on Ctrl+C
1818
1919Interactive Commands (available during runtime):
20- - 'anomaly [#]' or 'a [#]' - Switch to anomaly mode (all assets or specific asset #)
21- - 'normal [#]' or 'n [#]' - Switch to normal mode (all assets or specific asset #)
20+ - 'anomaly [#]' or 'a [#]' - Switch to anomaly mode
21+ (all assets or specific asset #)
22+ - 'normal [#]' or 'n [#]' - Switch to normal mode
23+ (all assets or specific asset #)
2224- 'status' or 's' - Show current simulation status
2325- 'stats' - Show detailed per-asset statistics
2426- 'help' or 'h' - Show available commands
3133 AZURE_EVENT_HUB_NAMESPACE_HOSTNAME - Azure Event Hub namespace
3234 (e.g., myeventhub.servicebus.windows.net)
3335 AZURE_EVENT_HUB_NAME - Name of the Event Hub
34- ASSETS_CSV_PATH - Path to assets.csv file (default: infra/data/assets.csv)
35- PRODUCTS_CSV_PATH - Path to products.csv file (default: infra/data/products.csv)
36+ ASSETS_CSV_PATH - Path to assets.csv file
37+ (default: infra/data/assets.csv)
38+ PRODUCTS_CSV_PATH - Path to products.csv file
39+ (default: infra/data/products.csv)
3640 SIMULATION_INTERVAL - Seconds between events per asset (default: 5)
37- MAX_RUNTIME_SECONDS - Maximum runtime in seconds (default: unlimited)
41+ MAX_RUNTIME_SECONDS - Maximum runtime in seconds
42+ (default: unlimited)
3843
3944Example:
4045 python event_simulator.py --interval 2 --max-runtime 300
7075class AssetSimulator :
7176 """Simulates events for a single manufacturing asset."""
7277
73- def __init__ (self , asset_id : str , asset_name : str , asset_type_name : str ,
74- products : List [Dict ], event_hub_service : EventHubService , index : int ):
78+ def __init__ (
79+ self , asset_id : str , asset_name : str , asset_type_name : str ,
80+ products : List [Dict ], event_hub_service : EventHubService ,
81+ index : int
82+ ):
7583 self .asset_id = asset_id
7684 self .asset_name = asset_name
7785 self .asset_type_name = asset_type_name
@@ -130,12 +138,14 @@ def start(self, interval_seconds: float):
130138
131139 self .is_running = True
132140 self .thread = threading .Thread (
133- target = self ._simulation_loop ,
134- args = (interval_seconds ,)
141+ target = self ._simulation_loop , args = (interval_seconds ,)
135142 )
136143 self .thread .daemon = True
137144 self .thread .start ()
138- print (f"🚀 Started simulation for { self .asset_name } (ID: { self .asset_id } )" )
145+ print (
146+ f"🚀 Started simulation for { self .asset_name } "
147+ f"(ID: { self .asset_id } )"
148+ )
139149
140150 def stop (self ):
141151 """Stop the event simulation for this asset."""
@@ -220,14 +230,16 @@ def load_products(self, products_csv_path: str) -> List[Dict]:
220230 except FileNotFoundError :
221231 print (
222232 f"⚠️ Products file not found: { products_csv_path } "
223- "(will use default)"
233+ f "(will use default)"
224234 )
225235 except Exception as e :
226236 print (f"⚠️ Error loading products: { e } (will use default)" )
227237 return products
228238
229- def create_simulators (self , assets : List [Dict ], products : List [Dict ],
230- event_hub_service : EventHubService ):
239+ def create_simulators (
240+ self , assets : List [Dict ], products : List [Dict ],
241+ event_hub_service : EventHubService
242+ ):
231243 """Create asset simulators."""
232244 self .simulators = []
233245 for i , asset in enumerate (assets , 1 ):
@@ -240,11 +252,12 @@ def create_simulators(self, assets: List[Dict], products: List[Dict],
240252 index = i
241253 )
242254 self .simulators .append (simulator )
243- print (f"🏭 Created simulators for { len (self .simulators )} assets" )
255+ print (
256+ f"🏭 Created simulators for { len (self .simulators )} assets"
257+ )
244258
245259 def start_all_simulators (
246- self ,
247- interval_seconds : float ,
260+ self , interval_seconds : float ,
248261 max_runtime_seconds : Optional [int ] = None
249262 ):
250263 """Start all asset simulators."""
@@ -285,11 +298,11 @@ def _start_command_interface(self):
285298 print (f"\n 🎛️ Interactive Commands Available:" )
286299 print (
287300 f" Type 'anomaly [#]' to switch to anomaly mode "
288- "(all assets or specific asset #)"
301+ f "(all assets or specific asset #)"
289302 )
290303 print (
291304 f" Type 'normal [#]' to switch to normal mode "
292- "(all assets or specific asset #)"
305+ f "(all assets or specific asset #)"
293306 )
294307 print (f" Type 'status' to show current status" )
295308 print (f" Type 'stats' to show detailed statistics" )
@@ -304,9 +317,8 @@ def _command_loop(self):
304317 try :
305318 # Simple blocking input - works on all platforms
306319 print (
307- "\n 💬 Enter command (type 'help' for options): " ,
308- end = '' ,
309- flush = True
320+ f"\n 💬 Enter command (type 'help' for options): " ,
321+ end = '' , flush = True
310322 )
311323 command = input ().strip ().lower ()
312324 if command : # Only process non-empty commands
@@ -315,8 +327,8 @@ def _command_loop(self):
315327 except (EOFError , KeyboardInterrupt ):
316328 break
317329 except Exception :
318- # Silently handle command loop errors
319- # to not interrupt main simulation
330+ # Silently handle command loop errors to not interrupt
331+ # main simulation
320332 time .sleep (0.1 )
321333 continue
322334
@@ -330,10 +342,18 @@ def _handle_command(self, command: str):
330342 cmd = parts [0 ] if parts else ''
331343
332344 if cmd in ['anomaly' , 'a' ]:
333- asset_num = int (parts [1 ]) if len (parts ) > 1 and parts [1 ].isdigit () else None
345+ asset_num = (
346+ int (parts [1 ])
347+ if len (parts ) > 1 and parts [1 ].isdigit ()
348+ else None
349+ )
334350 self ._switch_to_anomaly_mode (asset_num )
335351 elif cmd in ['normal' , 'n' ]:
336- asset_num = int (parts [1 ]) if len (parts ) > 1 and parts [1 ].isdigit () else None
352+ asset_num = (
353+ int (parts [1 ])
354+ if len (parts ) > 1 and parts [1 ].isdigit ()
355+ else None
356+ )
337357 self ._switch_to_normal_mode (asset_num )
338358 elif cmd in ['status' , 's' ]:
339359 self ._show_status ()
@@ -347,7 +367,7 @@ def _handle_command(self, command: str):
347367 elif command .strip ():
348368 print (
349369 f"❓ Unknown command: '{ command } '. "
350- "Type 'help' for available commands."
370+ f "Type 'help' for available commands."
351371 )
352372
353373 def _switch_to_anomaly_mode (self , asset_num : Optional [int ] = None ):
@@ -361,7 +381,10 @@ def _switch_to_anomaly_mode(self, asset_num: Optional[int] = None):
361381 f"({ simulator .asset_name } ) TO ANOMALY MODE"
362382 )
363383 else :
364- print (f"❌ Invalid asset number. Valid range: 1-{ len (self .simulators )} " )
384+ print (
385+ f"❌ Invalid asset number. "
386+ f"Valid range: 1-{ len (self .simulators )} "
387+ )
365388 else :
366389 for simulator in self .simulators :
367390 simulator .anomaly_mode = True
@@ -378,7 +401,10 @@ def _switch_to_normal_mode(self, asset_num: Optional[int] = None):
378401 f"({ simulator .asset_name } ) TO NORMAL MODE"
379402 )
380403 else :
381- print (f"❌ Invalid asset number. Valid range: 1-{ len (self .simulators )} " )
404+ print (
405+ f"❌ Invalid asset number. "
406+ f"Valid range: 1-{ len (self .simulators )} "
407+ )
382408 else :
383409 for simulator in self .simulators :
384410 simulator .anomaly_mode = False
@@ -388,8 +414,7 @@ def _show_status(self):
388414 """Show current simulation status."""
389415 elapsed = (
390416 (datetime .now () - self .start_time ).total_seconds ()
391- if self .start_time
392- else 0
417+ if self .start_time else 0
393418 )
394419 total_events = sum (s .events_sent for s in self .simulators )
395420 total_anomalies = sum (s .anomaly_events_sent for s in self .simulators )
@@ -409,8 +434,7 @@ def _show_status(self):
409434 )
410435 print (
411436 f" Events/sec: { total_events / elapsed :.2f} "
412- if elapsed > 0
413- else " Events/sec: 0"
437+ if elapsed > 0 else " Events/sec: 0"
414438 )
415439
416440 if anomaly_assets :
@@ -422,8 +446,8 @@ def _show_detailed_stats(self):
422446 """Show detailed per-asset statistics."""
423447 print (f"\n 📈 DETAILED STATISTICS" )
424448 print (
425- f"{ '#' :<3} { 'Asset Name' :<20} { 'Mode' :<8} "
426- f"{ 'Total' :<8 } { ' Normal' :<8} { 'Anomaly' :<8} { 'Anomaly %' :<10} "
449+ f"{ '#' :<3} { 'Asset Name' :<20} { 'Mode' :<8} { 'Total' :<8 } "
450+ f"{ 'Normal' :<8} { 'Anomaly' :<8} { 'Anomaly %' :<10} "
427451 )
428452 print ("-" * 75 )
429453
@@ -436,20 +460,20 @@ def _show_detailed_stats(self):
436460
437461 print (
438462 f"{ simulator .index :<3} { simulator .asset_name :<20} "
439- f"{ mode :<8} { total :<8} { normal :<8} "
440- f"{ anomalies :<8 } { anomaly_pct :<10.1f} %"
463+ f"{ mode :<8} { total :<8} { normal :<8} { anomalies :<8 } "
464+ f"{ anomaly_pct :<10.1f} %"
441465 )
442466
443467 def _show_help (self ):
444468 """Show help for available commands."""
445469 print (f"\n 🎛️ AVAILABLE COMMANDS:" )
446470 print (
447- f" anomaly [#], a [#] - "
448- "Switch to anomaly mode (all or specific asset)"
471+ f" anomaly [#], a [#] - Switch to anomaly mode "
472+ f" (all or specific asset)"
449473 )
450474 print (
451- f" normal [#], n [#] - "
452- "Switch to normal mode (all or specific asset)"
475+ f" normal [#], n [#] - Switch to normal mode "
476+ f" (all or specific asset)"
453477 )
454478 print (f" status, s - Show current simulation status" )
455479 print (f" stats - Show detailed per-asset statistics" )
@@ -491,12 +515,13 @@ def stop_all_simulators(self):
491515
492516 # Print summary
493517 total_events = sum (s .events_sent for s in self .simulators )
494- total_anomalies = sum (s .anomaly_events_sent for s in self .simulators )
518+ total_anomalies = sum (
519+ s .anomaly_events_sent for s in self .simulators
520+ )
495521 total_normal = total_events - total_anomalies
496522 elapsed = (
497523 (datetime .now () - self .start_time ).total_seconds ()
498- if self .start_time
499- else 0
524+ if self .start_time else 0
500525 )
501526
502527 print ("\n " + "=" * 60 )
@@ -508,22 +533,21 @@ def stop_all_simulators(self):
508533 print (f" Anomaly events: { total_anomalies } " )
509534 print (
510535 f" Anomaly rate: { total_anomalies / total_events * 100 :.1f} %"
511- if total_events > 0
512- else " Anomaly rate: 0%"
536+ if total_events > 0 else " Anomaly rate: 0%"
513537 )
514538 print (
515539 f"Events per second: { total_events / elapsed :.2f} "
516- if elapsed > 0
517- else "Events per second: 0"
540+ if elapsed > 0 else "Events per second: 0"
518541 )
519542 print (f"Active assets: { len (self .simulators )} " )
520543 print ("\n Per-asset summary:" )
521544 for simulator in self .simulators :
522- normal_events = simulator .events_sent - simulator .anomaly_events_sent
545+ normal_events = (
546+ simulator .events_sent - simulator .anomaly_events_sent
547+ )
523548 anomaly_pct = (
524549 (simulator .anomaly_events_sent / simulator .events_sent * 100 )
525- if simulator .events_sent > 0
526- else 0
550+ if simulator .events_sent > 0 else 0
527551 )
528552 print (
529553 f" { simulator .asset_name } : { simulator .events_sent } total "
@@ -564,19 +588,25 @@ def main():
564588 data_dir = (src_dir / '..' / 'infra' / 'data' ).resolve ()
565589
566590 # Get configuration from environment variables or arguments
567- event_hub_namespace_fqdn = os .getenv ('AZURE_EVENT_HUB_NAMESPACE_HOSTNAME' )
591+ event_hub_namespace_fqdn = os .getenv (
592+ 'AZURE_EVENT_HUB_NAMESPACE_HOSTNAME'
593+ )
568594 event_hub_name = os .getenv ('AZURE_EVENT_HUB_NAME' )
569- assets_csv_path = args .assets_csv or os .getenv (
570- 'ASSETS_CSV_PATH' , data_dir / 'assets.csv'
595+ assets_csv_path = (
596+ args .assets_csv or
597+ os .getenv ('ASSETS_CSV_PATH' , data_dir / 'assets.csv' )
598+ )
599+ products_csv_path = (
600+ args .products_csv or
601+ os .getenv ('PRODUCTS_CSV_PATH' , data_dir / 'products.csv' )
571602 )
572- products_csv_path = args .products_csv or os . getenv (
573- 'PRODUCTS_CSV_PATH ' , data_dir / 'products.csv'
603+ interval = args .interval or float (
604+ os . getenv ( 'SIMULATION_INTERVAL ' , '5.0' )
574605 )
575- interval = args .interval or float (os .getenv ('SIMULATION_INTERVAL' , '5.0' ))
576- max_runtime = args .max_runtime or (
577- int (os .getenv ('MAX_RUNTIME_SECONDS' ))
578- if os .getenv ('MAX_RUNTIME_SECONDS' )
579- else None
606+ max_runtime = (
607+ args .max_runtime or
608+ (int (os .getenv ('MAX_RUNTIME_SECONDS' ))
609+ if os .getenv ('MAX_RUNTIME_SECONDS' ) else None )
580610 )
581611
582612 # Validate required configuration
@@ -587,17 +617,17 @@ def main():
587617 )
588618 print (
589619 "Set it using: export "
590- "AZURE_EVENT_HUB_NAMESPACE_HOSTNAME="
591- "'your_namespace.servicebus.windows.net' "
592- "or in Powershell: "
593- "$env:AZURE_EVENT_HUB_NAMESPACE_HOSTNAME="
594- "'your_namespace.servicebus.windows.net'"
620+ "AZURE_EVENT_HUB_NAMESPACE_HOSTNAME='"
621+ "your_namespace.servicebus.windows.net' or in Powershell: "
622+ "$env:AZURE_EVENT_HUB_NAMESPACE_HOSTNAME='"
623+ "your_namespace.servicebus.windows.net'"
595624 )
596625 sys .exit (1 )
597626
598627 if not event_hub_name :
599628 print (
600- "❌ ERROR: AZURE_EVENT_HUB_NAME environment variable is required"
629+ "❌ ERROR: AZURE_EVENT_HUB_NAME "
630+ "environment variable is required"
601631 )
602632 print (
603633 "Set it using: export AZURE_EVENT_HUB_NAME='your_event_hub_name' "
0 commit comments