Skip to content

Commit 8209c0a

Browse files
committed
fix(parser): support V3_FEATURE_LOCATIONS in Qualys VMDR parser
Replace direct Endpoint() instantiation with the new LocationData API when V3_FEATURE_LOCATIONS is enabled, falling back to Endpoint for the legacy code path. Tests pass under both flag states. Authored by T. Walker - DefectDojo
1 parent 4d649f9 commit 8209c0a

File tree

4 files changed

+45
-20
lines changed

4 files changed

+45
-20
lines changed

dojo/tools/qualys_vmdr/cve_parser.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from django.conf import settings
2+
13
from dojo.models import Finding
24
from dojo.tools.qualys_vmdr.helpers import (
35
build_description_cve,
@@ -6,6 +8,7 @@
68
map_qualys_severity,
79
parse_cvss_score,
810
parse_endpoints,
11+
parse_locations,
912
parse_qualys_csv_content,
1013
parse_qualys_date,
1114
parse_tags,
@@ -54,10 +57,17 @@ def _create_finding(self, row):
5457
if cvss_score is not None:
5558
finding.cvssv3_score = cvss_score
5659

57-
finding.unsaved_endpoints = parse_endpoints(
58-
row.get("Asset IPV4", ""),
59-
row.get("Asset IPV6", ""),
60-
)
60+
if settings.V3_FEATURE_LOCATIONS:
61+
finding.unsaved_locations = parse_locations(
62+
row.get("Asset IPV4", ""),
63+
row.get("Asset IPV6", ""),
64+
)
65+
else:
66+
# TODO: Delete this after the move to Locations
67+
finding.unsaved_endpoints = parse_endpoints(
68+
row.get("Asset IPV4", ""),
69+
row.get("Asset IPV6", ""),
70+
)
6171
finding.unsaved_tags = parse_tags(row.get("Asset Tags", ""))
6272

6373
if not is_qualys_null(cve):

dojo/tools/qualys_vmdr/helpers.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from dateutil import parser as dateutil_parser
66

77
from dojo.models import Endpoint
8+
from dojo.tools.locations import LocationData
89

910
SEVERITY_MAPPING = {
1011
"1": "Info",
@@ -143,16 +144,21 @@ def build_description_cve(row):
143144
return "\n\n".join(parts) if parts else "No details available."
144145

145146

147+
def _extract_hosts(ipv4_field, ipv6_field):
148+
if ipv4_field and ipv4_field.strip():
149+
return [ip.strip() for ip in ipv4_field.split(",") if ip.strip()]
150+
if ipv6_field and ipv6_field.strip():
151+
return [ipv6_field.strip()]
152+
return []
153+
154+
155+
# TODO: Delete this after the move to Locations
146156
def parse_endpoints(ipv4_field, ipv6_field):
147-
endpoints = []
157+
return [Endpoint(host=host) for host in _extract_hosts(ipv4_field, ipv6_field)]
148158

149-
if ipv4_field and ipv4_field.strip():
150-
ips = [ip.strip() for ip in ipv4_field.split(",") if ip.strip()]
151-
endpoints.extend(Endpoint(host=ip) for ip in ips)
152-
elif ipv6_field and ipv6_field.strip():
153-
endpoints.append(Endpoint(host=ipv6_field.strip()))
154159

155-
return endpoints
160+
def parse_locations(ipv4_field, ipv6_field):
161+
return [LocationData.url(host=host) for host in _extract_hosts(ipv4_field, ipv6_field)]
156162

157163

158164
def parse_tags(tags_field):

dojo/tools/qualys_vmdr/qid_parser.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from django.conf import settings
2+
13
from dojo.models import Finding
24
from dojo.tools.qualys_vmdr.helpers import (
35
build_description_qid,
46
build_severity_justification,
57
map_qualys_severity,
68
parse_endpoints,
9+
parse_locations,
710
parse_qualys_csv_content,
811
parse_qualys_date,
912
parse_tags,
@@ -45,10 +48,17 @@ def _create_finding(self, row):
4548
dynamic_finding=False,
4649
)
4750

48-
finding.unsaved_endpoints = parse_endpoints(
49-
row.get("Asset IPV4", ""),
50-
row.get("Asset IPV6", ""),
51-
)
51+
if settings.V3_FEATURE_LOCATIONS:
52+
finding.unsaved_locations = parse_locations(
53+
row.get("Asset IPV4", ""),
54+
row.get("Asset IPV6", ""),
55+
)
56+
else:
57+
# TODO: Delete this after the move to Locations
58+
finding.unsaved_endpoints = parse_endpoints(
59+
row.get("Asset IPV4", ""),
60+
row.get("Asset IPV6", ""),
61+
)
5262
finding.unsaved_tags = parse_tags(row.get("Asset Tags", ""))
5363

5464
return finding

unittests/tools/test_qualys_vmdr_parser.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def test_qid_endpoints_single_ip(self):
137137
) as testfile:
138138
parser = QualysVMDRParser()
139139
findings = parser.get_findings(testfile, Test())
140-
endpoints = findings[0].unsaved_endpoints
140+
endpoints = self.get_unsaved_locations(findings[0])
141141
self.assertEqual(1, len(endpoints))
142142
self.assertEqual("10.0.0.1", endpoints[0].host)
143143

@@ -148,7 +148,7 @@ def test_qid_endpoints_multiple_ips(self):
148148
parser = QualysVMDRParser()
149149
findings = parser.get_findings(testfile, Test())
150150
multi_ip_finding = [f for f in findings if f.unique_id_from_tool == "100003"][0]
151-
endpoints = multi_ip_finding.unsaved_endpoints
151+
endpoints = self.get_unsaved_locations(multi_ip_finding)
152152
self.assertEqual(2, len(endpoints))
153153
hosts = [e.host for e in endpoints]
154154
self.assertIn("10.0.0.20", hosts)
@@ -161,7 +161,7 @@ def test_qid_endpoints_ipv6_fallback(self):
161161
parser = QualysVMDRParser()
162162
findings = parser.get_findings(testfile, Test())
163163
ipv6_finding = [f for f in findings if f.unique_id_from_tool == "100005"][0]
164-
endpoints = ipv6_finding.unsaved_endpoints
164+
endpoints = self.get_unsaved_locations(ipv6_finding)
165165
self.assertEqual(1, len(endpoints))
166166
self.assertEqual("2001:db8::1", endpoints[0].host)
167167

@@ -288,5 +288,4 @@ def test_qid_endpoint_clean(self):
288288
) as testfile:
289289
parser = QualysVMDRParser()
290290
findings = parser.get_findings(testfile, Test())
291-
for endpoint in findings[0].unsaved_endpoints:
292-
endpoint.clean()
291+
self.validate_locations(findings)

0 commit comments

Comments
 (0)