@@ -86,9 +86,9 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
8686 )
8787 self .__rrm_checker = RRM (dal = self .dal , cluster = self .cluster_name , account_id = self .account_id )
8888 self .__pods_running_count : int = 0
89- self .__update_cluster_status () # send runner version initially, then force prometheus alert time periodically.
9089
9190 self .registry .subscribe ("scan_updated" , self )
91+ self .registry .subscribe ("config_init" , self )
9292
9393 # start cluster discovery
9494 self .__active = True
@@ -101,14 +101,14 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
101101 self .__jobs_cache_initialized : bool = False
102102 self .__helm_releases_cache : Optional [Dict [str , HelmRelease ]] = None
103103 self .__init_service_resolver ()
104- self .__thread = threading .Thread (target = self .__discover_cluster )
105104 self .__watchdog_thread = threading .Thread (target = self .__discovery_watchdog )
106- self .__thread .start ()
107105 self .__watchdog_thread .start ()
108106
109107 def handle_event (self , event_name : str , ** kwargs ):
110108 if event_name == "scan_updated" :
111109 self ._on_scan_updated (** kwargs )
110+ elif event_name == "config_init" :
111+ self ._on_config_init (** kwargs )
112112 else :
113113 logging .warning ("RobustaSink subscriber called with unknown event" )
114114
@@ -120,6 +120,13 @@ def _on_scan_updated(
120120
121121 self .dal .set_scan_state (scan_id , state , metadata )
122122
123+ def _on_config_init (self ) -> None :
124+ # make sure cluster status and periodic check start after all config has been reloaded successfuly.
125+ self .__update_cluster_status () # send runner version initially, then force prometheus alert time periodically.
126+ self .__thread = threading .Thread (target = self .__discover_cluster )
127+ if not self .__thread .is_alive ():
128+ self .__thread .start ()
129+
123130 def set_cluster_active (self , active : bool ):
124131 self .dal .set_cluster_active (active )
125132
0 commit comments