11import json
2- from typing import TYPE_CHECKING , List , Optional
2+ from typing import TYPE_CHECKING , List , Optional , Set , Union
33
44from requests import Response
55
6+ from cycode .cli import consts
7+ from cycode .cli .exceptions .custom_exceptions import CycodeError
68from cycode .cli .files_collector .models .in_memory_zip import InMemoryZip
79from cycode .cyclient import models
810from cycode .cyclient .cycode_client_base import CycodeClientBase
@@ -20,6 +22,7 @@ def __init__(
2022
2123 self .SCAN_CONTROLLER_PATH = 'api/v1/scan'
2224 self .DETECTIONS_SERVICE_CONTROLLER_PATH = 'api/v1/detections'
25+ self .POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'
2326
2427 self ._hide_response_log = hide_response_log
2528
@@ -95,6 +98,58 @@ def get_scan_details(self, scan_id: str) -> models.ScanDetailsResponse:
9598 response = self .scan_cycode_client .get (url_path = self .get_scan_details_path (scan_id ))
9699 return models .ScanDetailsResponseSchema ().load (response .json ())
97100
101+ def get_detection_rules_path (self ) -> str :
102+ return (
103+ f'{ self .scan_config .get_detections_prefix ()} /'
104+ f'{ self .POLICIES_SERVICE_CONTROLLER_PATH_V3 } /'
105+ f'detection_rules'
106+ )
107+
108+ @staticmethod
109+ def _get_policy_type_by_scan_type (scan_type : str ) -> str :
110+ scan_type_to_policy_type = {
111+ consts .INFRA_CONFIGURATION_SCAN_TYPE : 'IaC' ,
112+ consts .SCA_SCAN_TYPE : 'SCA' ,
113+ consts .SECRET_SCAN_TYPE : 'SecretDetection' ,
114+ consts .SAST_SCAN_TYPE : 'SAST' ,
115+ }
116+
117+ if scan_type not in scan_type_to_policy_type :
118+ raise CycodeError ('Invalid scan type' )
119+
120+ return scan_type_to_policy_type [scan_type ]
121+
122+ @staticmethod
123+ def _filter_detection_rules_by_ids (
124+ detection_rules : List [models .DetectionRule ], detection_rules_ids : Union [Set [str ], List [str ]]
125+ ) -> List [models .DetectionRule ]:
126+ ids = set (detection_rules_ids ) # cast to set to perform faster search
127+ return [rule for rule in detection_rules if rule .detection_rule_id in ids ]
128+
129+ @staticmethod
130+ def parse_detection_rules_response (response : Response ) -> List [models .DetectionRule ]:
131+ return models .DetectionRuleSchema ().load (response .json (), many = True )
132+
133+ def get_detection_rules (
134+ self , scan_type : str , detection_rules_ids : Union [Set [str ], List [str ]]
135+ ) -> List [models .DetectionRule ]:
136+ # TODO(MarshalX): use filter by list of IDs instead of policy_type when BE will be ready
137+ params = {
138+ 'include_hidden' : False ,
139+ 'include_only_enabled_detection_rules' : True ,
140+ 'page_number' : 0 ,
141+ 'page_size' : 5000 ,
142+ 'policy_types_v2' : self ._get_policy_type_by_scan_type (scan_type ),
143+ }
144+ response = self .scan_cycode_client .get (
145+ url_path = self .get_detection_rules_path (),
146+ params = params ,
147+ hide_response_content_log = self ._hide_response_log ,
148+ )
149+
150+ # we are filtering rules by ids in-place for smooth migration when backend will be ready
151+ return self ._filter_detection_rules_by_ids (self .parse_detection_rules_response (response ), detection_rules_ids )
152+
98153 def get_scan_detections_path (self ) -> str :
99154 return f'{ self .scan_config .get_detections_prefix ()} /{ self .DETECTIONS_SERVICE_CONTROLLER_PATH } '
100155
0 commit comments