-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathconfig_file_manager.py
More file actions
120 lines (88 loc) · 5.5 KB
/
config_file_manager.py
File metadata and controls
120 lines (88 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
from collections.abc import Hashable
from typing import TYPE_CHECKING, Any, Optional, Union
from cycode.cli.consts import CYCODE_CONFIGURATION_DIRECTORY
from cycode.cli.user_settings.base_file_manager import BaseFileManager
if TYPE_CHECKING:
from pathlib import Path
class ConfigFileManager(BaseFileManager):
CYCODE_HIDDEN_DIRECTORY: str = CYCODE_CONFIGURATION_DIRECTORY
FILE_NAME: str = 'config.yaml'
ENVIRONMENT_SECTION_NAME: str = 'environment'
EXCLUSIONS_SECTION_NAME: str = 'exclusions'
SCAN_SECTION_NAME: str = 'scan'
INSTALLATION_ID_FIELD_NAME: str = 'installation_id'
LAST_REPORTED_ACTIVATION_VERSIONS_FIELD_NAME: str = 'last_reported_activation_versions'
API_URL_FIELD_NAME: str = 'cycode_api_url'
APP_URL_FIELD_NAME: str = 'cycode_app_url'
VERBOSE_FIELD_NAME: str = 'verbose'
MAX_COMMITS_FIELD_NAME: str = 'max_commits'
COMMAND_TIMEOUT_FIELD_NAME: str = 'command_timeout'
EXCLUDE_DETECTIONS_IN_DELETED_LINES: str = 'exclude_detections_in_deleted_lines'
def __init__(self, path: Union['Path', str]) -> None:
self.path = path
def get_api_url(self) -> Optional[Any]:
return self._get_value_from_environment_section(self.API_URL_FIELD_NAME)
def get_app_url(self) -> Optional[Any]:
return self._get_value_from_environment_section(self.APP_URL_FIELD_NAME)
def get_verbose_flag(self) -> Optional[Any]:
return self._get_value_from_environment_section(self.VERBOSE_FIELD_NAME)
def get_exclusions_by_scan_type(self, scan_type: str) -> dict[Hashable, Any]:
exclusions_section = self._get_section(self.EXCLUSIONS_SECTION_NAME)
return exclusions_section.get(scan_type, {})
def get_max_commits(self, command_scan_type: str) -> Optional[Any]:
return self._get_value_from_command_scan_type_configuration(command_scan_type, self.MAX_COMMITS_FIELD_NAME)
def get_command_timeout(self, command_scan_type: str) -> Optional[Any]:
return self._get_value_from_command_scan_type_configuration(command_scan_type, self.COMMAND_TIMEOUT_FIELD_NAME)
def get_exclude_detections_in_deleted_lines(self, command_scan_type: str) -> Optional[Any]:
return self._get_value_from_command_scan_type_configuration(
command_scan_type, self.EXCLUDE_DETECTIONS_IN_DELETED_LINES
)
def update_api_base_url(self, api_url: str) -> None:
update_data = {self.ENVIRONMENT_SECTION_NAME: {self.API_URL_FIELD_NAME: api_url}}
self.write_content_to_file(update_data)
def update_app_base_url(self, app_url: str) -> None:
update_data = {self.ENVIRONMENT_SECTION_NAME: {self.APP_URL_FIELD_NAME: app_url}}
self.write_content_to_file(update_data)
def get_installation_id(self) -> Optional[str]:
return self._get_value_from_environment_section(self.INSTALLATION_ID_FIELD_NAME)
def update_installation_id(self, installation_id: str) -> None:
update_data = {self.ENVIRONMENT_SECTION_NAME: {self.INSTALLATION_ID_FIELD_NAME: installation_id}}
self.write_content_to_file(update_data)
def get_last_reported_activation_versions(self) -> dict[str, str]:
value = self._get_value_from_environment_section(self.LAST_REPORTED_ACTIVATION_VERSIONS_FIELD_NAME)
return value if isinstance(value, dict) else {}
def update_last_reported_activation_version(self, client: str, version: str) -> None:
versions = self.get_last_reported_activation_versions()
versions[client] = version
update_data = {self.ENVIRONMENT_SECTION_NAME: {self.LAST_REPORTED_ACTIVATION_VERSIONS_FIELD_NAME: versions}}
self.write_content_to_file(update_data)
def add_exclusion(self, scan_type: str, exclusion_type: str, new_exclusion: str) -> None:
exclusions = self._get_exclusions_by_exclusion_type(scan_type, exclusion_type)
if new_exclusion in exclusions:
return
exclusions.append(new_exclusion)
update_data = {self.EXCLUSIONS_SECTION_NAME: {scan_type: {exclusion_type: exclusions}}}
self.write_content_to_file(update_data)
def get_config_directory_path(self) -> str:
return os.path.join(self.path, self.CYCODE_HIDDEN_DIRECTORY)
def get_filename(self) -> str:
return os.path.join(self.get_config_directory_path(), self.FILE_NAME)
@staticmethod
def get_config_file_route() -> str:
return os.path.join(ConfigFileManager.CYCODE_HIDDEN_DIRECTORY, ConfigFileManager.FILE_NAME)
def _get_exclusions_by_exclusion_type(self, scan_type: str, exclusion_type: str) -> list[Any]:
scan_type_exclusions = self.get_exclusions_by_scan_type(scan_type)
return scan_type_exclusions.get(exclusion_type, [])
def _get_value_from_environment_section(self, field_name: str) -> Optional[Any]:
environment_section = self._get_section(self.ENVIRONMENT_SECTION_NAME)
return environment_section.get(field_name)
def _get_scan_configuration_by_scan_type(self, command_scan_type: str) -> dict[Hashable, Any]:
scan_section = self._get_section(self.SCAN_SECTION_NAME)
return scan_section.get(command_scan_type, {})
def _get_value_from_command_scan_type_configuration(self, command_scan_type: str, field_name: str) -> Optional[Any]:
command_scan_type_configuration = self._get_scan_configuration_by_scan_type(command_scan_type)
return command_scan_type_configuration.get(field_name)
def _get_section(self, section_name: str) -> dict[Hashable, Any]:
file_content = self.read_file()
return file_content.get(section_name, {})