Skip to content

Commit f646c9d

Browse files
committed
Add auth token to runner
1 parent 55124a8 commit f646c9d

4 files changed

Lines changed: 61 additions & 12 deletions

File tree

src/robusta/core/sinks/robusta/dal/supabase_dal.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22
import logging
33
from collections import defaultdict
44
from datetime import datetime
5-
from typing import Any, Dict, List, Optional
5+
import os
6+
import threading
7+
from typing import Any, Dict, List, Optional, Tuple
8+
from uuid import uuid4
69

10+
from cachetools import TTLCache
711
import requests
812
from postgrest._sync.request_builder import SyncQueryRequestBuilder
913
from postgrest.base_request_builder import BaseFilterRequestBuilder
@@ -49,7 +53,7 @@
4953
ACCOUNT_RESOURCE_TABLE = "AccountResource"
5054
ACCOUNT_RESOURCE_STATUS_TABLE = "AccountResourceStatus"
5155
OPENSHIFT_GROUPS_TABLE = "OpenshiftGroups"
52-
56+
SESSION_TOKENS_TABLE = "AuthTokens"
5357

5458
class SupabaseDal(AccountResourceFetcher):
5559
def __init__(
@@ -77,11 +81,14 @@ def __init__(
7781
self.patch_postgrest_execute()
7882
self.email = email
7983
self.password = password
80-
self.sign_in()
84+
self.user_id = self.sign_in()
8185
self.client.auth.on_auth_state_change(self.__update_token_patch)
8286
self.sink_name = sink_name
8387
self.persist_events = persist_events
8488
self.signing_key = signing_key
89+
ttl = int(os.environ.get("SAAS_SESSION_TOKEN_TTL_SEC", "82800")) # 23 hours
90+
self.token_cache = TTLCache(maxsize=1, ttl=ttl)
91+
self.lock = threading.Lock()
8592

8693
def patch_postgrest_execute(self):
8794
# This is somewhat hacky.
@@ -532,11 +539,12 @@ def publish_helm_releases(self, helm_releases: List[HelmRelease]):
532539
logging.error(f"Failed to persist helm_releases {helm_releases} error: {e}")
533540
raise
534541

535-
def sign_in(self):
542+
def sign_in(self) -> str:
536543
logging.info("Supabase dal login")
537544
res = self.client.auth.sign_in_with_password({"email": self.email, "password": self.password})
538545
self.client.auth.set_session(res.session.access_token, res.session.refresh_token)
539546
self.client.postgrest.auth(res.session.access_token)
547+
return res.user.id
540548

541549
def to_db_cluster_status(self, data: ClusterStatus) -> Dict[str, Any]:
542550
db_cluster_status = data.dict()
@@ -753,3 +761,25 @@ def set_cluster_active(self, active: bool) -> None:
753761
)
754762
except Exception as e:
755763
logging.error(f"Failed to set cluster status active=False error: {e}")
764+
765+
def get_session_token(self) -> Tuple[str, str]:
766+
with self.lock:
767+
session_token = self.token_cache.get("session_token")
768+
if not session_token:
769+
session_token = self.create_session_token()
770+
self.token_cache["session_token"] = session_token
771+
772+
return session_token
773+
774+
def create_session_token(self) -> str:
775+
token = str(uuid4())
776+
self.client.table(SESSION_TOKENS_TABLE).insert(
777+
{
778+
"account_id": self.account_id,
779+
"user_id": self.user_id,
780+
"token": token,
781+
"type": "RUNNER",
782+
},
783+
returning=ReturnMethod.minimal,
784+
).execute()
785+
return token

src/robusta/integrations/receiver.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,14 @@ class SlackActionsMessage(BaseModel):
7676

7777

7878
class ActionRequestReceiver:
79-
def __init__(self, event_handler: PlaybooksEventHandler):
79+
def __init__(self, event_handler: PlaybooksEventHandler, auth_token: str):
8080
self.event_handler = event_handler
8181
self.active = True
8282
self.account_id = self.event_handler.get_global_config().get("account_id")
8383
self.cluster_name = self.event_handler.get_global_config().get("cluster_name")
8484
self.auth_provider = AuthProvider()
8585
self.healthy = False
86+
self.auth_token = auth_token
8687

8788
self.ws = websocket.WebSocketApp(
8889
WEBSOCKET_RELAY_ADDRESS,
@@ -291,8 +292,9 @@ def on_open(self, ws):
291292
"account_id": account_id,
292293
"cluster_name": cluster_name,
293294
"version": RUNNER_VERSION,
295+
"token": self.auth_token,
294296
}
295-
logging.info(f"connecting to server as account_id={account_id}; cluster_name={cluster_name}")
297+
logging.info(f"connecting to server as account_id={account_id}; cluster_name={cluster_name} token={self.auth_token}")
296298
ws.send(json.dumps(open_payload))
297299

298300
def __validate_request(self, action_request: ExternalActionRequest, validate_timestamp: bool) -> ValidationResponse:

src/robusta/model/config.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from robusta.core.pubsub.event_emitter import EventEmitter
1010
from robusta.core.pubsub.event_subscriber import EventHandler
1111
from robusta.core.pubsub.events_pubsub import EventsPubSub
12+
from robusta.core.sinks.robusta.robusta_sink import RobustaSink
1213
from robusta.core.sinks.robusta.robusta_sink_params import RobustaSinkConfigWrapper, RobustaSinkParams
1314
from robusta.core.sinks.sink_base import SinkBase
1415
from robusta.core.sinks.sink_config import SinkConfigBase
@@ -35,6 +36,9 @@ def get_sink_by_name(self, sink_name: str) -> Optional[SinkBase]:
3536

3637
def get_all(self) -> Dict[str, SinkBase]:
3738
return self.sinks
39+
40+
def get_robusta_sinks(self) -> list[RobustaSink]:
41+
return [sink for sink in self.sinks.values() if isinstance(sink, RobustaSink)]
3842

3943
@classmethod
4044
def construct_new_sinks(
@@ -159,7 +163,7 @@ class Registry:
159163
_playbooks: PlaybooksRegistry = PlaybooksRegistry()
160164
_sinks: SinksRegistry = None
161165
_scheduler = None
162-
_receiver: ActionRequestReceiver = None
166+
_receiver: Optional[ActionRequestReceiver] = None
163167
_global_config = dict()
164168
_alert_relabel_config: List[AlertRelabel] = []
165169
_telemetry: Telemetry = Telemetry(
@@ -201,7 +205,7 @@ def get_scheduler(self) -> PlaybooksSchedulerManager:
201205
def set_receiver(self, receiver: ActionRequestReceiver):
202206
self._receiver = receiver
203207

204-
def get_receiver(self) -> ActionRequestReceiver:
208+
def get_receiver(self) -> Optional[ActionRequestReceiver]:
205209
return self._receiver
206210

207211
def get_telemetry(self) -> Telemetry:

src/robusta/runner/config_loader.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def __reload_scheduler(self, playbooks_registry: PlaybooksRegistry):
8686
def __reload_receiver(self):
8787
receiver = self.registry.get_receiver()
8888
if not receiver: # no existing receiver, just start one
89-
self.registry.set_receiver(ActionRequestReceiver(self.event_handler))
89+
self.__create_receiver()
9090
return
9191

9292
current_account_id = self.event_handler.get_global_config().get("account_id")
@@ -95,8 +95,21 @@ def __reload_receiver(self):
9595
if current_account_id != receiver.account_id or current_cluster_name != receiver.cluster_name:
9696
# need to re-create the receiver
9797
receiver.stop()
98-
self.registry.set_receiver(ActionRequestReceiver(self.event_handler))
98+
self.__create_receiver()
9999

100+
def __create_receiver(self):
101+
robusta_sinks = self.registry.get_sinks().get_robusta_sinks()
102+
if not robusta_sinks:
103+
logging.info("No robusta sinks found, skipping receiver creation")
104+
return
105+
106+
robusta_sink = robusta_sinks[0]
107+
token = robusta_sink.dal.get_session_token()
108+
109+
receiver = ActionRequestReceiver(self.event_handler, token)
110+
self.registry.set_receiver(receiver)
111+
return receiver
112+
100113
@staticmethod
101114
def __get_package_name_from_pyproject(local_path: str) -> str:
102115
with open(os.path.join(local_path, "pyproject.toml"), "r") as pyproj_toml:
@@ -235,8 +248,6 @@ def __reload_playbook_packages(self, change_name):
235248
# This needs to be set before the robusta sink is created since a cluster status is sent on creation
236249
self.registry.set_light_actions(runner_config.light_actions if runner_config.light_actions else [])
237250

238-
self.__reload_receiver()
239-
240251
(sinks_registry, playbooks_registry) = self.__prepare_runtime_config(
241252
runner_config,
242253
self.registry.get_sinks(),
@@ -250,6 +261,8 @@ def __reload_playbook_packages(self, change_name):
250261
self.registry.set_actions(action_registry)
251262
self.registry.set_playbooks(playbooks_registry)
252263
self.registry.set_sinks(sinks_registry)
264+
self.__reload_receiver()
265+
253266

254267
telemetry = self.registry.get_telemetry()
255268
telemetry.playbooks_count = len(runner_config.active_playbooks) if runner_config.active_playbooks else 0

0 commit comments

Comments
 (0)