|
9 | 9 |
|
10 | 10 | import re |
11 | 11 | import xml.etree.ElementTree as ET |
| 12 | +from collections import defaultdict |
12 | 13 | from pathlib import Path |
13 | 14 | from typing import Iterable |
14 | 15 |
|
@@ -81,20 +82,23 @@ def process_file(self, file): |
81 | 82 |
|
82 | 83 | if child.tag == "affected": |
83 | 84 | affected_packages = [] |
84 | | - seen_packages = set() |
85 | | - |
86 | | - for purl, constraint in get_affected_and_safe_purls(child, logger=self.log): |
87 | | - signature = (purl.to_string(), str(constraint)) |
88 | | - |
89 | | - if signature not in seen_packages: |
90 | | - seen_packages.add(signature) |
91 | | - |
92 | | - affected_package = AffectedPackageV2( |
93 | | - package=purl, |
94 | | - affected_version_range=EbuildVersionRange(constraints=[constraint]), |
95 | | - fixed_version_range=None, |
96 | | - ) |
97 | | - affected_packages.append(affected_package) |
| 85 | + for purl, (affected_ranges, fixed_ranges) in get_affected_and_fixed_purls( |
| 86 | + child, logger=self.log |
| 87 | + ): |
| 88 | + affected_version_constraint = build_constraints( |
| 89 | + affected_ranges, logger=self.log |
| 90 | + ) |
| 91 | + fixed_version_constraint = build_constraints(fixed_ranges, logger=self.log) |
| 92 | + affected_version_range = EbuildVersionRange( |
| 93 | + constraints=affected_version_constraint |
| 94 | + ) |
| 95 | + fixed_version_range = EbuildVersionRange(constraints=fixed_version_constraint) |
| 96 | + affected_package = AffectedPackageV2( |
| 97 | + package=purl, |
| 98 | + affected_version_range=affected_version_range, |
| 99 | + fixed_version_range=fixed_version_range, |
| 100 | + ) |
| 101 | + affected_packages.append(affected_package) |
98 | 102 |
|
99 | 103 | if child.tag == "impact": |
100 | 104 | severity_value = child.attrib.get("type") |
@@ -131,61 +135,67 @@ def cves_from_reference(reference): |
131 | 135 | return cves |
132 | 136 |
|
133 | 137 |
|
134 | | -def extract_purls_and_constraints(pkg_name, pkg_ns, constraints, invert, logger): |
135 | | - for comparator, version, slot_value in constraints: |
136 | | - qualifiers = {"slot": slot_value} if slot_value else {} |
137 | | - purl = PackageURL(type="ebuild", name=pkg_name, namespace=pkg_ns, qualifiers=qualifiers) |
138 | | - |
| 138 | +def build_constraints(constraint_pairs, logger): |
| 139 | + """ |
| 140 | + Build a list of VersionConstraint objects from comparators, versions pairs. |
| 141 | + """ |
| 142 | + constraints = [] |
| 143 | + for comparator, version in constraint_pairs: |
139 | 144 | try: |
140 | 145 | constraint = VersionConstraint(version=GentooVersion(version), comparator=comparator) |
141 | | - |
142 | | - if invert: |
143 | | - constraint = constraint.invert() |
144 | | - |
145 | | - yield purl, constraint |
| 146 | + constraints.append(constraint) |
146 | 147 | except InvalidVersion as e: |
147 | 148 | logger(f"InvalidVersion constraints version: {version} error:{e}") |
| 149 | + return constraints |
| 150 | + |
148 | 151 |
|
| 152 | +def get_affected_and_fixed_purls(affected_elem, logger): |
| 153 | + """ |
| 154 | + Parses XML elements to extract PURLs associated with affected and fixed versions. |
| 155 | + """ |
149 | 156 |
|
150 | | -def get_affected_and_safe_purls(affected_elem, logger): |
151 | 157 | for pkg in affected_elem: |
152 | 158 | name = pkg.attrib.get("name") |
153 | 159 | if not name: |
154 | 160 | continue |
155 | | - pkg_ns, _, pkg_name = name.rpartition("/") |
156 | | - |
157 | | - safe_constraints, affected_constraints = get_safe_and_affected_constraints(pkg) |
158 | 161 |
|
159 | | - yield from extract_purls_and_constraints( |
160 | | - pkg_name, pkg_ns, affected_constraints, invert=False, logger=logger |
161 | | - ) |
162 | | - yield from extract_purls_and_constraints( |
163 | | - pkg_name, pkg_ns, safe_constraints, invert=True, logger=logger |
164 | | - ) |
165 | | - |
166 | | - |
167 | | -def get_safe_and_affected_constraints(pkg): |
168 | | - safe_versions = set() |
169 | | - affected_versions = set() |
170 | | - for info in pkg: |
171 | | - # All possible values of info.attrib['range'] = |
172 | | - # {'gt', 'lt', 'rle', 'rge', 'rgt', 'le', 'ge', 'eq'} |
173 | | - range_value = info.attrib.get("range") |
174 | | - slot_value = info.attrib.get("slot") |
175 | | - comparator_dict = { |
176 | | - "gt": ">", |
177 | | - "lt": "<", |
178 | | - "ge": ">=", |
179 | | - "le": "<=", |
180 | | - "eq": "=", |
181 | | - "rle": "<=", |
182 | | - "rge": ">=", |
183 | | - "rgt": ">", |
184 | | - } |
185 | | - comparator = comparator_dict.get(range_value) |
186 | | - if info.tag == "unaffected": |
187 | | - safe_versions.add((comparator, info.text, slot_value)) |
188 | | - |
189 | | - elif info.tag == "vulnerable": |
190 | | - affected_versions.add((comparator, info.text, slot_value)) |
191 | | - return safe_versions, affected_versions |
| 162 | + pkg_ns, _, pkg_name = name.rpartition("/") |
| 163 | + # purl_components, (fixed_ranges_set, affected_ranges_set) |
| 164 | + purl_ranges_map = defaultdict(lambda: {"fixed_ranges": set(), "affected_ranges": set()}) |
| 165 | + |
| 166 | + for info in pkg: |
| 167 | + # All possible values of info.attrib['range'] = |
| 168 | + # {'gt', 'lt', 'rle', 'rge', 'rgt', 'le', 'ge', 'eq'} |
| 169 | + # rge means revision greater than equals and rgt means revision greater than |
| 170 | + |
| 171 | + range_value = info.attrib.get("range") |
| 172 | + slot_value = info.attrib.get("slot") |
| 173 | + comparator_dict = { |
| 174 | + "gt": ">", |
| 175 | + "lt": "<", |
| 176 | + "ge": ">=", |
| 177 | + "le": "<=", |
| 178 | + "eq": "=", |
| 179 | + # "rle": "<=", |
| 180 | + # "rge": ">=", |
| 181 | + # "rgt": ">", |
| 182 | + } |
| 183 | + comparator = comparator_dict.get(range_value) |
| 184 | + if not comparator: |
| 185 | + logger(f"Unsupported range value {range_value}:{info.text}") |
| 186 | + continue |
| 187 | + |
| 188 | + if info.tag == "unaffected": |
| 189 | + purl_ranges_map[(pkg_name, pkg_ns, slot_value)]["fixed_ranges"].add( |
| 190 | + (comparator, info.text) |
| 191 | + ) |
| 192 | + |
| 193 | + elif info.tag == "vulnerable": |
| 194 | + purl_ranges_map[(pkg_name, pkg_ns, slot_value)]["affected_ranges"].add( |
| 195 | + (comparator, info.text) |
| 196 | + ) |
| 197 | + |
| 198 | + for (pkg_name, pkg_ns, slot_value), data in purl_ranges_map.items(): |
| 199 | + qualifiers = {"slot": slot_value} if slot_value else {} |
| 200 | + purl = PackageURL(type="ebuild", name=pkg_name, namespace=pkg_ns, qualifiers=qualifiers) |
| 201 | + yield purl, (data["affected_ranges"], data["fixed_ranges"]) |
0 commit comments