Skip to content

Commit 19f8948

Browse files
committed
Added SUDO Advisory Pipeline
Signed-off-by: kunalsz <kunalavengers@gmail.com>
1 parent cbda0ca commit 19f8948

File tree

3 files changed

+288
-0
lines changed

3 files changed

+288
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from vulnerabilities.pipelines import nvd_importer
4343
from vulnerabilities.pipelines import pypa_importer
4444
from vulnerabilities.pipelines import pysec_importer
45+
from vulnerabilities.pipelines import sudo_importer
4546

4647
IMPORTERS_REGISTRY = [
4748
openssl.OpensslImporter,
@@ -78,6 +79,7 @@
7879
nvd_importer.NVDImporterPipeline,
7980
pysec_importer.PyPIImporterPipeline,
8081
alpine_linux_importer.AlpineLinuxImporterPipeline,
82+
sudo_importer.SUDOImporterPipeline,
8183
]
8284

8385
IMPORTERS_REGISTRY = {
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
import json
import logging
import re
from datetime import timezone
from typing import Iterable

import requests
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import VersionRange
from univers.versions import GenericVersion
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.severity_systems import CVSSV3
from vulnerabilities.severity_systems import CVSSV31
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import get_item
31+
32+
logging.basicConfig(level=logging.INFO)
33+
logger = logging.getLogger(__name__)
34+
35+
36+
class SUDOImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """Collect security advisories published at sudo.ws."""

    pipeline_id = "sudo_importer"
    spdx_license_expression = "ISC"
    license_url = "https://www.sudo.ws/about/license/"
    root_url = "https://www.sudo.ws/security/advisories/"
    importer_name = "SUDO Importer"

    def __init__(self):
        # NOTE(review): discovering pages/links here means constructing the
        # pipeline performs network I/O up front.
        super().__init__()
        self.active_pages = fetch_active_pages()
        self.advisory_links = fetch_advisory_links(self.active_pages)

    @classmethod
    def steps(cls):
        return (
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def advisories_count(self) -> int:
        """Return the number of advisory pages discovered at construction."""
        return len(self.advisory_links)

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Scrape each advisory page and yield it as parsed AdvisoryData."""
        for advisory_url in self.advisory_links:
            yield to_advisory_data(fetch_advisory_data(advisory_url))
66+
67+
68+
def fetch_advisory_links(active_pages):
    """Collect the absolute URL of every advisory linked from the given
    listing pages.

    Each listing page holds one ``<a class="gdoc-post__readmore">`` anchor
    per advisory whose ``href`` is site-relative; it is prefixed with the
    site root to form an absolute URL.

    Return a list of advisory URL strings.
    """
    advisory_links = []
    for active_page in active_pages:
        # A request without a timeout can hang the pipeline forever on a
        # stalled connection; fail loudly on HTTP error responses instead of
        # silently parsing an error page.
        response = requests.get(active_page, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        for readmore_link in soup.find_all("a", class_="gdoc-post__readmore"):
            advisory_links.append("https://www.sudo.ws" + readmore_link["href"])
    return advisory_links
81+
82+
83+
def fetch_active_pages():
    """Discover every paginated advisory listing page on sudo.ws.

    Advisories are paginated at ``/security/advisories/page/<n>/``; page
    numbers are probed upward until the site answers with a non-200 status
    (the first missing page returns 404).

    Return a list of listing-page URLs, starting with the main page.
    """
    active_pages = ["https://www.sudo.ws/security/advisories/"]
    page_num = 2
    while True:
        page_url = f"https://www.sudo.ws/security/advisories/page/{page_num}/"
        # Stop on ANY non-200 status: the original stopped only on 404 and
        # looped forever appending pages on persistent 403/5xx responses.
        # The timeout keeps a stalled connection from hanging the pipeline.
        status = requests.get(page_url, timeout=30).status_code
        if status != 200:
            break
        active_pages.append(page_url)
        page_num += 1

    return active_pages
97+
98+
99+
def fetch_advisory_data(advisory_link):
    """Scrape a single sudo.ws advisory page.

    Return a dict with the keys ``description``, ``alias``,
    ``date_published``, ``affected_versions``, ``fixed_versions`` and
    ``url``.  The version entries are the [low, high] pair lists produced
    by extract_versions(); a missing section yields an empty list.
    """
    # Bound the request so a stalled connection cannot hang the pipeline.
    html_content = requests.get(advisory_link, timeout=30).content
    soup = BeautifulSoup(html_content, "html.parser")

    # Publication timestamp from the page's <time> tag, if present.
    time_tag = soup.find("time")
    publication_date = time_tag.get("datetime", None) if time_tag else None

    # The advisory summary is the first <p> of the main markdown section.
    markdown_section = soup.find("section", class_="gdoc-markdown")
    summary = markdown_section.find("p").get_text(strip=True) if markdown_section else None

    # Paragraph following the "Sudo versions affected" heading.
    versions_affected_tag = soup.find("h2", id="sudo-versions-affected")
    versions_affected = (
        versions_affected_tag.find_next("p").get_text(strip=True)
        if versions_affected_tag
        else None
    )
    # Guard: the original passed None straight into extract_versions(),
    # which raised TypeError whenever the section was missing.
    versions_affected = extract_versions(versions_affected) if versions_affected else []

    # CVE identifier link following the "CVE ID" heading.
    cve_id_tag = soup.find("h2", id="cve-id")
    cve_id = (
        cve_id_tag.find_next("a", class_="gdoc-markdown__link").get_text(strip=True)
        if cve_id_tag
        else None
    )

    # Paragraph following the "Fix" heading.
    fixed_versions_tag = soup.find("h2", id="fix")
    fixed_versions = (
        fixed_versions_tag.find_next("p").get_text(strip=True) if fixed_versions_tag else None
    )
    fixed_versions = extract_versions(fixed_versions) if fixed_versions else []

    return {
        "description": summary,
        "alias": cve_id,
        "date_published": publication_date,
        "affected_versions": versions_affected,
        "fixed_versions": fixed_versions,
        "url": advisory_link,
    }
142+
143+
144+
def to_advisory_data(raw_data) -> AdvisoryData:
    """Convert the dict scraped by fetch_advisory_data() into AdvisoryData.

    ``affected_versions`` is a list of [low, high] version pairs, e.g.
    [['1.9.8', '1.9.13p1'], ['1.2.9', '1.2.17']]; a single-element entry
    means one exact version.  ``fixed_versions`` has the same shape, e.g.
    [['1.2.3']], and its first version is used as the fixed version.
    """
    alias = get_item(raw_data, "alias")

    affected_packages = []
    affected_versions = get_item(raw_data, "affected_versions") or []
    fixed_versions = get_item(raw_data, "fixed_versions") or []

    # sudo releases such as "1.9.5p2" are not valid semver, so the original
    # SemverVersion(...) raised on most real fixed versions; GenericVersion
    # matches the vers:generic ranges built below.  Guard against an empty
    # list, which previously raised IndexError.
    fixed_version = (
        GenericVersion(fixed_versions[0][0]) if fixed_versions and fixed_versions[0] else None
    )

    for version_pair in affected_versions:
        if not version_pair:
            continue
        # A single captured version means "exactly this version": use it for
        # both bounds instead of raising IndexError on version_pair[1].
        low, high = version_pair[0], version_pair[-1]
        affected_packages.append(
            AffectedPackage(
                package=PackageURL(type="sudo", name="SUDO"),
                affected_version_range=VersionRange.from_string(
                    f"vers:generic/>={low}|<={high}"
                ),
                fixed_version=fixed_version,
            )
        )

    references = [
        Reference(
            reference_id=alias,
            url=f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={alias}",
        )
    ]

    description = get_item(raw_data, "description")

    # The publication date is scraped and may be absent; only parse when
    # present (the original crashed in dateparser.parse(None)).
    date_published = get_item(raw_data, "date_published")
    if date_published:
        date_published = dateparser.parse(date_published, yearfirst=True).replace(
            tzinfo=timezone.utc
        )
    else:
        date_published = None

    url = get_item(raw_data, "url")

    return AdvisoryData(
        aliases=[alias],
        summary=description,
        affected_packages=affected_packages,
        references=references,
        url=url,
        date_published=date_published,
    )
193+
194+
195+
def extract_versions(text):
    """Extract version numbers from free text and group them into pairs.

    Return a list of [low, high] pairs in document order (a trailing odd
    version yields a single-element list).  Return [] for None/empty text.
    """
    if not text:
        # Callers pass None when the advisory page lacks the section.
        return []

    version_pattern = r"(\d+\.\d+\.\d+[a-zA-Z0-9]*)"
    versions = re.findall(version_pattern, text)

    # De-duplicate while PRESERVING document order: the original
    # list(set(...)) produced an arbitrary ordering, which nondeterministically
    # scrambled the (low, high) pairing consumed by to_advisory_data().
    versions = list(dict.fromkeys(versions))

    # Group consecutive versions into [low, high] range pairs.
    return [versions[i : i + 2] for i in range(0, len(versions), 2)]

vulnerabilities/pipelines/test.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
"""
2+
whole in div gdoc-page
3+
each advisory in article gdoc-markdown gdoc-post
4+
5+
"""
6+
7+
from bs4 import BeautifulSoup
8+
import requests
9+
10+
root_url = "https://www.sudo.ws/security/advisories/"
11+
12+
13+
def fetch_advisory_links(active_pages):
    """Return the absolute advisory URLs gathered from the listing pages."""
    links = []
    for page_url in active_pages:
        page_html = requests.get(page_url).content
        soup = BeautifulSoup(page_html, "html.parser")

        # Each advisory teaser ends with a "read more" anchor whose href is
        # the site-relative advisory path.
        for anchor in soup.find_all("a", class_="gdoc-post__readmore"):
            links.append("https://www.sudo.ws" + anchor["href"])
    return links
29+
30+
31+
def fetch_active_pages():
    """Probe the paginated advisory listing pages; return their URLs."""
    active_pages = ["https://www.sudo.ws/security/advisories/"]
    page_num = 2
    while True:
        page_url = f"https://www.sudo.ws/security/advisories/page/{page_num}/"
        # Stop on ANY non-200 status: the original stopped only on 404 and
        # looped forever on persistent 403/5xx responses; the timeout keeps
        # a stalled connection from hanging the script.
        if requests.get(page_url, timeout=30).status_code != 200:
            break
        active_pages.append(page_url)
        page_num += 1

    return active_pages
44+
45+
"""active_pages = fetch_active_pages()
46+
advisory_links = fetch_advisory_links(active_pages)
47+
print(advisory_links)"""
48+
49+
def fetch_advisory_data(advisory_link):
    """Scrape one sudo.ws advisory page and return its raw fields as a dict."""
    page_html = requests.get(advisory_link).content
    soup = BeautifulSoup(page_html, "html.parser")

    # Publication timestamp from the <time> tag; None when absent.
    time_tag = soup.find("time")
    publication_date = time_tag.get("datetime", None) if time_tag else None

    # The first <p> of the markdown section is the advisory summary.
    markdown_section = soup.find("section", class_="gdoc-markdown")
    summary = markdown_section.find("p").get_text(strip=True) if markdown_section else None

    # Paragraph following the "Sudo versions affected" heading; None when absent.
    versions_affected_tag = soup.find("h2", id="sudo-versions-affected")
    versions_affected = (
        versions_affected_tag.find_next("p").get_text(strip=True)
        if versions_affected_tag
        else None
    )
    print("Sudo Versions Affected:", versions_affected)

    # CVE identifier link following the "CVE ID" heading; None when absent.
    cve_id_tag = soup.find("h2", id="cve-id")
    cve_id = (
        cve_id_tag.find_next("a", class_="gdoc-markdown__link").get_text(strip=True)
        if cve_id_tag
        else None
    )

    # Paragraph following the "Fix" heading; None when absent.
    fixed_versions_tag = soup.find("h2", id="fix")
    fixed_versions = (
        fixed_versions_tag.find_next("p").get_text(strip=True) if fixed_versions_tag else None
    )
    print("Fixed Versions:", fixed_versions)

    return {
        "description": summary,
        "alias": cve_id,
        "date_published": publication_date,
        "affected_versions": versions_affected,
        "fixed_versions": fixed_versions,
    }
81+
82+
# Manual smoke test: scrape one known advisory and dump the raw fields.
data = fetch_advisory_data("https://www.sudo.ws/security/advisories/sudoedit_escalate/")
print(data)

0 commit comments

Comments
 (0)