Skip to content

Commit 7cde4cc

Browse files
authored
[ROB-1605] Custom Namespaced resource monitoring (#1869)
* partially working * working version * updating namespace cache * changed names * changes * refactored * moved namespace discovery to discovery thread * removed unneeded discovery * changed error message * moved namespace discovery * cached namespaces * fixed issues * loaded metadata from cache * removed unused imports * bugfix: namespace metadata no longer being removed * made safer when adding a new namespace
1 parent fe4c1a1 commit 7cde4cc

4 files changed

Lines changed: 211 additions & 12 deletions

File tree

src/robusta/core/discovery/discovery.py

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
Volume,
2020
)
2121
from kubernetes import client
22+
from kubernetes.client.exceptions import ApiException
2223
from kubernetes.client import (
2324
V1Container,
2425
V1DaemonSet,
@@ -87,7 +88,24 @@ class Config:
8788

8889
DISCOVERY_STACKTRACE_FILE = "/tmp/make_discovery_stacktrace"
8990
DISCOVERY_STACKTRACE_TIMEOUT_S = int(os.environ.get("DISCOVERY_STACKTRACE_TIMEOUT_S", 10))
90-
91+
# Core ("" API group) resources cannot be listed through CustomObjectsApi,
# so map each supported lowercase plural kind to the CoreV1Api method that
# lists it across all namespaces.
KIND_TO_COREV1_METHOD = {
    "pods": "list_pod_for_all_namespaces",
    "configmaps": "list_config_map_for_all_namespaces",
    "endpoints": "list_endpoints_for_all_namespaces",
    "services": "list_service_for_all_namespaces",
    "secrets": "list_secret_for_all_namespaces",
    "persistentvolumeclaims": "list_persistent_volume_claim_for_all_namespaces",
    "serviceaccounts": "list_service_account_for_all_namespaces",
    "replicationcontrollers": "list_replication_controller_for_all_namespaces",
    "limitranges": "list_limit_range_for_all_namespaces",
    "resourcequotas": "list_resource_quota_for_all_namespaces",
    "events": "list_event_for_all_namespaces",
    "podtemplates": "list_pod_template_for_all_namespaces",
}


class ResourceAccessForbiddenError(Exception):
    """Raised when access to a Kubernetes resource is forbidden (HTTP 403)."""
    pass
91109

92110
class Discovery:
93111
executor = ProcessPoolExecutor(max_workers=1) # always 1 discovery process
@@ -184,6 +202,91 @@ def create_service_info_from_hikaru(obj: Union[Deployment, DaemonSet, StatefulSe
184202
),
185203
)
186204

205+
@staticmethod
def count_resources(kind, api_group, version):
    """Count instances of one resource kind, grouped by namespace.

    Kinds without an API group are core resources and are listed through
    CoreV1Api; anything else is listed through CustomObjectsApi.
    Returns a dict mapping namespace name -> item count.
    """
    fetched_items = (
        Discovery._fetch_crd_resources(kind, api_group, version)
        if api_group
        else Discovery._fetch_corev1_resources(kind, version)
    )
    return Discovery._count_items_by_namespace(fetched_items, kind)
213+
214+
215+
@staticmethod
def _fetch_corev1_resources(kind, version):
    """List all cluster-wide instances of a core ("" group) resource.

    Returns the listed items, or [] when the kind/version has no CoreV1Api
    listing method. Raises ResourceAccessForbiddenError on HTTP 403;
    any other ApiException propagates unchanged.
    """
    if version != "v1":
        logging.error(f"Unsupported CoreV1 resource version '{version}' for kind '{kind}'")
        return []

    method_name = KIND_TO_COREV1_METHOD.get(kind.lower())
    if not method_name:
        logging.warning(f"No CoreV1Api mapping for kind: '{kind}'")
        return []

    list_method = getattr(client.CoreV1Api(), method_name, None)
    if list_method is None:
        logging.warning(f"CoreV1Api does not have method '{method_name}' for kind '{kind}'")
        return []

    try:
        listing = list_method()
    except ApiException as e:
        if e.status == 403:
            raise ResourceAccessForbiddenError(f"Access forbidden to CoreV1 resource '{kind}': {e.body}")
        raise
    # Typed list responses carry their objects under `.items`; fall back to
    # the raw response otherwise.
    return getattr(listing, 'items', listing)
239+
240+
241+
@staticmethod
def _fetch_crd_resources(kind, api_group, version):
    """List all cluster-wide instances of a custom resource, page by page.

    Paginates with the server-side `continue` token, bounded by
    DISCOVERY_MAX_BATCHES pages of DISCOVERY_BATCH_SIZE each. Raises
    ResourceAccessForbiddenError on HTTP 403; any other API error is
    logged and pagination stops, returning whatever was fetched so far.
    """
    collected = []
    page_token = None
    for _ in range(DISCOVERY_MAX_BATCHES):
        try:
            page = client.CustomObjectsApi().list_cluster_custom_object(
                group=api_group,
                version=version,
                plural=kind.lower(),
                limit=DISCOVERY_BATCH_SIZE,
                _continue=page_token,
            )
        except ApiException as e:
            if e.status == 403:
                raise ResourceAccessForbiddenError(
                    f"Access forbidden to resource '{kind}' in group '{api_group}': {e.body}"
                )
            logging.exception(f"Failed to list {kind} from api group '{api_group}'.")
            break
        collected.extend(page.get("items", []))
        page_token = page.get("metadata", {}).get("continue")
        if not page_token:
            # No continue token: the server has returned the last page.
            break
    return collected
266+
267+
268+
@staticmethod
def _count_items_by_namespace(items, kind):
    """Tally items per namespace and return a plain dict of counts.

    Handles both typed client objects (namespace under `.metadata.namespace`)
    and raw dicts from the custom-objects API (namespace under
    ["metadata"]["namespace"]). Items with no namespace are logged and
    skipped.
    """
    counts = defaultdict(int)
    for entry in items:
        meta = getattr(entry, 'metadata', None)
        ns_name = None
        if meta:
            ns_name = getattr(meta, 'namespace', None)
        elif isinstance(entry, dict):
            meta = entry.get('metadata', {})
            ns_name = meta.get('namespace')
        if not ns_name:
            logging.warning(f"Missing namespace for resource '{kind}': metadata={meta}")
            continue
        counts[ns_name] += 1
    return dict(counts)
288+
289+
187290
@staticmethod
188291
def discovery_process() -> DiscoveryResults:
189292
create_monkey_patches()

src/robusta/core/model/namespaces.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
1+
from typing import Optional, List
12
from pydantic import BaseModel
23
from kubernetes.client import V1Namespace
4+
import json
35

6+
class ResourceCount(BaseModel):
    """Per-namespace count of one monitored resource kind.

    Field names are camelCase to match the sink's config/JSON schema.
    """
    kind: str
    apiVersion: str
    # Core ("" group) resources have no API group. The monitored-resource
    # config declares apiGroup as Optional, so None must be accepted here
    # too — a required `str` makes pydantic reject every core resource count.
    apiGroup: Optional[str] = None
    count: int


class NamespaceMetadata(BaseModel):
    """Discovery metadata attached to a namespace (currently only resource
    counts). Optional so cached rows without metadata still parse."""
    resources: Optional[List[ResourceCount]]
415

516
class NamespaceInfo(BaseModel):
    name: str
    # Resource-count metadata filled in by namespace discovery; None for
    # namespaces that have not been enriched (or whose cache row had none).
    metadata: Optional[NamespaceMetadata] = None
    # True once the namespace is observed deleted from the cluster.
    deleted: bool = False
820

921
@classmethod
@@ -14,7 +26,17 @@ def from_api_server(cls, namespace: V1Namespace) -> "NamespaceInfo":
1426

1527
@classmethod
def from_db_row(cls, namespace: dict) -> "NamespaceInfo":
    """Build a NamespaceInfo from a DB row.

    The "metadata" column may arrive as a JSON string, an already-decoded
    dict, or be absent/None; all three shapes are handled.
    """
    raw_meta = namespace.get("metadata")
    if isinstance(raw_meta, str):
        raw_meta = json.loads(raw_meta)
    elif not isinstance(raw_meta, dict):
        raw_meta = None

    return cls(
        name=namespace.get("name"),
        deleted=namespace.get("deleted", False),
        metadata=NamespaceMetadata(**raw_meta) if raw_meta else None,
    )

src/robusta/core/sinks/robusta/robusta_sink.py

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
import requests
1111
from hikaru.model.rel_1_26 import DaemonSet, Deployment, Job, Node, Pod, ReplicaSet, StatefulSet
12-
13-
from robusta.core.discovery.discovery import DISCOVERY_STACKTRACE_TIMEOUT_S, Discovery, DiscoveryResults
12+
from robusta.core.model.namespaces import NamespaceMetadata, ResourceCount
13+
from robusta.core.discovery.discovery import DISCOVERY_STACKTRACE_TIMEOUT_S, Discovery, DiscoveryResults, ResourceAccessForbiddenError
1414
from robusta.core.discovery.top_service_resolver import TopLevelResource, TopServiceResolver
1515
from robusta.core.discovery.utils import from_api_server_node
1616
from robusta.core.model.base_params import HolmesParams
@@ -51,6 +51,7 @@
5151
# Define the cache with a single slot and the configured TTL
5252
_holmes_slackbot_cache = TTLCache(maxsize=1, ttl=HOLMES_SLACKBOT_CACHE_TTL)
5353

54+
5455
class RobustaSink(SinkBase, EventHandler):
5556
services_publish_lock = threading.Lock()
5657

@@ -61,6 +62,9 @@ def __init__(self, sink_config: RobustaSinkConfigWrapper, registry):
6162
self.token = sink_config.robusta_sink.token
6263
self.ttl_hours = sink_config.robusta_sink.ttl_hours
6364
self.persist_events = sink_config.robusta_sink.persist_events
65+
self.namespace_monitored_resources = sink_config.robusta_sink.namespaceMonitoredResources
66+
self.namespace_discovery_seconds = sink_config.robusta_sink.namespace_discovery_seconds
67+
6468
robusta_token = RobustaToken(**json.loads(base64.b64decode(self.token)))
6569
if self.account_id != robusta_token.account_id:
6670
logging.error(
@@ -134,7 +138,7 @@ def _on_config_reload(self) -> None:
134138
if not hasattr(self, '_thread'):
135139
self._thread = threading.Thread(target=self.__discover_cluster, daemon=True)
136140
self._thread.start()
137-
141+
138142
def set_cluster_active(self, active: bool):
139143
self.dal.set_cluster_active(active)
140144

@@ -324,6 +328,63 @@ def __send_helm_release_events(self, release_data: List[HelmRelease]):
324328
except Exception:
325329
logging.error("Error occurred while sending `helm release` trigger event", exc_info=True)
326330

331+
def __discover_custom_namespaced_resources(self, namespaces: List[NamespaceInfo]):
    """Count the configured monitored resources per namespace and attach the
    counts to the matching NamespaceInfo entries as metadata.

    Always returns the namespaces list (enriched when counting succeeded),
    so the caller can safely reassign its list to the return value.
    A 403 from the API server skips that resource with a permissions
    warning; any other per-resource error is logged and skipped.
    """
    if not self.namespace_monitored_resources:
        # Nothing configured: return the input unchanged rather than None,
        # so `namespaces = self.__discover_custom_namespaced_resources(...)`
        # never clobbers the caller's list.
        return namespaces

    try:
        # Step 1: collect counts per namespace in a temporary map.
        resource_map = {}  # type: Dict[str, List[ResourceCount]]

        for resource in self.namespace_monitored_resources:
            try:
                results = Discovery.count_resources(
                    kind=resource.kind,
                    api_group=resource.apiGroup,
                    version=resource.apiVersion,
                )
                for namespace_name, count in results.items():
                    resource_map.setdefault(namespace_name, []).append(
                        ResourceCount(
                            kind=resource.kind,
                            apiVersion=resource.apiVersion,
                            apiGroup=resource.apiGroup,
                            count=count,
                        )
                    )
            except ResourceAccessForbiddenError as e:
                logging.warning(f"Skipping resource {resource.kind} due to insufficient permissions: {e}")
            except Exception as e:
                logging.exception(f"Unexpected error counting resource {resource.kind}: {e}")

        # Step 2: apply the collected counts to matching NamespaceInfo entries.
        for ns in namespaces:
            if ns.name in resource_map:
                if ns.metadata is None:
                    ns.metadata = NamespaceMetadata(resources=[])
                if ns.metadata.resources is None:
                    # NamespaceMetadata.resources is Optional; cached metadata
                    # may carry None, which would make .extend() raise.
                    ns.metadata.resources = []
                ns.metadata.resources.extend(resource_map[ns.name])

        logging.info("Discovered Namespaced custom resources")
    except Exception as e:
        # Bug fix: this path previously returned None implicitly, and the
        # caller assigned the result over its namespaces list before
        # publishing — always hand the list back instead.
        logging.exception(f"Namespace discovery failed: {e}")
    return namespaces
375+
376+
def __add_cached_namespace_metadata(self, namespaces: List[NamespaceInfo]):
    """Copy previously discovered metadata from the namespace cache onto the
    freshly discovered namespaces, so metadata persists between full
    namespace-discovery cycles. Namespaces are deduplicated by name
    (last occurrence wins); returns the deduplicated list.
    """
    unique_by_name = {ns.name: ns for ns in namespaces}
    for name, ns in unique_by_name.items():
        cached = self.__namespaces_cache.get(name)
        if cached:
            ns.metadata = cached.metadata
    return list(unique_by_name.values())
387+
327388
def __discover_resources(self) -> DiscoveryResults:
328389
# discovery is using the k8s python API and not Hikaru, since it's performance is 10 times better
329390
try:
@@ -342,7 +403,13 @@ def __discover_resources(self) -> DiscoveryResults:
342403
self.__publish_new_helm_releases(results.helm_releases)
343404

344405
self.__assert_namespaces_cache_initialized()
345-
self.__publish_new_namespaces(results.namespaces)
406+
namespaces = results.namespaces
407+
if self.namespace_monitored_resources and (time.time() - self.last_namespace_discovery) >= self.namespace_discovery_seconds:
408+
namespaces = self.__discover_custom_namespaced_resources(namespaces)
409+
self.last_namespace_discovery = time.time()
410+
elif self.namespace_monitored_resources:
411+
namespaces = self.__add_cached_namespace_metadata(namespaces)
412+
self.__publish_new_namespaces(namespaces)
346413

347414
self.__pods_running_count = results.pods_running_count
348415

@@ -559,25 +626,26 @@ def __discovery_watchdog(self):
559626
def __discover_cluster(self):
    """Main discovery loop: runs cluster status + resource discovery while
    the sink is active, pacing itself so large clusters (long discovery)
    sleep proportionally longer — 3x the cycle duration, floored at the
    configured discovery period and capped at 5 minutes."""
    logging.info("Cluster discovery initialized")
    history_pending = self.__should_run_history()
    # Epoch timestamp of the last custom-namespace discovery; starting at 0
    # forces the first cycle to run it.
    self.last_namespace_discovery = 0
    while self.__active:
        cycle_start = time.time()
        self.__periodic_cluster_status()
        results = self.__discover_resources()

        if history_pending:
            self.__get_events_history()
            history_pending = False

        if results and results.helm_releases:
            self.__send_helm_release_events(release_data=results.helm_releases)

        elapsed = round(time.time() - cycle_start)
        pause = min(max(self.__discovery_period_sec, 3 * elapsed), 300)

        logging.debug(f"Discovery duration: {elapsed} next discovery in {pause}")
        time.sleep(pause)

    logging.info(f"Service discovery for sink {self.sink_name} ended.")
580647

648+
581649
def __periodic_cluster_status(self):
582650
first_alert = False
583651

src/robusta/core/sinks/robusta/robusta_sink_params.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from pydantic.main import BaseModel
2-
2+
from typing import List, Optional
33
from robusta.core.sinks.sink_base_params import SinkBaseParams
44
from robusta.core.sinks.sink_config import SinkConfigBase
55

@@ -11,12 +11,18 @@ class RobustaToken(BaseModel):
1111
email: str
1212
password: str
1313

14+
class NamespaceMonitoredResources(BaseModel):
    # One resource kind to count per namespace during discovery.
    # camelCase field names mirror the sink's YAML config keys.
    apiGroup: Optional[str]  # no group in V1 core resources
    apiVersion: str  # e.g. "v1"
    # Plural resource name (used as the CRD `plural`); discovery lowercases
    # it before lookup.
    kind: str
1418

1519
class RobustaSinkParams(SinkBaseParams):
    token: str
    ttl_hours: int = 4380  # Time before inactive cluster data is deleted. 6 months default.
    persist_events: bool = False
    # Resource kinds to count per namespace; None disables the feature.
    # (camelCase to match the sink YAML config key.)
    namespaceMonitoredResources: Optional[List[NamespaceMonitoredResources]]
    # Minimum seconds between full per-namespace resource recounts.
    namespace_discovery_seconds: int = 3600

    @classmethod
    def _get_sink_type(cls):
        return "robusta"

0 commit comments

Comments
 (0)