|
5 | 5 | from collections import defaultdict |
6 | 6 | from concurrent.futures.process import BrokenProcessPool, ProcessPoolExecutor |
7 | 7 | from typing import Dict, List, Optional, Union |
8 | | - |
| 8 | +import dpath.util |
9 | 9 | import prometheus_client |
10 | 10 | from hikaru.model.rel_1_26 import ( |
11 | 11 | Container, |
|
50 | 50 | DISCOVERY_PROCESS_TIMEOUT_SEC, |
51 | 51 | IS_OPENSHIFT, |
52 | 52 | OPENSHIFT_GROUPS, |
| 53 | + CUSTOM_CRD |
53 | 54 | ) |
54 | 55 | from robusta.core.model.helm_release import HelmRelease |
55 | 56 | from robusta.core.model.jobs import JobInfo |
|
58 | 59 | from robusta.core.model.openshift_group import OpenshiftGroup |
59 | 60 | from robusta.core.model.services import ContainerInfo, ServiceConfig, ServiceInfo, VolumeInfo |
60 | 61 | from robusta.integrations.kubernetes.custom_models import DeploymentConfig, DictToK8sObj, Rollout |
| 62 | +from robusta.integrations.kubernetes.custom_crds import CRDS_map |
61 | 63 | from robusta.patch.patch import create_monkey_patches |
62 | 64 | from robusta.utils.cluster_provider_discovery import cluster_provider |
63 | 65 | from robusta.utils.stack_tracer import StackTracer |
@@ -191,11 +193,53 @@ def discovery_process() -> DiscoveryResults: |
191 | 193 | node_requests = defaultdict(list) # map between node name, to request of pods running on it |
192 | 194 | active_services: List[ServiceInfo] = [] |
193 | 195 | openshift_groups: List[OpenshiftGroup] = [] |
194 | | - |
| 196 | + continue_ref: Optional[str] = None |
195 | 197 | # discover micro services |
| 198 | + |
196 | 199 | try: |
| 200 | + for cls_name in CUSTOM_CRD: |
| 201 | + if (cls := CRDS_map.get(cls_name)) is None: |
| 202 | + continue |
| 203 | + |
| 204 | + for _ in range(DISCOVERY_MAX_BATCHES): |
| 205 | + try: |
| 206 | + crd_res = client.CustomObjectsApi().list_cluster_custom_object( |
| 207 | + group=cls.group, |
| 208 | + version=cls.version, |
| 209 | + plural=cls.plural, |
| 210 | + limit=DISCOVERY_BATCH_SIZE, |
| 211 | + _continue=continue_ref, |
| 212 | + ) |
| 213 | + except Exception: |
| 214 | + logging.exception(msg=f"Failed to list {cls.name} from api.") |
| 215 | + break |
| 216 | + |
| 217 | + for crd in crd_res.get("items", []): |
| 218 | + try: |
| 219 | + meta = DictToK8sObj(crd.get("metadata"), V1ObjectMeta) |
| 220 | + active_services.extend( |
| 221 | + [ |
| 222 | + Discovery.__create_service_info( |
| 223 | + meta=meta, |
| 224 | + kind=cls.name, |
| 225 | + containers=[], |
| 226 | + volumes=[], |
| 227 | + total_pods=dpath.util.get(crd, cls.total_pods_path, default=0), |
| 228 | + ready_pods=dpath.util.get(crd, cls.ready_pods_path, default=0), |
| 229 | + is_helm_release=is_release_managed_by_helm( |
| 230 | + annotations=meta.annotations, labels=meta.labels |
| 231 | + ), |
| 232 | + ) |
| 233 | + ] |
| 234 | + ) |
| 235 | + except Exception: |
| 236 | + logging.exception(msg=f"Failed to parse {cls.name} {crd}") |
| 237 | + continue |
| 238 | + |
| 239 | + continue_ref = crd_res.get("metadata", {}).get("continue") |
| 240 | + if not continue_ref: |
| 241 | + break |
197 | 242 |
|
198 | | - continue_ref: Optional[str] = None |
199 | 243 | if IS_OPENSHIFT: |
200 | 244 | for _ in range(DISCOVERY_MAX_BATCHES): |
201 | 245 | try: |
|
0 commit comments