11import logging
22from collections import defaultdict
3- from typing import Dict , List , Optional
3+ from typing import Dict , List , Optional , Tuple
44
55from robusta .core .model .env_vars import PROMETHEUS_ENABLED , RUNNER_VERSION
66from robusta .core .playbooks .actions_registry import ActionsRegistry
@@ -46,7 +46,8 @@ def construct_new_sinks(
4646 new_sinks_config : List [SinkConfigBase ],
4747 existing_sinks : Dict [str , SinkBase ],
4848 registry ,
49- ) -> Dict [str , SinkBase ]:
49+ continue_on_sink_errors : bool = False ,
50+ ) -> Tuple [Dict [str , SinkBase ], bool ]:
5051 new_sink_names = [sink_config .get_name () for sink_config in new_sinks_config ]
5152 # remove deleted sinks
5253 deleted_sink_names = [sink_name for sink_name in existing_sinks .keys () if sink_name not in new_sink_names ]
@@ -56,6 +57,7 @@ def construct_new_sinks(
5657 del existing_sinks [deleted_sink ]
5758
5859 new_sinks : Dict [str , SinkBase ] = dict ()
60+ has_sink_errors = False
5961
6062 # Reload sinks, order does matter and should be loaded & added to the dict by config order.
6163 for sink_config in new_sinks_config :
@@ -68,24 +70,33 @@ def construct_new_sinks(
6870
6971 sink_name = sink_config .get_name ()
7072 exists_sink = existing_sinks .get (sink_name , None )
71- if not exists_sink :
72- logging .info (f"Adding { type (sink_config )} sink named { sink_name } " )
73- new_sinks [sink_name ] = SinkFactory .create_sink (sink_config , registry )
74- continue
75-
76- is_global_config_changed = exists_sink .is_global_config_changed ()
77- is_sink_changed = sink_config .get_params () != exists_sink .params or is_global_config_changed
78- if is_sink_changed :
79- config_change_msg = "due to global config change" if is_global_config_changed else "due to param change"
80- logging .info (f"Updating { type (sink_config )} sink named { sink_config .get_name ()} { config_change_msg } " )
81- exists_sink .stop ()
82- new_sinks [sink_name ] = SinkFactory .create_sink (sink_config , registry )
83- continue
84-
85- logging .info ("Sink %s not changed" , sink_name )
86- new_sinks [sink_name ] = exists_sink
87-
88- return new_sinks
73+
74+ try :
75+ if not exists_sink :
76+ logging .info (f"Adding { type (sink_config )} sink named { sink_name } " )
77+ new_sinks [sink_name ] = SinkFactory .create_sink (sink_config , registry )
78+ continue
79+
80+ is_global_config_changed = exists_sink .is_global_config_changed ()
81+ is_sink_changed = sink_config .get_params () != exists_sink .params or is_global_config_changed
82+ if is_sink_changed :
83+ config_change_msg = "due to global config change" if is_global_config_changed else "due to param change"
84+ logging .info (f"Updating { type (sink_config )} sink named { sink_config .get_name ()} { config_change_msg } " )
85+ exists_sink .stop ()
86+ new_sinks [sink_name ] = SinkFactory .create_sink (sink_config , registry )
87+ continue
88+
89+ logging .info ("Sink %s not changed" , sink_name )
90+ new_sinks [sink_name ] = exists_sink
91+
92+ except Exception as e :
93+ has_sink_errors = True
94+ logging .error (f"Failed to initialize sink { sink_name } : { e } " , exc_info = True )
95+ if not continue_on_sink_errors :
96+ raise
97+ # Skip this sink if continue_on_sink_errors is True
98+
99+ return new_sinks , has_sink_errors
89100
90101
91102class PlaybooksRegistry :
@@ -171,6 +182,7 @@ class Registry:
171182 prometheus_enabled = PROMETHEUS_ENABLED ,
172183 )
173184 _pubsub : EventsPubSub = EventsPubSub ()
185+ _sink_initialization_errors : bool = False
174186
175187 def set_light_actions (self , light_actions : List [str ]):
176188 self ._light_actions = light_actions
@@ -231,3 +243,9 @@ def subscribe(self, event_name: str, handler: EventHandler):
231243
232244 def unsubscribe (self , event_name : str , handler : EventHandler ):
233245 self ._pubsub .unsubscribe (event_name , handler )
246+
247+ def set_sink_initialization_errors (self , has_errors : bool ):
248+ self ._sink_initialization_errors = has_errors
249+
250+ def has_sink_initialization_errors (self ) -> bool :
251+ return self ._sink_initialization_errors
0 commit comments