Skip to content

Commit b0e032d

Browse files
committed
Refactor NVD API script for improved vulnerability filtering and Telegram message handling; add CVE_DETAILS_URL to config
1 parent cfa5b46 commit b0e032d

3 files changed

Lines changed: 180 additions & 181 deletions

File tree

scripts/cve/nvd_api.py

Lines changed: 170 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import os
2-
import re
3-
import html
42
import asyncio
53
import requests
64
from datetime import datetime
@@ -12,6 +10,7 @@
1210
API_URL,
1311
HEADERS,
1412
COLLECTION_NAME,
13+
CVE_DETAILS_URL,
1514
)
1615

1716
load_dotenv()
@@ -24,181 +23,175 @@
2423
raise Exception("Missing environment variables")
2524
translator = GoogleTranslator(source="auto", target="ar")
2625

27-
28-
# Generate Report:
29-
def generate_report(vector_string):
30-
mapping = {
31-
"AV": {
32-
"N": "NETWORK",
33-
"A": "ADJACENT",
34-
"L": "LOCAL",
35-
"P": "PHYSICAL",
36-
},
37-
"PR": {
38-
"N": "NONE",
39-
"L": "LOW",
40-
"H": "HIGH",
41-
},
42-
"AC": {"L": "LOW", "H": "HIGH"},
43-
"E": {
44-
"X": "NOT_DEFINED",
45-
"U": "UNPROVEN",
46-
"P": "PROOF_OF_CONCEPT",
47-
"A": "ATTACKED",
48-
},
49-
"VC": {"H": "HIGH", "L": "LOW", "N": "NONE"},
50-
"VI": {"H": "HIGH", "L": "LOW", "N": "NONE"},
51-
"VA": {"H": "HIGH", "L": "LOW", "N": "NONE"},
52-
}
53-
54-
parts = {
55-
p.split(":")[0]: p.split(":")[1] for p in vector_string.split("/") if ":" in p
56-
}
57-
58-
report = {
59-
"authRequired": mapping["PR"].get(parts.get("PR"), "Unknown"),
60-
"attackVector": mapping["AV"].get(parts.get("AV"), "Unknown"),
61-
"complexity": mapping["AC"].get(parts.get("AC"), "Unknown"),
62-
"exploitState": mapping["E"].get(parts.get("E"), "Unknown"),
63-
"confidentiality": mapping["VC"].get(parts.get("VC"), "Unknown"),
64-
"integrity": mapping["VI"].get(parts.get("VI"), "Unknown"),
65-
"availability": mapping["VA"].get(parts.get("VA"), "Unknown"),
66-
}
67-
68-
return report
69-
70-
71-
print("nvd_api Script Running:\n")
72-
73-
try:
74-
# Constants:
75-
now = datetime.now()
76-
startTimeParam = now.strftime("%Y-%m-%dT00:00:00.000")
77-
endTimeParam = now.strftime("%Y-%m-%dT23:59:59.999")
78-
79-
api = API_URL
80-
headers = {**HEADERS}
81-
params = {
82-
"pubStartDate": startTimeParam,
83-
"pubEndDate": endTimeParam,
84-
}
85-
apiResponse = requests.get(api, headers=headers, params=params)
86-
responseCode = apiResponse.status_code
87-
if responseCode == 200:
88-
89-
print("Getting CVE ids from database...")
90-
cveIdsCollection = get_collection(
91-
uri=MONGO_URI, collection_name=COLLECTION_NAME, db_name="my_db"
92-
)
93-
print(f"Get CVE ids from database successfully\n")
94-
95-
print(f"Request Success - CODE IS: {responseCode}\n")
96-
apiResponseJson = apiResponse.json()
97-
vulnerabilitiesList = apiResponseJson.get("vulnerabilities")
98-
print(f"CVEs Count: {len(vulnerabilitiesList)}\n")
99-
100-
if vulnerabilitiesList:
101-
for vulneItem in vulnerabilitiesList:
102-
cve = vulneItem.get("cve", {})
103-
104-
cveId = cve.get("id", None)
105-
if not cveId:
106-
print(f"CVE id not avaliable - Skipping\n")
26+
print("\nScript Starting\n")
27+
now = datetime.now()
28+
year = now.year
29+
month = str(now.month).zfill(2)
30+
day = str(now.day).zfill(2)
31+
params = {
32+
"resultsPerPage": 200,
33+
"pubStartDate": f"{year}-{month}-{day}T00:00:00.000-00:00",
34+
"pubEndDate": f"{year}-{month}-{day}T23:59:59.999-00:00",
35+
}
36+
response = requests.get(url=API_URL, headers=HEADERS, params=params)
37+
responseCode = response.status_code
38+
39+
if responseCode == 200:
40+
41+
# Get Data
42+
print(f"✅ Request Success: {responseCode}\n")
43+
responseData = response.json()
44+
if not isinstance(responseData, dict):
45+
raise Exception("No data avaliable => Exiting")
46+
vulnerabilities = responseData.get("vulnerabilities")
47+
if not vulnerabilities:
48+
raise Exception("❗ Vulnerabilities list is empty => Exiting")
49+
vulnerabilitiesCount = len(vulnerabilities)
50+
print(f"- Vulnerabilities count: {vulnerabilitiesCount} (ready to filtering)")
51+
52+
# Filter Vulnerabilities
53+
targetVulnerabilities = []
54+
for vulnerability in vulnerabilities:
55+
referencesExplioted = []
56+
description = "No Found Description"
57+
cve = vulnerability.get("cve")
58+
if not cve:
59+
continue
60+
cveReferences = cve.get("references")
61+
if not cve.get("references"):
62+
continue
63+
cveMetricsDict = cve.get("metrics")
64+
if not isinstance(cveMetricsDict, dict):
65+
continue
66+
cveMetricsVersionsList = list(cveMetricsDict)
67+
if not cveMetricsVersionsList:
68+
continue
69+
cveMetricsList = cve.get("metrics").get(cveMetricsVersionsList[0])
70+
if not isinstance(cveMetricsList, list):
71+
continue
72+
cveMetricsData = cveMetricsList[0]
73+
if not cveMetricsData:
74+
continue
75+
cvssData = cveMetricsData.get("cvssData")
76+
if not cvssData:
77+
continue
78+
baseScore = cvssData.get("baseScore")
79+
baseSeverity = cvssData.get("baseSeverity")
80+
if not baseScore >= 7:
81+
continue
82+
83+
vulnStatus = cve.get("vulnStatus")
84+
if not vulnStatus:
85+
continue
86+
valid_statuses = ["Analyzed", "Modified"]
87+
if cve.get("vulnStatus") in valid_statuses:
88+
for reference in cveReferences:
89+
referenceTags = reference.get("tags")
90+
if not referenceTags:
10791
continue
108-
109-
isInDatabase = cve_exists(collection=cveIdsCollection, cveId=cveId)
110-
if isInDatabase:
111-
print(f"- {cveId} in database - Continue\n")
112-
continue
113-
114-
print(f"\n- {cveId} Not in database - Working...")
115-
isMetricsDict = cve.get("metrics")
116-
if not isMetricsDict:
117-
print("No metrics avaliable! - Skiping\n")
118-
continue
119-
120-
metricsDict = cve.get("metrics")[list(cve.get("metrics"))[0]]
121-
cvssDataDict = metricsDict[0].get("cvssData")
122-
baseScore = cvssDataDict.get("baseScore")
123-
if baseScore < 7:
124-
print("Base Score Under 7 - Canceling\n")
125-
continue
126-
127-
descriptions = cve.get("descriptions", None)
128-
weaknesses = cve.get("weaknesses", None)
129-
references = cve.get("references", None)
130-
131-
if not all([descriptions, weaknesses, references]):
132-
print("Some data missing! - Skipping\n")
133-
continue
134-
135-
description = descriptions[0].get("value")
136-
clean_description = re.sub(r"\n{3,}", "\n\n", description).strip()
137-
descriptionLen = len(clean_description)
138-
print(f" - Description is: {clean_description[:20]}...")
139-
140-
weaknesse = cve.get("weaknesses")[0].get("description")[0].get("value")
141-
print(f" - Weaknesse is: {weaknesse}")
142-
143-
referenceUrl = cve.get("references")[0].get("url")
144-
referenceSouce = cve.get("references")[0].get("source")
145-
print(f" - Reference Url is: {referenceUrl}")
146-
print(f" - Reference Souce is: {referenceSouce}")
147-
148-
publishedDate = datetime.strptime(
149-
cve.get("published"), "%Y-%m-%dT%H:%M:%S.%f"
92+
important_tags = [
93+
"Third Party Advisory",
94+
"Patch",
95+
"Exploit",
96+
"Vendor Advisory",
97+
]
98+
for tag in referenceTags:
99+
if tag in important_tags:
100+
referencesExplioted.append(
101+
{
102+
"url": reference.get("url"),
103+
"source": reference.get("source"),
104+
"tag": tag,
105+
}
106+
)
107+
if referencesExplioted:
108+
cveId = cve.get("id")
109+
descriptions = cve.get("descriptions")
110+
vectorString = cvssData.get("vectorString")
111+
if isinstance(descriptions, list):
112+
firstDescription = descriptions[0]
113+
description = firstDescription.get("value")
114+
targetVulnerabilities.append(
115+
{
116+
"cve": {
117+
"id": cveId,
118+
"vectorString": vectorString,
119+
"description": description,
120+
"references": referencesExplioted,
121+
"baseScore": baseScore,
122+
"baseSeverity": baseSeverity,
123+
}
124+
}
150125
)
151-
messageTitle = (
152-
"🔴 CRITICAL" if baseScore >= 9 else "🟠 HIGH"
153-
) + " ALERT"
154-
reportDict = generate_report(cvssDataDict.get("vectorString"))
155-
authRequired = reportDict.get("authRequired", "UNKNOWN")
156-
attackVector = reportDict.get("attackVector", "UNKNOWN")
157-
complexity = reportDict.get("complexity", "UNKNOWN")
158-
exploitState = reportDict.get("exploitState", "UNKNOWN")
159-
confidentiality = reportDict.get("confidentiality", "UNKNOWN")
160-
integrity = reportDict.get("integrity", "UNKNOWN")
161-
availability = reportDict.get("availability", "UNKNOWN")
162-
163-
clean_description = html.escape(clean_description)
164-
clean_description = translator.translate(clean_description)
165-
message = (
166-
f"<b>{messageTitle} - <code>{cveId}</code> - <code>{weaknesse}</code></b>\n\n"
167-
f"<b>{clean_description[:2000]}{'...' if descriptionLen > 2000 else ''}</b>\n\n"
168-
f"CVSS Details:\n"
169-
f"- <b>Auth Required:</b> {authRequired}\n"
170-
f"- <b>Attack Vector:</b> {attackVector}\n"
171-
f"- <b>Complexity:</b> {complexity}\n"
172-
f"- <b>Exploit State:</b> {exploitState}\n"
173-
f"- <b>Confidentiality:</b> {confidentiality}\n"
174-
f"- <b>Integrity:</b> {integrity}\n"
175-
f"- <b>Availability:</b> {availability}\n"
176-
f"- <b>Base Score:</b> {baseScore}\n\n"
177-
f"Published at: {publishedDate.strftime('%I:%M %p, %A, %d-%m-%Y')}"
178-
)
179-
180-
# Send to telegram:
181-
print("Send message to telegram - Sending")
182-
isSuccessSend = asyncio.run(
183-
send_text_message(
184-
token=TELEGRAM_TOKEN_CVE,
185-
chat_id=TELEGRAM_CHAT_ID,
186-
text=message,
187-
source_url=f"https://www.cvedetails.com/cve/{cveId}",
188-
buttonText="CVE Details",
189-
)
190-
)
191-
if not isSuccessSend:
192-
print("Message not send to telegram - Skipping")
193-
continue
194-
print("Message sended to telegram successfully")
195-
196-
print("Save CVE to database:")
197-
save_to_database(collection=cveIdsCollection, data={"cve_id": cveId})
198-
print("CVE saved to database successfully")
126+
if not targetVulnerabilities:
127+
print("❗ No vulnerabilities apply to filter\n")
128+
print("✅ Exiting")
129+
exit()
130+
targetVulnerabilitiesCount = len(targetVulnerabilities)
131+
print(f"- Target vulnerabilities count: ({targetVulnerabilitiesCount} filtered)\n")
132+
133+
# Get from database
134+
print("- Get cves stored in database...")
135+
cvesCollection = get_collection(
136+
uri=MONGO_URI, collection_name=COLLECTION_NAME, db_name="my_db"
137+
)
138+
print("- Get cves stored successfully.\n")
139+
140+
# Start
141+
for vulnerability in targetVulnerabilities:
142+
cve = vulnerability.get("cve")
143+
cveId = cve.get("id")
144+
if cve_exists(collection=cvesCollection, cveId=cveId):
145+
print(f"- {cveId} in database - Skipping\n")
146+
continue
147+
description = translator.translate(
148+
cve.get("description")
149+
.replace("<", "&lt;")
150+
.replace(">", "&gt;")
151+
.replace("&&", "&amp;")
152+
)
153+
cveReferences = cve.get("references")
154+
vectorString = cve.get("vectorString")
155+
baseSeverity = cve.get("baseSeverity")
156+
baseScore = cve.get("baseScore")
157+
158+
ref_links = ""
159+
160+
if cveReferences:
161+
for refernce in cveReferences:
162+
url = refernce.get("url")
163+
source = refernce.get("source")
164+
tag = refernce.get("tag")
165+
ref_links += f"- ({tag})\n{url}\n\n"
166+
167+
formatted_text = (
168+
f"<b>⚠️ New Vulnerability Detected</b>\n\n"
169+
f"<b>CVE ID:</b> <code>{cveId}</code>\n"
170+
f"<b>Severity:</b> {baseSeverity} ( {baseScore} )\n\n"
171+
f"<b>الوصف:</b>\n{(description[:600] + "...") if len(description) > 600 else description}"
172+
f"\n\n{ref_links}"
173+
)
199174

200-
print("✅ All Done - Exsitting")
201-
else:
202-
print(f"Faild to get data from api! - CODE IS: {responseCode}")
203-
except Exception as e:
204-
print(e)
175+
print(f"- {cveId} - Base Score: {baseScore}")
176+
print("📩 Sned to telegram...")
177+
status = asyncio.run(
178+
send_text_message(
179+
token=TELEGRAM_TOKEN_CVE,
180+
chat_id=TELEGRAM_CHAT_ID,
181+
text=formatted_text,
182+
source_url=f"{CVE_DETAILS_URL}/{cveId}/",
183+
buttonText="CVE Details",
184+
)
185+
)
186+
if status == True or status == "TIMEOUT":
187+
print("☑️ Message sended to telegram successfully.\n")
188+
print("📊 Save to database...")
189+
save_to_database(collection=cvesCollection, data={"cve_id": cveId})
190+
print("☑️ CVE saved to database successfully.\n")
191+
else:
192+
print("❗ Faild to send message.\n")
193+
194+
# End
195+
print("✅ Script Ended")
196+
else:
197+
raise Exception(f"🚫 Request Faild: {responseCode} => Exiting")

scripts/cve/nvd_api_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
2+
CVE_DETAILS_URL = "https://www.cvedetails.com/cve"
23
HEADERS = {"User-Agent": "Mozilla/5.0"}
34
COLLECTION_NAME = "cve_ids"
45
SOURCE_NAME = "nvd_api"

0 commit comments

Comments
 (0)