-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfix_commits_collector.py
More file actions
143 lines (117 loc) · 4.36 KB
/
fix_commits_collector.py
File metadata and controls
143 lines (117 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import hashlib
import json
import re
import shutil
import sys
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from aboutcode.pipeline import BasePipeline, LoopProgress
from git import Repo
from packageurl.contrib.url2purl import url2purl
class CollectVCSFixCommitPipeline(BasePipeline):
    """
    Pipeline to collect fix commits from any git repository.

    A commit counts as a fix commit when its message mentions a vulnerability
    identifier matching one of ``patterns`` (CVE or GHSA ids). The steps clone
    the repository, group the matching commits by vulnerability id, write the
    result as JSON under ``data/fix-commits/`` and remove the local clone.
    """

    vcs_url: str
    # Regular expressions searched (case-insensitively) in each commit message.
    patterns: list[str] = [
        r"\bCVE-\d{4}-\d{4,19}\b",
        r"GHSA-[2-9cfghjmpqrvwx]{4}-[2-9cfghjmpqrvwx]{4}-[2-9cfghjmpqrvwx]{4}",
    ]

    def __init__(self, vcs_url: str, *args, **kwargs):
        """
        Store ``vcs_url`` (the repository to clone) and forward every other
        argument to the base pipeline constructor.
        """
        self.vcs_url = vcs_url
        super().__init__(*args, **kwargs)

    @classmethod
    def steps(cls):
        """Return the ordered tuple of step methods that make up this pipeline."""
        return (
            cls.clone,
            cls.collect_fix_commits,
            cls.store_items,
            cls.clean_downloads,
        )

    def log(self, message):
        """
        Print ``message`` prefixed with the current local time, formatted as
        ``YYYY-MM-DD HH:MM:SS.mmm``.
        """
        now_local = datetime.now(timezone.utc).astimezone()
        # "%f" yields microseconds; dropping the last three digits leaves milliseconds.
        timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        message = f"{timestamp} {message}"
        print(message)

    def clone(self):
        """
        Clone the repository into a fresh temporary directory and keep the
        resulting ``Repo`` object in ``self.repo``.

        The clone is bare and requested with ``--filter=blob:none``.
        """
        clone_dir = tempfile.mkdtemp()
        try:
            self.repo = Repo.clone_from(
                url=self.vcs_url,
                to_path=clone_dir,
                bare=True,
                no_checkout=True,
                multi_options=["--filter=blob:none"],
            )
        except BaseException:
            # The directory is only reachable through ``self.repo``; when the
            # clone fails nothing else could find it, so remove it here and
            # let the original error propagate.
            shutil.rmtree(clone_dir, ignore_errors=True)
            raise

    @staticmethod
    def _message_text(commit) -> str:
        """Return the commit message as text, decoding it when it is bytes."""
        message = commit.message
        if isinstance(message, bytes):
            # NOTE(review): GitPython types ``Commit.message`` as ``str | bytes``;
            # bytes would break both the str regexes and the JSON dump - confirm.
            message = message.decode("utf-8", errors="replace")
        return message

    def extract_vulnerability_id(self, commit) -> list[str]:
        """
        Extract vulnerability ids from a commit message.

        Return every match of every pattern in ``self.patterns``, in pattern
        order, exactly as written in the message (duplicates are kept and the
        case is not normalized). The list is empty when nothing matches.
        """
        message = self._message_text(commit)
        matches = []
        for pattern in self.patterns:
            matches.extend(re.findall(pattern, message, flags=re.IGNORECASE))
        return matches

    def collect_fix_commits(self):
        """
        Iterate through repository commits and group them by vulnerability identifiers.

        Fill and return ``self.collected_items``, shaped as::

            {"vcs_url": <url>, "vulnerabilities": {<VULN-ID>: {<commit sha>: <message>}}}
        """
        self.log(
            "Processing git repository fix commits (grouped by vulnerability IDs)."
        )

        vulnerabilities = defaultdict(dict)
        self.collected_items = {
            "vcs_url": self.vcs_url,
            "vulnerabilities": vulnerabilities,
        }

        for commit in self.repo.iter_commits("--all"):
            matched_ids = self.extract_vulnerability_id(commit)
            if not matched_ids:
                continue

            commit_id = commit.hexsha
            commit_message = self._message_text(commit).strip()
            for vuln_id in matched_ids:
                # Matching is case-insensitive, so upper-case the id to group
                # differently-cased mentions of the same id together.
                vulnerabilities[vuln_id.upper()][commit_id] = commit_message

        # Count the vulnerability ids, not the two top-level keys of
        # ``collected_items`` (which would always report 2).
        self.log(
            f"Found {len(vulnerabilities)} vulnerabilities with related commits."
        )
        self.log("Finished processing all commits.")
        return self.collected_items

    def store_items(self):
        """
        Store the collected fix commits for this repository as indented JSON in
        ``data/fix-commits/<purl name>-<first 8 hex chars of sha256(vcs_url)>.json``.

        Nothing is written when no package name can be derived from the URL or
        when no vulnerability was collected.
        """
        self.log("Storing collected fix commits")
        purl = url2purl(self.vcs_url)
        if not (purl and purl.name) or not self.collected_items.get("vulnerabilities"):
            self.log("Nothing to store for collected fix commits")
            return

        # The short URL hash keeps apart repositories that share a package name.
        vcs_url_hash = hashlib.sha256(self.vcs_url.encode("utf-8")).hexdigest()[:8]
        path = Path(f"data/fix-commits/{purl.name}-{vcs_url_hash}.json")
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.collected_items, f, indent=2)

    def clean_downloads(self):
        """Cleanup any temporary repository data created by ``clone``."""
        self.log("Cleaning up local repository resources")
        repo = getattr(self, "repo", None)
        if repo is None:
            return

        clone_dir = repo.working_dir
        # Release the handles GitPython holds on the clone before deleting it.
        repo.close()
        if clone_dir:
            shutil.rmtree(clone_dir)
if __name__ == "__main__":
    # Script entry point: run the fix-commit pipeline once for every
    # repository URL listed in the JSON targets file.
    with open("config/fix_commits_targets.json") as targets_file:
        target_urls = json.load(targets_file)

    tracker = LoopProgress(total_iterations=len(target_urls), logger=print)

    for target_url in tracker.iter(target_urls):
        pipeline = CollectVCSFixCommitPipeline(vcs_url=target_url)
        # execute() hands back a (status code, error message) pair; only the
        # message is reported, and it is printed for every repository.
        _status_code, error_message = pipeline.execute()
        print(error_message)

    # The exit status is 0 whatever the individual pipelines returned.
    sys.exit(0)