forked from cycodehq/cycode-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_restore_dependencies.py
More file actions
164 lines (131 loc) · 5.89 KB
/
base_restore_dependencies.py
File metadata and controls
164 lines (131 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
import typer
from cycode.cli.models import Document
from cycode.cli.utils.path_utils import get_file_content, get_file_dir, get_path_from_context, join_paths
from cycode.cli.utils.shell_executor import shell
from cycode.logger import get_logger
# Logger shared by the functions and the class in this file; obtained from get_logger under the name 'SCA Restore'.
logger = get_logger('SCA Restore')
def build_dep_tree_path(path: str, generated_file_name: str) -> str:
    """Build the path of a generated file that sits next to ``path``.

    The directory is whatever ``get_file_dir(path)`` reports; ``generated_file_name``
    is appended to it with ``join_paths``.
    """
    containing_dir = get_file_dir(path)
    return join_paths(containing_dir, generated_file_name)
def execute_commands(
    commands: list[list[str]],
    timeout: int,
    output_file_path: Optional[str] = None,
    working_directory: Optional[str] = None,
) -> Optional[str]:
    """Run the restore commands in order and return their combined output.

    Args:
        commands: Commands to run; each one is handed to ``shell`` as ``command``.
        timeout: Forwarded to every ``shell`` call as ``timeout``.
        output_file_path: When truthy, the combined output is also written to this
            file (UTF-8, overwriting any existing content).
        working_directory: Forwarded to every ``shell`` call as ``working_directory``.

    Returns:
        The non-empty command outputs joined with ``'\\n'`` (an empty string when no
        command produced output), or ``None`` when ``commands`` is empty or any
        exception was raised while running the commands or writing the file.
    """
    logger.debug(
        'Executing restore commands, %s',
        {
            'commands_count': len(commands),
            'timeout_sec': timeout,
            'working_directory': working_directory,
            'output_file_path': output_file_path,
        },
    )

    if not commands:
        return None

    try:
        outputs = []

        for command in commands:
            command_output = shell(command=command, timeout=timeout, working_directory=working_directory)
            # Falsy outputs are dropped so they do not add blank lines to the result.
            if command_output:
                outputs.append(command_output)

        joined_output = '\n'.join(outputs)

        if output_file_path:
            with open(output_file_path, 'w', encoding='UTF-8') as output_file:
                # joined_output is a single string, so write() is the right call;
                # writelines() would iterate over it one character at a time.
                output_file.write(joined_output)
    except Exception as e:
        # Deliberate best effort: any failure is logged at debug level and reported
        # to the caller as None instead of being propagated.
        logger.debug('Unexpected error during command execution', exc_info=e)
        return None

    return joined_output
class BaseRestoreDependencies(ABC):
    """Base class for producing a lock-file ``Document`` from a manifest ``Document``.

    Subclasses supply the commands to run and the lock file name(s); this class
    looks for an already existing lock file, runs the commands when none is found,
    and loads the lock file content into a new ``Document``.
    """

    def __init__(
        self, ctx: typer.Context, is_git_diff: bool, command_timeout: int, create_output_file_manually: bool = False
    ) -> None:
        """Store the settings used by the restore flow.

        Args:
            ctx: Context; ``ctx.obj`` is read for the ``'monitor'`` entry and ``ctx``
                is passed to ``get_path_from_context``.
            is_git_diff: Passed through to the ``Document`` returned by a restore.
            command_timeout: Timeout handed to ``execute_commands``.
            create_output_file_manually: When true, ``execute_commands`` is asked to
                write the combined command output to the lock file path.
        """
        self.ctx = ctx
        self.is_git_diff = is_git_diff
        self.command_timeout = command_timeout
        self.create_output_file_manually = create_output_file_manually

    def restore(self, document: Document) -> Optional[Document]:
        """Restore dependencies for ``document``; delegates to ``try_restore_dependencies``."""
        return self.try_restore_dependencies(document)

    def get_manifest_file_path(self, document: Document) -> str:
        """Return the manifest path used for the restore commands.

        When ``ctx.obj`` has a truthy ``'monitor'`` entry, the path taken from the
        context is joined with ``document.path``; otherwise ``document.path`` is
        returned unchanged.
        """
        return (
            join_paths(get_path_from_context(self.ctx), document.path) if self.ctx.obj.get('monitor') else document.path
        )

    def try_restore_dependencies(self, document: Document) -> Optional[Document]:
        """Reuse an existing lock file or run the restore commands, then load the lock file.

        Returns:
            A ``Document`` built from the lock file path relative to ``document.path``,
            the loaded lock file content and ``is_git_diff``; or ``None`` when the
            restore commands had to be run and ``execute_commands`` returned ``None``.
        """
        manifest_file_path = self.get_manifest_file_path(document)
        # Candidate lock files live next to the manifest's absolute path.
        restore_file_paths = [
            build_dep_tree_path(document.absolute_path, restore_file_path_item)
            for restore_file_path_item in self.get_lock_file_names()
        ]
        restore_file_path = self.get_any_restore_file_already_exist(document, restore_file_paths)

        relative_restore_file_path = build_dep_tree_path(
            document.path, self.get_restored_lock_file_name(restore_file_path)
        )

        if not self.verify_restore_file_already_exist(restore_file_path):
            output = execute_commands(
                commands=self.get_commands(manifest_file_path),
                timeout=self.command_timeout,
                output_file_path=restore_file_path if self.create_output_file_manually else None,
                working_directory=self.get_working_directory(document),
            )
            if output is None:  # one of the commands failed
                return None
        else:
            logger.debug(
                'Lock file already exists, skipping restore commands, %s',
                {'restore_file_path': restore_file_path},
            )

        restore_file_content = get_file_content(restore_file_path)
        logger.debug(
            'Restore file loaded, %s',
            {
                'restore_file_path': restore_file_path,
                'content_size': len(restore_file_content) if restore_file_content else 0,
                'content_empty': not restore_file_content,
            },
        )
        return Document(relative_restore_file_path, restore_file_content, self.is_git_diff)

    @staticmethod
    def _get_usable_parent_dir(file_path: Optional[str]) -> Optional[str]:
        """Return the parent directory of ``file_path``, or ``None`` when it is not usable.

        A parent is not usable when ``file_path`` is empty/``None``, when the parent
        is ``'.'`` (the path has no directory part), or when the parent is the
        filesystem root (a directory that is its own parent).
        """
        if not file_path:
            return None

        parent = Path(file_path).parent
        if parent == Path('.') or parent == parent.parent:
            return None

        return str(parent)

    def get_manifest_dir(self, document: Document) -> Optional[str]:
        """Return the directory containing the manifest file, resolving monitor-mode paths.

        Uses the same path resolution as get_manifest_file_path() to ensure consistency.
        Falls back to document.absolute_path (or document.path when that is empty) when
        the resolved manifest path is ambiguous. Returns None when neither candidate
        yields a usable directory.
        """
        manifest_dir = self._get_usable_parent_dir(self.get_manifest_file_path(document))
        if manifest_dir is not None:
            return manifest_dir

        return self._get_usable_parent_dir(document.absolute_path or document.path)

    def get_working_directory(self, document: Document) -> Optional[str]:
        """Return the parent directory of ``document.absolute_path`` as a string.

        The value is passed to ``execute_commands`` as ``working_directory``.
        ``document.absolute_path`` must be set: ``Path(None)`` raises ``TypeError``.
        """
        return str(Path(document.absolute_path).parent)

    def get_restored_lock_file_name(self, restore_file_path: str) -> str:
        """Return the lock file name used for the resulting document path.

        This base implementation ignores ``restore_file_path`` and returns
        ``get_lock_file_name()``.
        """
        return self.get_lock_file_name()

    def get_any_restore_file_already_exist(self, document: Document, restore_file_paths: list[str]) -> str:
        """Return the first path in ``restore_file_paths`` that is an existing file.

        When none exists, return the path of ``get_lock_file_name()`` placed next to
        ``document.absolute_path``.
        """
        for restore_file_path in restore_file_paths:
            if Path(restore_file_path).is_file():
                return restore_file_path

        return build_dep_tree_path(document.absolute_path, self.get_lock_file_name())

    @staticmethod
    def verify_restore_file_already_exist(restore_file_path: str) -> bool:
        """Return True when ``restore_file_path`` points to an existing regular file."""
        return Path(restore_file_path).is_file()

    @abstractmethod
    def is_project(self, document: Document) -> bool:
        """Decide whether ``document`` is handled by this implementation.

        NOTE(review): not called within this file; exact semantics are defined by
        subclasses and their callers.
        """

    @abstractmethod
    def get_commands(self, manifest_file_path: str) -> list[list[str]]:
        """Return the commands that ``try_restore_dependencies`` passes to ``execute_commands``."""

    @abstractmethod
    def get_lock_file_name(self) -> str:
        """Return the default lock file name, used when no existing lock file is found."""

    @abstractmethod
    def get_lock_file_names(self) -> list[str]:
        """Return the candidate lock file names checked for an already existing file."""