Skip to content

Commit a3a5c36

Browse files
committed
Fix code formatting with black and isort
Signed-off-by: Sampurna Pyne <sampurnapyne1710@gmail.com>
1 parent 19d4d45 commit a3a5c36

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

vulnerabilities/pipelines/v2_importers/euvd_importer.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99

1010
import json
1111
import logging
12-
import requests
1312
import time
1413
from datetime import datetime
1514
from http import HTTPStatus
1615
from typing import Iterable
1716

17+
import requests
1818
from dateutil import parser as dateparser
1919

2020
from vulnerabilities.importer import AdvisoryData
@@ -51,7 +51,7 @@ def fetch_data(self):
5151
if self._cached_data is not None:
5252
logger.info(f"Using cached data: {len(self._cached_data)} items")
5353
return self._cached_data
54-
54+
5555
headers = {"User-Agent": "VulnerableCode"}
5656
all_items = []
5757
page = 0
@@ -61,7 +61,7 @@ def fetch_data(self):
6161
logger.info(f"Fetching data from EUVD API: {self.url}")
6262

6363
while True:
64-
64+
6565
retry_count = 0
6666
success = False
6767

@@ -75,7 +75,9 @@ def fetch_data(self):
7575
retry_count += 1
7676
if retry_count < max_retries:
7777
sleep_time = min(10 * (2 ** min(retry_count - 1, 5)), 60)
78-
logger.info(f"Retrying page {page} in {sleep_time}s (attempt {retry_count}/{max_retries})")
78+
logger.info(
79+
f"Retrying page {page} in {sleep_time}s (attempt {retry_count}/{max_retries})"
80+
)
7981
time.sleep(sleep_time)
8082
continue
8183
else:
@@ -87,23 +89,29 @@ def fetch_data(self):
8789

8890
if not items:
8991
logger.info(f"No items in response for page {page}; stopping fetch.")
90-
logger.info(f"Fetch completed successfully. Total items collected: {len(all_items)}")
91-
92+
logger.info(
93+
f"Fetch completed successfully. Total items collected: {len(all_items)}"
94+
)
95+
9296
# Cache the fetched data for reuse
9397
self._cached_data = all_items
9498
logger.info(f"Cached {len(all_items)} items for reuse")
95-
99+
96100
return all_items
97101

98102
all_items.extend(items)
99-
logger.info(f"Fetched page {page}: {len(items)} items (total: {len(all_items)})")
103+
logger.info(
104+
f"Fetched page {page}: {len(items)} items (total: {len(all_items)})"
105+
)
100106
success = True
101107
page += 1
102108

103109
except requests.exceptions.Timeout as e:
104110
retry_count += 1
105111
if retry_count < max_retries:
106-
logger.warning(f"Timeout on page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})")
112+
logger.warning(
113+
f"Timeout on page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})"
114+
)
107115
time.sleep(10)
108116
else:
109117
logger.error(f"Max retries reached for page {page} after timeout")
@@ -112,7 +120,9 @@ def fetch_data(self):
112120
except Exception as e:
113121
retry_count += 1
114122
if retry_count < max_retries:
115-
logger.error(f"Error fetching page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})")
123+
logger.error(
124+
f"Error fetching page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})"
125+
)
116126
time.sleep(10)
117127
else:
118128
logger.error(f"Max retries reached for page {page}")
@@ -134,41 +144,43 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
134144

135145
def parse_advisory(self, raw_data: dict) -> AdvisoryData:
136146
advisory_id = raw_data.get("id", "")
137-
147+
138148
aliases = [advisory_id] if advisory_id else []
139149
aliases_str = raw_data.get("aliases", "")
140150
if aliases_str:
141151
cve_aliases = [alias.strip() for alias in aliases_str.split("\n") if alias.strip()]
142152
aliases.extend(cve_aliases)
143-
153+
144154
summary = raw_data.get("description", "")
145-
155+
146156
date_published = None
147157
date_str = raw_data.get("datePublished", "")
148158
if date_str:
149159
try:
150160
date_published = dateparser.parse(date_str)
151161
if date_published and date_published.tzinfo is None:
152-
date_published = date_published.replace(tzinfo=datetime.now().astimezone().tzinfo)
162+
date_published = date_published.replace(
163+
tzinfo=datetime.now().astimezone().tzinfo
164+
)
153165
except Exception as e:
154166
logger.warning(f"Failed to parse date '{date_str}': {e}")
155-
167+
156168
references = []
157169
references_str = raw_data.get("references", "")
158170
if references_str:
159171
urls = [url.strip() for url in references_str.split("\n") if url.strip()]
160172
for url in urls:
161173
references.append(ReferenceV2(url=url))
162-
174+
163175
if advisory_id:
164176
advisory_url = f"https://euvd.enisa.europa.eu/vulnerability/{advisory_id}"
165177
references.append(ReferenceV2(url=advisory_url))
166-
178+
167179
severities = []
168180
base_score = raw_data.get("baseScore")
169181
base_score_version = raw_data.get("baseScoreVersion")
170182
base_score_vector = raw_data.get("baseScoreVector")
171-
183+
172184
if base_score and base_score_version:
173185
scoring_system = self.get_scoring_system(base_score_version)
174186
if scoring_system:
@@ -178,7 +190,7 @@ def parse_advisory(self, raw_data: dict) -> AdvisoryData:
178190
scoring_elements=base_score_vector or "",
179191
)
180192
severities.append(severity)
181-
193+
182194
return AdvisoryData(
183195
advisory_id=advisory_id,
184196
aliases=aliases,

vulnerabilities/tests/pipelines/v2_importers/test_euvd_importer_v2.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ def test_collect_advisories(self, mock_get):
4646
assert first.advisory_id == "EUVD-2025-197757"
4747
assert "EUVD-2025-197757" in first.aliases
4848
assert "CVE-2025-13284" in first.aliases
49-
assert (
50-
first.summary == "ThinPLUS vulnerability that allows remote code execution"
51-
)
49+
assert first.summary == "ThinPLUS vulnerability that allows remote code execution"
5250
assert first.date_published is not None
5351
assert len(first.severities) == 1
5452
assert first.severities[0].system.identifier == "cvssv3.1"
5553
assert first.severities[0].value == "9.8"
56-
assert first.severities[0].scoring_elements == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
57-
54+
assert (
55+
first.severities[0].scoring_elements == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
56+
)
57+
5858
urls = [ref.url for ref in first.references_v2]
5959
assert "https://nvd.nist.gov/vuln/detail/CVE-2025-13284" in urls
6060
assert "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-197757" in urls
@@ -88,23 +88,23 @@ def test_collect_advisories(self, mock_get):
8888
def test_get_scoring_system(self):
8989
"""Test CVSS version to scoring system mapping"""
9090
pipeline = EUVDImporterPipeline()
91-
91+
9292
system_v4 = pipeline.get_scoring_system("4.0")
9393
assert system_v4 is not None
9494
assert system_v4.identifier == "cvssv4"
95-
95+
9696
system_v31 = pipeline.get_scoring_system("3.1")
9797
assert system_v31 is not None
9898
assert system_v31.identifier == "cvssv3.1"
99-
99+
100100
system_v3 = pipeline.get_scoring_system("3.0")
101101
assert system_v3 is not None
102102
assert system_v3.identifier == "cvssv3"
103-
103+
104104
system_v2 = pipeline.get_scoring_system("2.0")
105105
assert system_v2 is not None
106106
assert system_v2.identifier == "cvssv2"
107-
107+
108108
system_unknown = pipeline.get_scoring_system("unknown")
109109
assert system_unknown is None
110110

@@ -122,4 +122,3 @@ def test_advisories_count(self, mock_get):
122122
count = pipeline.advisories_count()
123123

124124
assert count == 3
125-

0 commit comments

Comments
 (0)