-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathsource_analyzer.py
More file actions
234 lines (196 loc) · 9.8 KB
/
source_analyzer.py
File metadata and controls
234 lines (196 loc) · 9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Standard library
import logging
from contextlib import ExitStack, nullcontext
from pathlib import Path
from typing import Optional

# Third-party
from multilspy import SyncLanguageServer
from multilspy.multilspy_config import MultilspyConfig
from multilspy.multilspy_logger import MultilspyLogger

# Local
from api.entities.entity import Entity
from api.entities.file import File
from ..graph import Graph
from .analyzer import AbstractAnalyzer
# from .c.analyzer import CAnalyzer
from .csharp.analyzer import CSharpAnalyzer
from .java.analyzer import JavaAnalyzer
from .javascript.analyzer import JavaScriptAnalyzer
from .kotlin.analyzer import KotlinAnalyzer
from .python.analyzer import PythonAnalyzer
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(filename)s - %(asctime)s - %(levelname)s - %(message)s')
# List of available analyzers
analyzers: dict[str, AbstractAnalyzer] = {
# '.c': CAnalyzer(),
# '.h': CAnalyzer(),
'.py': PythonAnalyzer(),
'.java': JavaAnalyzer(),
'.cs': CSharpAnalyzer(),
'.js': JavaScriptAnalyzer(),
'.kt': KotlinAnalyzer(),
'.kts': KotlinAnalyzer()}
class NullLanguageServer:
def start_server(self):
return nullcontext()
class SourceAnalyzer():
def __init__(self) -> None:
self.files: dict[Path, File] = {}
def supported_types(self) -> list[str]:
"""
"""
return list(analyzers.keys())
def create_entity_hierarchy(self, entity: Entity, file: File, analyzer: AbstractAnalyzer, graph: Graph):
types = analyzer.get_entity_types()
stack = list(entity.node.children)
while stack:
node = stack.pop()
if node.type in types:
child = Entity(node)
child.id = graph.add_entity(analyzer.get_entity_label(node), analyzer.get_entity_name(node), analyzer.get_entity_docstring(node), str(file.path), node.start_point.row, node.end_point.row, {})
if not analyzer.is_dependency(str(file.path)):
analyzer.add_symbols(child)
file.add_entity(child)
entity.add_child(child)
graph.connect_entities("DEFINES", entity.id, child.id)
self.create_entity_hierarchy(child, file, analyzer, graph)
else:
stack.extend(node.children)
def create_hierarchy(self, file: File, analyzer: AbstractAnalyzer, graph: Graph):
types = analyzer.get_entity_types()
stack = [file.tree.root_node]
while stack:
node = stack.pop()
if node.type in types:
entity = Entity(node)
entity.id = graph.add_entity(analyzer.get_entity_label(node), analyzer.get_entity_name(node), analyzer.get_entity_docstring(node), str(file.path), node.start_point.row, node.end_point.row, {})
if not analyzer.is_dependency(str(file.path)):
analyzer.add_symbols(entity)
file.add_entity(entity)
graph.connect_entities("DEFINES", file.id, entity.id)
self.create_entity_hierarchy(entity, file, analyzer, graph)
else:
stack.extend(node.children)
def first_pass(self, path: Path, files: list[Path], ignore: list[str], graph: Graph) -> None:
"""
Perform the first pass analysis on source files in the given directory tree.
Args:
ignore (list(str)): List of paths to ignore
executor (concurrent.futures.Executor): The executor to run tasks concurrently.
"""
supoorted_types = self.supported_types()
for ext in set([file.suffix for file in files if file.suffix in supoorted_types]):
analyzers[ext].add_dependencies(path, files)
files_len = len(files)
for i, file_path in enumerate(files):
# Skip none supported files
if file_path.suffix not in analyzers:
logging.info(f"Skipping none supported file {file_path}")
continue
# Skip ignored files
if any([i in str(file_path) for i in ignore]):
logging.info(f"Skipping ignored file {file_path}")
continue
logging.info(f'Processing file ({i + 1}/{files_len}): {file_path}')
analyzer = analyzers[file_path.suffix]
# Parse file
source_code = file_path.read_bytes()
tree = analyzer.parser.parse(source_code)
# Create file entity
file = File(file_path, tree)
self.files[file_path] = file
# Walk thought the AST
graph.add_file(file)
self.create_hierarchy(file, analyzer, graph)
def second_pass(self, graph: Graph, files: list[Path], path: Path) -> None:
"""
Recursively analyze the contents of a directory.
Args:
base (str): The base directory for analysis.
root (str): The current directory being analyzed.
executor (concurrent.futures.Executor): The executor to run tasks concurrently.
"""
logger = MultilspyLogger()
logger.logger.setLevel(logging.ERROR)
lsps = {}
if any(path.rglob('*.java')):
config = MultilspyConfig.from_dict({"code_language": "java"})
lsps[".java"] = SyncLanguageServer.create(config, logger, str(path))
else:
lsps[".java"] = NullLanguageServer()
if any(path.rglob('*.py')):
config = MultilspyConfig.from_dict({"code_language": "python", "environment_path": f"{path}/venv"})
lsps[".py"] = SyncLanguageServer.create(config, logger, str(path))
else:
lsps[".py"] = NullLanguageServer()
if any(path.rglob('*.cs')):
config = MultilspyConfig.from_dict({"code_language": "csharp"})
lsps[".cs"] = SyncLanguageServer.create(config, logger, str(path))
else:
lsps[".cs"] = NullLanguageServer()
# For now, use NullLanguageServer for Kotlin as kotlin-language-server setup is not yet integrated
lsps[".kt"] = NullLanguageServer()
lsps[".kts"] = NullLanguageServer()
lsps[".js"] = NullLanguageServer()
with lsps[".java"].start_server(), lsps[".py"].start_server(), lsps[".cs"].start_server(), lsps[".js"].start_server(), lsps[".kt"].start_server(), lsps[".kts"].start_server():
files_len = len(self.files)
for i, file_path in enumerate(files):
if file_path not in self.files:
continue
# Skip symbol resolution when no real LSP is available
if isinstance(lsps.get(file_path.suffix), NullLanguageServer):
continue
file = self.files[file_path]
logging.info(f'Processing file ({i + 1}/{files_len}): {file_path}')
for _, entity in file.entities.items():
entity.resolved_symbol(lambda key, symbol, fp=file_path: analyzers[fp.suffix].resolve_symbol(self.files, lsps[fp.suffix], fp, path, key, symbol))
for key, resolved_set in entity.resolved_symbols.items():
for resolved in resolved_set:
if key == "base_class":
graph.connect_entities("EXTENDS", entity.id, resolved.id)
elif key == "implement_interface":
graph.connect_entities("IMPLEMENTS", entity.id, resolved.id)
elif key == "extend_interface":
graph.connect_entities("EXTENDS", entity.id, resolved.id)
elif key == "call":
graph.connect_entities("CALLS", entity.id, resolved.id)
elif key == "return_type":
graph.connect_entities("RETURNS", entity.id, resolved.id)
elif key == "parameters":
graph.connect_entities("PARAMETERS", entity.id, resolved.id)
def analyze_files(self, files: list[Path], path: Path, graph: Graph) -> None:
self.first_pass(path, files, [], graph)
self.second_pass(graph, files, path)
def analyze_sources(self, path: Path, ignore: list[str], graph: Graph) -> None:
path = path.resolve()
files = list(path.rglob("*.java")) + list(path.rglob("*.py")) + list(path.rglob("*.cs")) + [f for f in path.rglob("*.js") if "node_modules" not in f.parts] + list(path.rglob("*.kt")) + list(path.rglob("*.kts"))
# First pass analysis of the source code
self.first_pass(path, files, ignore, graph)
# Second pass analysis of the source code
self.second_pass(graph, files, path)
def analyze_local_folder(self, path: str, g: Graph, ignore: Optional[list[str]] = []) -> None:
"""
Analyze path.
Args:
path (str): Path to a local folder containing source files to process
ignore (List(str)): List of paths to skip
"""
logging.info(f"Analyzing local folder {path}")
# Analyze source files
self.analyze_sources(Path(path), ignore, g)
logging.info("Done analyzing path")
def analyze_local_repository(self, path: str, ignore: Optional[list[str]] = None) -> Graph:
"""
Analyze a local Git repository.
Args:
path (str): Path to a local git repository
ignore (List(str)): List of paths to skip
"""
if ignore is None:
ignore = []
from pygit2.repository import Repository
proj_name = Path(path).name
graph = Graph(proj_name)
self.analyze_local_folder(path, graph, ignore)
# Save processed commit hash to the DB
repo = Repository(path)
current_commit = repo.walk(repo.head.target).__next__()
graph.set_graph_commit(current_commit.short_id)
return graph