Skip to content

Commit c062c71

Browse files
authored
Merge pull request #844 from TG1999/bulk_search_cpe
Add bulk search support for CPEs #808
2 parents 9164cf5 + 05505b5 commit c062c71

3 files changed

Lines changed: 115 additions & 0 deletions

File tree

CHANGELOG.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ Version v30.0.0
5757

5858
- Add fixed packages in vulnerabilities details in packages endpoint.
5959

60+
- Add bulk search support for CPEs.
61+
6062
Other:
6163

6264
- we dropped calver to use a plain semver.

vulnerabilities/api.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,30 @@ class CPEViewSet(viewsets.ReadOnlyModelViewSet):
303303
filter_backends = (filters.DjangoFilterBackend,)
304304
filterset_class = CPEFilterSet
305305

306+
@action(detail=False, methods=["post"])
307+
def bulk_search(self, request):
308+
"""
309+
This endpoint is used to search for vulnerabilities by more than one CPE.
310+
"""
311+
response = []
312+
cpes = request.data.get("cpes", []) or []
313+
if not cpes or not isinstance(cpes, list):
314+
return Response(
315+
status=400,
316+
data={"Error": "A non-empty 'cpe' list of package URLs is required."},
317+
)
318+
for cpe in cpes:
319+
if not cpe.startswith("cpe"):
320+
return Response(status=400, data={"Error": f"Invalid CPE: {cpe}"})
321+
vulnerabilitiesResponse = Vulnerability.objects.filter(
322+
vulnerabilityreference__reference_id__in=cpes
323+
).distinct()
324+
return Response(
325+
VulnerabilitySerializer(
326+
vulnerabilitiesResponse, many=True, context={"request": request}
327+
).data
328+
)
329+
306330

307331
class AliasFilterSet(filters.FilterSet):
308332
alias = filters.CharFilter(method="filter_alias")

vulnerabilities/tests/test_fix_api.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,3 +270,92 @@ def test_api_response(self):
270270
content_type="application/json",
271271
).json()
272272
assert len(response) == 13
273+
274+
275+
class BulkSearchAPI(TestCase):
276+
def setUp(self):
277+
self.exclusive_cpes = [
278+
"cpe:/a:nginx:1.0.7",
279+
"cpe:/a:nginx:1.0.15",
280+
"cpe:/a:nginx:1.14.1",
281+
"cpe:/a:nginx:1.15.5",
282+
"cpe:/a:nginx:1.15.6",
283+
]
284+
vuln = Vulnerability.objects.create(summary="test")
285+
for cpe in self.exclusive_cpes:
286+
ref = VulnerabilityReference.objects.create(reference_id=cpe)
287+
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=vuln)
288+
second_vuln = Vulnerability.objects.create(summary="test-A")
289+
self.non_exclusive_cpes = [
290+
"cpe:/a:nginx:1.16.1",
291+
"cpe:/a:nginx:1.17.2",
292+
"cpe:/a:nginx:1.17.3",
293+
"cpe:/a:nginx:1.9.5",
294+
"cpe:/a:nginx:1.20.1",
295+
"cpe:/a:nginx:1.20.0",
296+
"cpe:/a:nginx:1.21.0",
297+
]
298+
third_vuln = Vulnerability.objects.create(summary="test-B")
299+
for cpe in self.non_exclusive_cpes:
300+
ref = VulnerabilityReference.objects.create(reference_id=cpe)
301+
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=second_vuln)
302+
VulnerabilityRelatedReference.objects.create(reference=ref, vulnerability=third_vuln)
303+
304+
def test_api_response_with_with_exclusive_cpes_associated_with_two_vulnerabilities(self):
305+
request_body = {
306+
"cpes": self.exclusive_cpes,
307+
}
308+
response = self.client.post(
309+
"/api/cpes/bulk_search",
310+
data=request_body,
311+
content_type="application/json",
312+
).json()
313+
assert len(response) == 1
314+
assert response[0]["summary"] == "test"
315+
references_in_vuln = response[0]["references"]
316+
cpes = [ref["reference_id"] for ref in references_in_vuln]
317+
assert set(cpes) == set(self.exclusive_cpes)
318+
319+
def test_api_response_with_no_cpe_associated(self):
320+
request_body = {
321+
"cpes": ["cpe:/a:nginx:1.10.7"],
322+
}
323+
response = self.client.post(
324+
"/api/cpes/bulk_search",
325+
data=request_body,
326+
content_type="application/json",
327+
).json()
328+
assert len(response) == 0
329+
330+
def test_api_response_with_with_non_exclusive_cpes_associated_with_two_vulnerabilities(self):
331+
request_body = {
332+
"cpes": self.non_exclusive_cpes,
333+
}
334+
response = self.client.post(
335+
"/api/cpes/bulk_search",
336+
data=request_body,
337+
content_type="application/json",
338+
).json()
339+
assert len(response) == 2
340+
341+
def test_with_empty_list(self):
342+
request_body = {
343+
"cpes": [],
344+
}
345+
response = self.client.post(
346+
"/api/cpes/bulk_search",
347+
data=request_body,
348+
content_type="application/json",
349+
).json()
350+
assert response == {"Error": "A non-empty 'cpe' list of package URLs is required."}
351+
352+
def test_with_invalid_cpes(self):
353+
request_body = {
354+
"cpes": ["CVE-2022-2022"],
355+
}
356+
response = self.client.post(
357+
"/api/cpes/bulk_search",
358+
data=request_body,
359+
content_type="application/json",
360+
).json()
361+
assert response == {"Error": "Invalid CPE: CVE-2022-2022"}

0 commit comments

Comments
 (0)