-
-
Notifications
You must be signed in to change notification settings - Fork 304
Expand file tree
/
Copy pathexport.py
More file actions
206 lines (173 loc) · 7.39 KB
/
export.py
File metadata and controls
206 lines (173 loc) · 7.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import itertools
import logging
from itertools import groupby
from pathlib import Path
from timeit import default_timer as timer
from traceback import format_exc as traceback_format_exc
import saneyaml
from aboutcode.pipeline import LoopProgress
from aboutcode.pipeline import humanize_time
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from packageurl import PackageURL
from aboutcode import hashid
from vulnerabilities.models import Package
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def serialize_severity(sev):
    """
    Return a plain data mapping serialized from the ``sev`` severity object.

    The mapping has the keys "score" (from ``sev.value``), "scoring_system",
    "scoring_elements", "published_at" and "url".

    "published_at" is the ``str()`` of ``sev.published_at``, or None when that attribute is
    None, so that a missing date is exported as a YAML null rather than the text "None".
    """
    published_at = sev.published_at
    return {
        "score": sev.value,
        "scoring_system": sev.scoring_system,
        "scoring_elements": sev.scoring_elements,
        # Only stringify an actual value: str(None) would export the literal text "None".
        "published_at": str(published_at) if published_at is not None else None,
        "url": sev.url,
    }
def serialize_vulnerability(vuln):
    """
    Return a plain data mapping serialized from the ``vuln`` Vulnerability instance.

    The mapping has the keys "vulnerability_id" (from ``vuln.vcid``), "aliases", "summary",
    "severities", "weaknesses" and "references". Each reference is reduced to its "url",
    "reference_type" and "reference_id" values.
    """
    alias_names = list(vuln.aliases.values_list("alias", flat=True))
    severity_data = [serialize_severity(severity) for severity in vuln.severities.all()]
    cwes = [weakness.cwe for weakness in vuln.weaknesses.all()]

    reference_fields = ("url", "reference_type", "reference_id")
    reference_data = list(vuln.references.values(*reference_fields))

    return {
        "vulnerability_id": vuln.vcid,
        "aliases": alias_names,
        "summary": vuln.summary,
        "severities": severity_data,
        "weaknesses": cwes,
        "references": reference_data,
    }
class Command(BaseCommand):
    """
    Management command exporting Package and Vulnerability data as a tree of YAML files.
    """

    help = """Export vulnerability and package data as YAML for use in FederatedCode
    This command exports the data in a tree of directories and YAML files designed such that
    it is possible to access directly a vulnerability data file by only knowing its VCID, and that
    it is possible to access directly the package data files by only knowing its PURL.
    """

    def add_arguments(self, parser):
        """
        Declare the single positional ``path`` argument: the export directory.
        """
        parser.add_argument("path", help="Path to a directory where to export data.")

    def handle(self, *args, **options):
        """
        Validate the ``path`` option and export the data under that directory.

        Raise a CommandError when ``path`` is empty or is not an existing directory.
        """
        path = options["path"]
        if not path or not Path(path).is_dir():
            raise CommandError("Enter a valid directory path")

        base_path = Path(path)
        self.stdout.write("Exporting vulnerablecode Package and Vulnerability data.")
        self.export_data(base_path)
        self.stdout.write(self.style.SUCCESS(f"Successfully exported data to {base_path}."))

    def export_data(self, base_path: Path):
        """
        Export vulnerablecode data to ``base_path``.

        Walk all packages grouped by type, namespace and name. For each group, write one YAML
        file with the PURLs of its versions and one YAML file with, for each version, the ids
        of the vulnerabilities affecting it and fixed by it. Each vulnerability related to a
        version is also written to its own YAML file, at most once per run.

        A failure while processing a group is reported on stdout with its traceback and the
        export continues with the next group.
        """
        # Remains 0 when there is no package at all so that the final summary still works.
        package_count = 0
        written_vcids = set()
        started_at = timer()

        total_groups = (
            Package.objects.values("type", "namespace", "name")
            .distinct("type", "namespace", "name")
            .count()
        )
        progress = LoopProgress(
            total_iterations=total_groups,
            progress_step=1,
            logger=self.stdout.write,
        )
        grouped_packages = progress.iter(packages_by_type_ns_name())

        for package_count, (purl_without_version, versions) in enumerate(grouped_packages, 1):
            # Bound before the try so that the error report can always name the package.
            package = None
            try:
                purls = []
                vulnerabilities_by_purl = []

                for package in versions:
                    purl = package.package_url
                    purls.append(purl)

                    affected_by_ids = list(
                        package.affected_by.values_list("vulnerability_id", flat=True)
                    )
                    fixing_ids = list(package.fixing.values_list("vulnerability_id", flat=True))
                    vulnerabilities_by_purl.append(
                        {
                            "purl": purl,
                            "affected_by_vulnerabilities": affected_by_ids,
                            "fixing_vulnerabilities": fixing_ids,
                        }
                    )

                    related_vulnerabilities = itertools.chain(
                        package.affected_by_vulnerabilities.all(),
                        package.fixing_vulnerabilities.all(),
                    )
                    for vuln in related_vulnerabilities:
                        vcid = vuln.vulnerability_id
                        if vcid in written_vcids:
                            # The file for this vulnerability was already written in this run.
                            continue

                        written_vcids.add(vcid)
                        vuln_data = serialize_vulnerability(vuln)
                        vuln_file = hashid.get_vcid_yml_file_path(vcid)
                        write_file(base_path=base_path, file_path=vuln_file, data=vuln_data)

                        written = len(written_vcids)
                        if written % 100 == 0:
                            self.stdout.write(
                                f"Processed {written} vulnerabilities. Last VCID: {vcid}"
                            )

                # ``purl`` is the PURL of the last version of this group.
                # NOTE(review): presumably hashid derives a per-package path that ignores the
                # version; confirm in aboutcode.hashid.
                purls_file = hashid.get_package_purls_yml_file_path(purl)
                write_file(base_path=base_path, file_path=purls_file, data=purls)

                vulnerabilities_file = hashid.get_package_vulnerabilities_yml_file_path(purl)
                write_file(
                    base_path=base_path,
                    file_path=vulnerabilities_file,
                    data=vulnerabilities_by_purl,
                )

                if package_count % 100 == 0:
                    self.stdout.write(
                        f"Processed {package_count} package. Last PURL: {purl_without_version}"
                    )

            except Exception as error:
                # Best effort: report the failure and carry on with the next package group.
                self.stdout.write(
                    self.style.ERROR(
                        f"Failed to process Package {package}: {error!r} \n {traceback_format_exc()}"
                    )
                )

        self.stdout.write(
            f"Exported data for: {package_count} package and {len(written_vcids)} vulnerabilities."
        )
        elapsed = timer() - started_at
        self.stdout.write(f"Export completed in {humanize_time(elapsed)}")
def by_purl_type_ns_name(package):
    """
    Return the (type, namespace, name) tuple of ``package``.

    Used as the grouping key for packages, leaving the version out.
    """
    purl_type = package.type
    namespace = package.namespace
    name = package.name
    return purl_type, namespace, name
def packages_by_type_ns_name():
    """
    Return a two-level iterator over all Packages grouped-by package, ignoring version.

    Yield (PackageURL, packages) pairs where the PackageURL is built from the type, namespace
    and name shared by the group, and ``packages`` is the ``itertools.groupby`` iterator over
    the Package objects of that group. As with any ``groupby``, a group must be consumed
    before moving on to the next one.
    """
    # Prefetch both vulnerability relations together with their nested related objects.
    prefetched = []
    for relation in ("affected_by_vulnerabilities", "fixing_vulnerabilities"):
        prefetched.append(relation)
        for nested in ("references", "weaknesses", "severities"):
            prefetched.append(f"{relation}__{nested}")

    # groupby only merges adjacent items: the ordering must start with the grouping key.
    ordered_packages = Package.objects.order_by("type", "namespace", "name", "version")
    package_stream = ordered_packages.prefetch_related(*prefetched).iterator()

    for (purl_type, namespace, name), versions in groupby(package_stream, key=by_purl_type_ns_name):
        yield PackageURL(purl_type, namespace, name), versions
def write_file(base_path: Path, file_path: Path, data: dict):
    """
    Write the ``data`` as YAML to the ``file_path`` in the ``base_path`` root directory.
    Create directories in the path as needed.

    The ``data`` is serialized before the target file is opened, so that a serialization
    failure neither creates an empty file nor truncates an existing one. Any exception from
    the serialization or from the file system is propagated to the caller.
    """
    # Serialize first: opening with mode "w" truncates the file right away.
    content = saneyaml.dump(data)

    write_to = base_path / file_path
    write_to.parent.mkdir(parents=True, exist_ok=True)
    with open(write_to, encoding="utf-8", mode="w") as f:
        f.write(content)