1919from enginecore .state .web_socket import WebSocket
2020from enginecore .state .redis_channels import RedisChannels
2121from enginecore .model .graph_reference import GraphReference
22+ from enginecore .state .state_initializer import initialize , clear_temp
2223
2324class NotifyClient (Event ):
2425 """Notify websocket clients of any data updates"""
@@ -27,36 +28,26 @@ class StateListener(Component):
2728 """Top-level component that instantiates assets & maps redis events to circuit events"""
2829
2930
30- def __init__ (self , debug = False ):
31+ def __init__ (self , debug = False , force_snmp_init = False ):
3132 super (StateListener , self ).__init__ ()
3233
3334 ### Set-up WebSocket & Redis listener ###
3435
35- # subscribe to redis key events
36- self .redis_store = redis .StrictRedis (host = 'localhost' , port = 6379 )
36+ # Use redis pub/sub communication
37+ self ._redis_store = redis .StrictRedis (host = 'localhost' , port = 6379 )
3738
38- # State Channels
39- self .pubsub = self .redis_store .pubsub ()
40- self .pubsub .psubscribe (
41- RedisChannels .oid_update_channel , # snmp oid updates
42- RedisChannels .state_update_channel # power state changes
43- )
44-
45- # Battery Channel
46- self .bat_pubsub = self .redis_store .pubsub ()
47- self .bat_pubsub .psubscribe (
48- RedisChannels .battery_update_channel , # battery level updates
49- RedisChannels .battery_conf_drain_channel , # update drain speed (factor)
50- RedisChannels .battery_conf_charge_channel # update charge speed (factor)
51- )
39+ self ._bat_pubsub = self ._redis_store .pubsub ()
40+ self ._state_pubsub = self ._redis_store .pubsub ()
5241
5342 # assets will store all the devices/items including PDUs, switches etc.
5443 self ._assets = {}
44+
45+ # init graph db instance
5546 self ._graph_ref = GraphReference ()
56- self ._graph_db = self ._graph_ref .get_session ()
5747
58- # set up a web socket
59- self ._server = Server (("0.0.0.0" , 8000 )).register (self )
48+ # set up a web socket server
49+ self ._server = Server (("0.0.0.0" , 8000 )).register (self )
50+
6051 # Worker(process=False).register(self)
6152 Static ().register (self ._server )
6253 Logger ().register (self ._server )
@@ -65,10 +56,39 @@ def __init__(self, debug=False):
6556 Debugger (events = False ).register (self )
6657
6758 self ._ws = WebSocket ().register (self ._server )
68-
6959 WebSocketsDispatcher ("/simengine" ).register (self ._server )
7060
7161 ### Register Assets ###
62+ self ._subscribe_to_channels ()
63+ self ._reload_model (force_snmp_init )
64+
65+
66+ def _subscribe_to_channels (self ):
67+ """Subscribe to redis channels"""
68+
69+ # State Channels
70+ self ._state_pubsub .psubscribe (
71+ RedisChannels .oid_update_channel , # snmp oid updates
72+ RedisChannels .state_update_channel , # power state changes
73+ RedisChannels .model_update_channel # model changes
74+ )
75+
76+ # Battery Channel
77+ self ._bat_pubsub .psubscribe (
78+ RedisChannels .battery_update_channel , # battery level updates
79+ RedisChannels .battery_conf_drain_channel , # update drain speed (factor)
80+ RedisChannels .battery_conf_charge_channel # update charge speed (factor)
81+ )
82+
83+
84+ def _reload_model (self , force_snmp_init = True ):
85+ """Re-create system topology (instantiate assets based on graph ref)"""
86+
87+ self ._assets = {}
88+
89+ # init state
90+ clear_temp ()
91+ initialize (force_snmp_init )
7292
7393 # instantiate assets based on graph records
7494 leaf_nodes = []
@@ -147,10 +167,6 @@ def _handle_state_update(self, asset_key):
147167 self ._chain_power_update (PowerEventResult (asset_key = asset_key , new_state = asset_status ))
148168
149169
150- def __exit__ (self , exc_type , exc_value , traceback ):
151- self ._graph_db .close ()
152-
153-
154170 def _chain_load_update (self , event_result , increased = True ):
155171 """React to load update event by propogating the load changes up the power stream
156172
@@ -317,7 +333,7 @@ def _notify_client(self, asset_key, data):
317333
318334 def monitor_battery (self ):
319335 """Monitor battery in a separate pub/sub stream"""
320- message = self .bat_pubsub .get_message ()
336+ message = self ._bat_pubsub .get_message ()
321337
322338 # validate message
323339 if ((not message ) or ('data' not in message ) or (not isinstance (message ['data' ], bytes ))):
@@ -346,7 +362,7 @@ def monitor_state(self):
346362 """ listens to redis events """
347363
348364 print ("..." )
349- message = self .pubsub .get_message ()
365+ message = self ._state_pubsub .get_message ()
350366
351367 # validate message
352368 if ((not message ) or ('data' not in message ) or (not isinstance (message ['data' ], bytes ))):
@@ -364,14 +380,22 @@ def monitor_state(self):
364380 self ._handle_state_update (int (asset_key ))
365381
366382 elif message ['channel' ] == str .encode (RedisChannels .oid_update_channel ):
367- value = (self .redis_store .get (data )).decode ()
383+ value = (self ._redis_store .get (data )).decode ()
368384 asset_key , oid = data .split ('-' )
369385 self ._handle_oid_update (int (asset_key ), oid , value )
370386
371387 elif message ['channel' ] == str .encode (RedisChannels .battery_update_channel ):
372388 asset_key , _ = data .split ('-' )
373389 self ._notify_client (int (asset_key ), {'battery' : self ._assets [int (asset_key )].state .battery_level })
374390
391+ elif message ['channel' ] == str .encode (RedisChannels .model_update_channel ):
392+ print ('RELOAD REQUESTED' )
393+ self ._state_pubsub .unsubscribe ()
394+ self ._bat_pubsub .unsubscribe ()
395+
396+ self ._reload_model ()
397+ self ._subscribe_to_channels ()
398+
375399 except KeyError as error :
376400 print ("Detected unregistered asset under key [{}]" .format (error ), file = sys .stderr )
377401
0 commit comments