Skip to content

Commit 5e53e62

Browse files
committed
Allow ignoring sinks init errors
Fix finding dates timezone issues
1 parent d169b2b commit 5e53e62

7 files changed

Lines changed: 68 additions & 26 deletions

File tree

docs/notification-routing/configuring-sinks.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,22 @@ Description of each option:
9090
| *parameters* | Slack and webhook_url for MSTeams | | |
9191
+------------------+---------------------------------------------------------+----------------------------------------------------------+-----------------------------------------------+
9292

93+
Ignoring Sinks Initialization Errors
94+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
95+
96+
By default, Robusta will not start if it fails to initialize any of its sinks.
97+
98+
In some scenarios, you may want to ignore sink initialization errors.
99+
100+
For example, Robusta may not be allowed to connect to Slack, but you still want to receive notifications in the Robusta UI.
101+
102+
To enable this, add the following to ``globalConfig`` in your ``generated_values.yaml`` file:
103+
104+
.. code-block:: yaml
105+
106+
globalConfig:
107+
continue_on_sink_errors: True
108+
93109
Learn More
94110
^^^^^^^^^^^^
95111

src/robusta/core/model/cluster_status.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class ActivityStats(BaseModel):
2323
holmesModel: Optional[str]
2424
clusterTimeZone: str
2525
errors: List[str]
26+
sinksInitializationErrors: bool = False
2627

2728

2829
class ClusterStatus(BaseModel):

src/robusta/core/reporting/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import urllib.parse
55
import uuid
66
from abc import ABC, abstractmethod
7-
from datetime import datetime
7+
from datetime import datetime, timezone
88
from enum import Enum
99
from typing import Any, Dict, List, Optional, Union
1010
from urllib.parse import urlencode
@@ -289,7 +289,7 @@ def __init__(
289289
self.fingerprint = (
290290
fingerprint if fingerprint else self.__calculate_fingerprint(subject, source, aggregation_key)
291291
)
292-
self.starts_at = starts_at if starts_at else datetime.now()
292+
self.starts_at = starts_at if starts_at else datetime.now(timezone.utc)
293293
self.ends_at = ends_at
294294
self.dirty = False
295295

src/robusta/core/sinks/robusta/dal/model_conversion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import logging
44
import uuid
5-
from datetime import datetime
5+
from datetime import datetime, timezone
66
from typing import Any, Dict, List
77

88
from robusta.core.model.env_vars import ENABLE_GRAPH_BLOCK
@@ -52,7 +52,7 @@ def to_finding_json(account_id: str, cluster_id: str, finding: Finding):
5252
"account_id": account_id,
5353
"video_links": [link.dict() for link in finding.links], # TD: Migrate column in table.
5454
"starts_at": datetime_to_db_str(finding.starts_at),
55-
"updated_at": datetime_to_db_str(datetime.now()),
55+
"updated_at": datetime_to_db_str(datetime.now(timezone.utc)),
5656
}
5757

5858
if finding.creation_date:

src/robusta/core/sinks/robusta/robusta_sink.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ def __update_cluster_status(self):
563563
holmesModel=holmes_model,
564564
clusterTimeZone=str(datetime.now().astimezone().tzinfo),
565565
errors=self.__errors,
566+
sinksInitializationErrors=self.registry.has_sink_initialization_errors(),
566567
)
567568

568569
# checking the status of relay connection

src/robusta/model/config.py

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from collections import defaultdict
3-
from typing import Dict, List, Optional
3+
from typing import Dict, List, Optional, Tuple
44

55
from robusta.core.model.env_vars import PROMETHEUS_ENABLED, RUNNER_VERSION
66
from robusta.core.playbooks.actions_registry import ActionsRegistry
@@ -46,7 +46,8 @@ def construct_new_sinks(
4646
new_sinks_config: List[SinkConfigBase],
4747
existing_sinks: Dict[str, SinkBase],
4848
registry,
49-
) -> Dict[str, SinkBase]:
49+
continue_on_sink_errors: bool = False,
50+
) -> Tuple[Dict[str, SinkBase], bool]:
5051
new_sink_names = [sink_config.get_name() for sink_config in new_sinks_config]
5152
# remove deleted sinks
5253
deleted_sink_names = [sink_name for sink_name in existing_sinks.keys() if sink_name not in new_sink_names]
@@ -56,6 +57,7 @@ def construct_new_sinks(
5657
del existing_sinks[deleted_sink]
5758

5859
new_sinks: Dict[str, SinkBase] = dict()
60+
has_sink_errors = False
5961

6062
# Reload sinks, order does matter and should be loaded & added to the dict by config order.
6163
for sink_config in new_sinks_config:
@@ -68,24 +70,33 @@ def construct_new_sinks(
6870

6971
sink_name = sink_config.get_name()
7072
exists_sink = existing_sinks.get(sink_name, None)
71-
if not exists_sink:
72-
logging.info(f"Adding {type(sink_config)} sink named {sink_name}")
73-
new_sinks[sink_name] = SinkFactory.create_sink(sink_config, registry)
74-
continue
75-
76-
is_global_config_changed = exists_sink.is_global_config_changed()
77-
is_sink_changed = sink_config.get_params() != exists_sink.params or is_global_config_changed
78-
if is_sink_changed:
79-
config_change_msg = "due to global config change" if is_global_config_changed else "due to param change"
80-
logging.info(f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}")
81-
exists_sink.stop()
82-
new_sinks[sink_name] = SinkFactory.create_sink(sink_config, registry)
83-
continue
84-
85-
logging.info("Sink %s not changed", sink_name)
86-
new_sinks[sink_name] = exists_sink
87-
88-
return new_sinks
73+
74+
try:
75+
if not exists_sink:
76+
logging.info(f"Adding {type(sink_config)} sink named {sink_name}")
77+
new_sinks[sink_name] = SinkFactory.create_sink(sink_config, registry)
78+
continue
79+
80+
is_global_config_changed = exists_sink.is_global_config_changed()
81+
is_sink_changed = sink_config.get_params() != exists_sink.params or is_global_config_changed
82+
if is_sink_changed:
83+
config_change_msg = "due to global config change" if is_global_config_changed else "due to param change"
84+
logging.info(f"Updating {type(sink_config)} sink named {sink_config.get_name()} {config_change_msg}")
85+
exists_sink.stop()
86+
new_sinks[sink_name] = SinkFactory.create_sink(sink_config, registry)
87+
continue
88+
89+
logging.info("Sink %s not changed", sink_name)
90+
new_sinks[sink_name] = exists_sink
91+
92+
except Exception as e:
93+
has_sink_errors = True
94+
logging.error(f"Failed to initialize sink {sink_name}: {e}", exc_info=True)
95+
if not continue_on_sink_errors:
96+
raise
97+
# Skip this sink if continue_on_sink_errors is True
98+
99+
return new_sinks, has_sink_errors
89100

90101

91102
class PlaybooksRegistry:
@@ -171,6 +182,7 @@ class Registry:
171182
prometheus_enabled=PROMETHEUS_ENABLED,
172183
)
173184
_pubsub: EventsPubSub = EventsPubSub()
185+
_sink_initialization_errors: bool = False
174186

175187
def set_light_actions(self, light_actions: List[str]):
176188
self._light_actions = light_actions
@@ -231,3 +243,9 @@ def subscribe(self, event_name: str, handler: EventHandler):
231243

232244
def unsubscribe(self, event_name: str, handler: EventHandler):
233245
self._pubsub.unsubscribe(event_name, handler)
246+
247+
def set_sink_initialization_errors(self, has_errors: bool):
248+
self._sink_initialization_errors = has_errors
249+
250+
def has_sink_initialization_errors(self) -> bool:
251+
return self._sink_initialization_errors

src/robusta/runner/config_loader.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,10 +287,16 @@ def __prepare_runtime_config(
287287
sinks_registry: SinksRegistry,
288288
actions_registry: ActionsRegistry,
289289
registry: Registry,
290-
) -> (SinksRegistry, PlaybooksRegistry):
290+
) -> tuple[SinksRegistry, PlaybooksRegistry]:
291291
existing_sinks = sinks_registry.get_all() if sinks_registry else {}
292-
new_sinks = SinksRegistry.construct_new_sinks(runner_config.sinks_config, existing_sinks, registry)
292+
new_sinks, has_sink_errors = SinksRegistry.construct_new_sinks(
293+
runner_config.sinks_config,
294+
existing_sinks,
295+
registry,
296+
runner_config.global_config.get("continue_on_sink_errors", False)
297+
)
293298
sinks_registry = SinksRegistry(new_sinks)
299+
registry.set_sink_initialization_errors(has_sink_errors)
294300

295301
# TODO we will replace it with a more generic mechanism, as part of the triggers separation task
296302
# First, we load the internal playbooks, then add the user activated playbooks

0 commit comments

Comments
 (0)